This commit is contained in:
2025-11-14 00:34:28 +01:00
parent 340d1b69f9
commit 8acfbf1215
5 changed files with 163 additions and 25 deletions

View File

@@ -0,0 +1,68 @@
//go:build linux || darwin
// +build linux darwin
package udp
import (
"fmt"
"net"
"golang.org/x/sys/unix"
)
func listenUDP(bindAddr string, workerCount int) ([]*net.UDPConn, error) {
addr, err := net.ResolveUDPAddr("udp", bindAddr)
if err != nil {
return nil, fmt.Errorf("cannot resolve address: %w", err)
}
conns := make([]*net.UDPConn, 0, workerCount)
for i := 0; i < workerCount; i++ {
conn, err := net.ListenUDP("udp", addr)
if err != nil {
// En cas derreur, on ferme ce quon a ouvert.
for _, c := range conns {
_ = c.Close()
}
return nil, fmt.Errorf("cannot listen on address (worker %d): %w", i, err)
}
rawConn, err := conn.SyscallConn()
if err != nil {
_ = conn.Close()
for _, c := range conns {
_ = c.Close()
}
return nil, fmt.Errorf("cannot get raw connection (worker %d): %w", i, err)
}
var sockErr error
err = rawConn.Control(func(fd uintptr) {
sockErr = unix.SetsockoptInt(
int(fd),
unix.SOL_SOCKET,
unix.SO_REUSEPORT,
1,
)
})
if err != nil {
_ = conn.Close()
for _, c := range conns {
_ = c.Close()
}
return nil, fmt.Errorf("control error (worker %d): %w", i, err)
}
if sockErr != nil {
_ = conn.Close()
for _, c := range conns {
_ = c.Close()
}
return nil, fmt.Errorf("cannot set SO_REUSEPORT (worker %d): %w", i, sockErr)
}
conns = append(conns, conn)
}
return conns, nil
}

View File

@@ -0,0 +1,24 @@
//go:build windows
// +build windows
package udp
import (
"fmt"
"net"
)
func listenUDP(bindAddr string, workerCount int) ([]*net.UDPConn, error) {
addr, err := net.ResolveUDPAddr("udp", bindAddr)
if err != nil {
return nil, fmt.Errorf("cannot resolve address: %w", err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, fmt.Errorf("cannot listen on address: %w", err)
}
// Un seul conn partagé par tous les workers.
return []*net.UDPConn{conn}, nil
}

View File

@@ -12,7 +12,7 @@ type Server struct {
bindAddr string
routingTable *RoutingTable
conn *net.UDPConn
conns []*net.UDPConn
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
@@ -28,40 +28,44 @@ func NewServer(bindAddr string) *Server {
}
}
func (s *Server) run() error {
add, err := net.ResolveUDPAddr("udp", s.bindAddr)
if err != nil {
return fmt.Errorf("cannot resolve address: %w", err)
}
func (s *Server) Run() error {
workerCount := runtime.NumCPU()
s.conn, err = net.ListenUDP("udp", add)
conns, err := listenUDP(s.bindAddr, workerCount)
if err != nil {
return fmt.Errorf("cannot listen on address: %w", err)
}
if len(conns) == 0 {
return fmt.Errorf("no UDP connections created")
}
s.conns = conns
for _, conn := range s.conns {
conn.SetReadBuffer(8 * 1024 * 1024)
conn.SetWriteBuffer(8 * 1024 * 1024)
}
s.conn.SetReadBuffer(8 * 1024 * 1024)
s.conn.SetWriteBuffer(8 * 1024 * 1024)
fmt.Println("Listening on", s.bindAddr)
for i := 0; i < runtime.NumCPU(); i++ {
for i, conn := range s.conns {
s.wg.Add(1)
// todo : add so_reuseport option when on unix like system
go s.workerLoop(i)
go s.workerLoop(i, conn)
}
return nil
}
func (s *Server) sendTo(data []byte, addr *net.UDPAddr) error {
if s.conn == nil {
if len(s.conns) == 0 || s.conns[0] == nil {
return fmt.Errorf("server not started")
}
_, err := s.conn.WriteToUDP(data, addr)
// On utilise la première conn pour lenvoi (cest suffisant pour UDP).
_, err := s.conns[0].WriteToUDP(data, addr)
return err
}
func (s *Server) workerLoop(id int) {
func (s *Server) workerLoop(id int, conn *net.UDPConn) {
defer s.wg.Done()
buffer := make([]byte, 1500)
@@ -73,7 +77,7 @@ func (s *Server) workerLoop(id int) {
fmt.Println("Worker", id, "stopped")
return
default:
size, addr, err := s.conn.ReadFromUDP(buffer)
size, addr, err := conn.ReadFromUDP(buffer)
if err != nil {
if s.ctx.Err() != nil {
return
@@ -81,7 +85,7 @@ func (s *Server) workerLoop(id int) {
if opErr, ok := err.(*net.OpError); ok && opErr.Temporary() {
continue
}
fmt.Printf("Error reading from UDP: %v\n", err)
fmt.Printf("Error reading from UDP (worker %d): %v\n", id, err)
continue
}
@@ -91,13 +95,14 @@ func (s *Server) workerLoop(id int) {
}
func (s *Server) handlePacket(data []byte, addr *net.UDPAddr) {
if len(data) == 0 {
return
}
pt := PacketType(data[0])
switch pt {
case PacketTypePing:
err := s.sendTo([]byte{byte(PacketTypePing)}, addr)
if err != nil {
return
}
_ = s.sendTo([]byte{byte(PacketTypePing)}, addr)
return
case PacketTypeConnect:
return
@@ -105,11 +110,9 @@ func (s *Server) handlePacket(data []byte, addr *net.UDPAddr) {
return
case PacketTypeVoiceData:
// todo : déterminer le format du packet
//channelID := string(data[1:5])
// channelID := string(data[1:5])
return
default:
return
}
}