package ws import ( "log/slog" "net/http" "sync" "time" "github.com/gorilla/websocket" ) const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, // Разрешаем CORS для разработки (в продакшене лучше ограничить) CheckOrigin: func(r *http.Request) bool { return true }, } // Hub maintains the set of active clients and broadcasts messages to the clients. type Hub struct { // Registered clients. clients map[*Client]bool // Inbound messages from the clients (not used for now, only broadcast). broadcast chan []byte // Register requests from the clients. register chan *Client // Unregister requests from clients. unregister chan *Client mu sync.Mutex } func NewHub() *Hub { return &Hub{ broadcast: make(chan []byte), register: make(chan *Client), unregister: make(chan *Client), clients: make(map[*Client]bool), } } func (h *Hub) Run() { for { select { case client := <-h.register: h.mu.Lock() h.clients[client] = true h.mu.Unlock() slog.Debug("WS: Client registered") case client := <-h.unregister: h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) slog.Debug("WS: Client unregistered") } h.mu.Unlock() case message := <-h.broadcast: h.mu.Lock() for client := range h.clients { select { case client.send <- message: default: close(client.send) delete(h.clients, client) } } h.mu.Unlock() } } } // Broadcast отправляет сообщение всем подключенным клиентам func (h *Hub) BroadcastMessage(msg []byte) { h.broadcast <- msg } // Client is a middleman between the websocket connection and the hub. type Client struct { hub *Hub // The websocket connection. conn *websocket.Conn // Buffered channel of outbound messages. send chan []byte } // writePump pumps messages from the hub to the websocket connection. func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case message, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // The hub closed the channel. c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } w, err := c.conn.NextWriter(websocket.TextMessage) if err != nil { return } w.Write(message) if err := w.Close(); err != nil { return } case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // readPump pumps messages from the websocket connection to the hub. // (Needed to process PONGs and detect disconnects) func (c *Client) readPump() { defer func() { c.hub.unregister <- c c.conn.Close() }() c.conn.SetReadLimit(512) c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { _, _, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { slog.Error("WS: error", "error", err) } break } } } // ServeWs handles websocket requests from the peer. func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { slog.Error("WS: Failed to upgrade connection", "error", err) return } client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} client.hub.register <- client // Allow collection of memory referenced by the caller by doing all work in // new goroutines. go client.writePump() go client.readPump() }