diff --git a/app/app.go b/app/app.go index fb758ce..5bd8564 100644 --- a/app/app.go +++ b/app/app.go @@ -4,16 +4,23 @@ import ( "context" "fmt" "go_oxspeak_server/config" + "go_oxspeak_server/database" + "go_oxspeak_server/models" "go_oxspeak_server/network/http" "go_oxspeak_server/network/udp" "os" "os/signal" "syscall" + + "gorm.io/gorm" ) type App struct { cfg *config.Config + // DB + db *gorm.DB + // Serveurs udpServer *udp.Server httpServer *http.Server @@ -54,7 +61,25 @@ func (app *App) Run() error { // Context pour gérer l'arrêt gracieux defer app.cancel() - // Lancer les app ici + dbConfig := database.DBConfig{ + Driver: app.cfg.Database.Type, + DSN: app.cfg.GetDSN(), + } + + // 1) Initialiser la DB + if err := database.Initialize(dbConfig); err != nil { + return fmt.Errorf("failed to initialize database: %w", err) + } + + // (optionnel) garder une référence locale si tu veux utiliser app.db ailleurs + app.db = database.DB + + // 2) Lancer les migrations en utilisant le registry des modèles + if err := database.AutoMigrate(models.All()...); err != nil { + return fmt.Errorf("failed to auto-migrate database: %w", err) + } + + // 3) Lancer les workers / serveurs go app.runWorkers() fmt.Println("App started, press CTRL+C to stop...") @@ -69,6 +94,10 @@ func (app *App) Run() error { } } +func (app *App) runDBMigrations() { + database.AutoMigrate() +} + func (app *App) runWorkers() { // lancer le serveur udp go func() { diff --git a/models/registry.go b/models/registry.go new file mode 100644 index 0000000..3f5b8c7 --- /dev/null +++ b/models/registry.go @@ -0,0 +1,14 @@ +package models + +func All() []interface{} { + return []interface{}{ + &User{}, + &Server{}, + &ServerUser{}, + &Category{}, + &Channel{}, + &ChannelUser{}, + &Message{}, + &Attachment{}, + } +} diff --git a/network/udp/listen_unix.go b/network/udp/listen_unix.go new file mode 100644 index 0000000..d32e7e5 --- /dev/null +++ b/network/udp/listen_unix.go @@ -0,0 +1,68 @@ +//go:build linux || darwin +// +build linux darwin + +package udp + +import ( + "fmt" + "net" + + "golang.org/x/sys/unix" +) + +func listenUDP(bindAddr string, workerCount int) ([]*net.UDPConn, error) { + addr, err := net.ResolveUDPAddr("udp", bindAddr) + if err != nil { + return nil, fmt.Errorf("cannot resolve address: %w", err) + } + + conns := make([]*net.UDPConn, 0, workerCount) + + for i := 0; i < workerCount; i++ { + conn, err := net.ListenUDP("udp", addr) + if err != nil { + // En cas d’erreur, on ferme ce qu’on a ouvert. + for _, c := range conns { + _ = c.Close() + } + return nil, fmt.Errorf("cannot listen on address (worker %d): %w", i, err) + } + + rawConn, err := conn.SyscallConn() + if err != nil { + _ = conn.Close() + for _, c := range conns { + _ = c.Close() + } + return nil, fmt.Errorf("cannot get raw connection (worker %d): %w", i, err) + } + + var sockErr error + err = rawConn.Control(func(fd uintptr) { + sockErr = unix.SetsockoptInt( + int(fd), + unix.SOL_SOCKET, + unix.SO_REUSEPORT, + 1, + ) + }) + if err != nil { + _ = conn.Close() + for _, c := range conns { + _ = c.Close() + } + return nil, fmt.Errorf("control error (worker %d): %w", i, err) + } + if sockErr != nil { + _ = conn.Close() + for _, c := range conns { + _ = c.Close() + } + return nil, fmt.Errorf("cannot set SO_REUSEPORT (worker %d): %w", i, sockErr) + } + + conns = append(conns, conn) + } + + return conns, nil +} diff --git a/network/udp/listen_windows.go b/network/udp/listen_windows.go new file mode 100644 index 0000000..1b25cea --- /dev/null +++ b/network/udp/listen_windows.go @@ -0,0 +1,24 @@ +//go:build windows +// +build windows + +package udp + +import ( + "fmt" + "net" +) + +func listenUDP(bindAddr string, workerCount int) ([]*net.UDPConn, error) { + addr, err := net.ResolveUDPAddr("udp", bindAddr) + if err != nil { + return nil, fmt.Errorf("cannot resolve address: %w", err) + } + + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, fmt.Errorf("cannot listen on address: %w", err) + } + + // Un seul conn partagé par tous les workers. + return []*net.UDPConn{conn}, nil +} diff --git a/network/udp/server.go b/network/udp/server.go index f6829d6..eacd036 100644 --- a/network/udp/server.go +++ b/network/udp/server.go @@ -12,7 +12,7 @@ type Server struct { bindAddr string routingTable *RoutingTable - conn *net.UDPConn + conns []*net.UDPConn wg sync.WaitGroup ctx context.Context cancel context.CancelFunc @@ -28,40 +28,44 @@ func NewServer(bindAddr string) *Server { } } -func (s *Server) run() error { - add, err := net.ResolveUDPAddr("udp", s.bindAddr) - if err != nil { - return fmt.Errorf("cannot resolve address: %w", err) - } +func (s *Server) Run() error { + workerCount := runtime.NumCPU() - s.conn, err = net.ListenUDP("udp", add) + conns, err := listenUDP(s.bindAddr, workerCount) if err != nil { return fmt.Errorf("cannot listen on address: %w", err) } + if len(conns) == 0 { + return fmt.Errorf("no UDP connections created") + } + s.conns = conns + + for _, conn := range s.conns { + conn.SetReadBuffer(8 * 1024 * 1024) + conn.SetWriteBuffer(8 * 1024 * 1024) + } - s.conn.SetReadBuffer(8 * 1024 * 1024) - s.conn.SetWriteBuffer(8 * 1024 * 1024) fmt.Println("Listening on", s.bindAddr) - for i := 0; i < runtime.NumCPU(); i++ { + for i, conn := range s.conns { s.wg.Add(1) - // todo : add so_reuseport option when on unix like system - go s.workerLoop(i) + go s.workerLoop(i, conn) } return nil } func (s *Server) sendTo(data []byte, addr *net.UDPAddr) error { - if s.conn == nil { + if len(s.conns) == 0 || s.conns[0] == nil { return fmt.Errorf("server not started") } - _, err := s.conn.WriteToUDP(data, addr) + // On utilise la première conn pour l’envoi (c’est suffisant pour UDP). + _, err := s.conns[0].WriteToUDP(data, addr) return err } -func (s *Server) workerLoop(id int) { +func (s *Server) workerLoop(id int, conn *net.UDPConn) { defer s.wg.Done() buffer := make([]byte, 1500) @@ -73,7 +77,7 @@ func (s *Server) workerLoop(id int) { fmt.Println("Worker", id, "stopped") return default: - size, addr, err := s.conn.ReadFromUDP(buffer) + size, addr, err := conn.ReadFromUDP(buffer) if err != nil { if s.ctx.Err() != nil { return @@ -81,7 +85,7 @@ func (s *Server) workerLoop(id int) { if opErr, ok := err.(*net.OpError); ok && opErr.Temporary() { continue } - fmt.Printf("Error reading from UDP: %v\n", err) + fmt.Printf("Error reading from UDP (worker %d): %v\n", id, err) continue } @@ -91,13 +95,14 @@ func (s *Server) workerLoop(id int) { } func (s *Server) handlePacket(data []byte, addr *net.UDPAddr) { + if len(data) == 0 { + return + } + pt := PacketType(data[0]) switch pt { case PacketTypePing: - err := s.sendTo([]byte{byte(PacketTypePing)}, addr) - if err != nil { - return - } + _ = s.sendTo([]byte{byte(PacketTypePing)}, addr) return case PacketTypeConnect: return @@ -105,11 +110,9 @@ func (s *Server) handlePacket(data []byte, addr *net.UDPAddr) { return case PacketTypeVoiceData: // todo : déterminer le format du packet - //channelID := string(data[1:5]) + // channelID := string(data[1:5]) return default: return - } - }