// Package websocket provides WebSocket hub for real-time updates and notifications. package websocket import ( "context" "encoding/json" "runtime" "sync" "time" "github.com/mhsanaei/3x-ui/v2/logger" ) // MessageType represents the type of WebSocket message type MessageType string const ( MessageTypeStatus MessageType = "status" // Server status update MessageTypeTraffic MessageType = "traffic" // Traffic statistics update MessageTypeInbounds MessageType = "inbounds" // Inbounds list update MessageTypeNotification MessageType = "notification" // System notification MessageTypeXrayState MessageType = "xray_state" // Xray state change ) // Message represents a WebSocket message type Message struct { Type MessageType `json:"type"` Payload interface{} `json:"payload"` Time int64 `json:"time"` } // Client represents a WebSocket client connection type Client struct { ID string Send chan []byte Hub *Hub Topics map[MessageType]bool // Subscribed topics } // Hub maintains the set of active clients and broadcasts messages to them type Hub struct { // Registered clients clients map[*Client]bool // Inbound messages from clients broadcast chan []byte // Register requests from clients register chan *Client // Unregister requests from clients unregister chan *Client // Mutex for thread-safe operations mu sync.RWMutex // Context for graceful shutdown ctx context.Context cancel context.CancelFunc // Worker pool for parallel broadcasting workerPoolSize int broadcastWg sync.WaitGroup } // NewHub creates a new WebSocket hub func NewHub() *Hub { ctx, cancel := context.WithCancel(context.Background()) // Calculate optimal worker pool size (CPU cores * 2, but max 100) workerPoolSize := runtime.NumCPU() * 2 if workerPoolSize > 100 { workerPoolSize = 100 } if workerPoolSize < 10 { workerPoolSize = 10 } return &Hub{ clients: make(map[*Client]bool), broadcast: make(chan []byte, 2048), // Increased from 256 to 2048 for high load register: make(chan *Client, 100), // Buffered channel for fast registration unregister: make(chan *Client, 100), // Buffered channel for fast unregistration ctx: ctx, cancel: cancel, workerPoolSize: workerPoolSize, } } // Run starts the hub's main loop func (h *Hub) Run() { defer func() { if r := recover(); r != nil { logger.Error("WebSocket hub panic recovered:", r) // Restart the hub loop go h.Run() } }() for { select { case <-h.ctx.Done(): // Graceful shutdown: close all clients h.mu.Lock() for client := range h.clients { // Safely close channel (avoid double close panic) select { case _, stillOpen := <-client.Send: if stillOpen { close(client.Send) } default: close(client.Send) } } h.clients = make(map[*Client]bool) h.mu.Unlock() // Wait for all broadcast workers to finish h.broadcastWg.Wait() logger.Info("WebSocket hub stopped gracefully") return case client := <-h.register: if client == nil { continue } h.mu.Lock() h.clients[client] = true count := len(h.clients) h.mu.Unlock() logger.Infof("WebSocket client connected: %s (total: %d)", client.ID, count) case client := <-h.unregister: if client == nil { continue } h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) // Safely close channel (avoid double close panic) // Check if channel is already closed by trying to read from it select { case _, stillOpen := <-client.Send: if stillOpen { // Channel was open and had data, now it's empty, safe to close close(client.Send) } // If stillOpen is false, channel was already closed, do nothing default: // Channel is empty and open, safe to close close(client.Send) } } count := len(h.clients) h.mu.Unlock() logger.Infof("WebSocket client disconnected: %s (total: %d)", client.ID, count) case message := <-h.broadcast: if message == nil { continue } // Optimization: quickly copy client list and release lock h.mu.RLock() clientCount := len(h.clients) if clientCount == 0 { h.mu.RUnlock() continue } // Pre-allocate memory for client list clients := make([]*Client, 0, clientCount) for client := range h.clients { clients = append(clients, client) } h.mu.RUnlock() // Parallel broadcast using worker pool h.broadcastParallel(clients, message) } } } // broadcastParallel sends message to all clients in parallel for maximum performance func (h *Hub) broadcastParallel(clients []*Client, message []byte) { if len(clients) == 0 { return } // For small number of clients, use simple parallel sending if len(clients) < h.workerPoolSize { var wg sync.WaitGroup for _, client := range clients { wg.Add(1) go func(c *Client) { defer wg.Done() defer func() { if r := recover(); r != nil { // Channel may be closed, safely ignore logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", c.ID, r) } }() select { case c.Send <- message: default: // Client's send buffer is full, disconnect logger.Debugf("WebSocket client %s send buffer full, disconnecting", c.ID) h.Unregister(c) } }(client) } wg.Wait() return } // For large number of clients, use worker pool for optimal performance clientChan := make(chan *Client, len(clients)) for _, client := range clients { clientChan <- client } close(clientChan) // Start workers for parallel processing h.broadcastWg.Add(h.workerPoolSize) for i := 0; i < h.workerPoolSize; i++ { go func() { defer h.broadcastWg.Done() for client := range clientChan { func() { defer func() { if r := recover(); r != nil { // Channel may be closed, safely ignore logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", client.ID, r) } }() select { case client.Send <- message: default: // Client's send buffer is full, disconnect logger.Debugf("WebSocket client %s send buffer full, disconnecting", client.ID) h.Unregister(client) } }() } }() } // Wait for all workers to finish h.broadcastWg.Wait() } // Broadcast sends a message to all connected clients func (h *Hub) Broadcast(messageType MessageType, payload interface{}) { if h == nil { return } if payload == nil { logger.Warning("Attempted to broadcast nil payload") return } msg := Message{ Type: messageType, Payload: payload, Time: getCurrentTimestamp(), } data, err := json.Marshal(msg) if err != nil { logger.Error("Failed to marshal WebSocket message:", err) return } // Limit message size to prevent memory issues const maxMessageSize = 1024 * 1024 // 1MB if len(data) > maxMessageSize { logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) return } // Non-blocking send with timeout to prevent delays select { case h.broadcast <- data: case <-time.After(100 * time.Millisecond): logger.Warning("WebSocket broadcast channel is full, dropping message") case <-h.ctx.Done(): // Hub is shutting down } } // BroadcastToTopic sends a message only to clients subscribed to the specific topic func (h *Hub) BroadcastToTopic(messageType MessageType, payload interface{}) { if h == nil { return } if payload == nil { logger.Warning("Attempted to broadcast nil payload to topic") return } msg := Message{ Type: messageType, Payload: payload, Time: getCurrentTimestamp(), } data, err := json.Marshal(msg) if err != nil { logger.Error("Failed to marshal WebSocket message:", err) return } // Limit message size to prevent memory issues const maxMessageSize = 1024 * 1024 // 1MB if len(data) > maxMessageSize { logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) return } h.mu.RLock() // Filter clients by topics and quickly release lock subscribedClients := make([]*Client, 0) for client := range h.clients { if len(client.Topics) == 0 || client.Topics[messageType] { subscribedClients = append(subscribedClients, client) } } h.mu.RUnlock() // Parallel send to subscribed clients if len(subscribedClients) > 0 { h.broadcastParallel(subscribedClients, data) } } // GetClientCount returns the number of connected clients func (h *Hub) GetClientCount() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } // Register registers a new client with the hub func (h *Hub) Register(client *Client) { if h == nil || client == nil { return } select { case h.register <- client: case <-h.ctx.Done(): // Hub is shutting down } } // Unregister unregisters a client from the hub func (h *Hub) Unregister(client *Client) { if h == nil || client == nil { return } select { case h.unregister <- client: case <-h.ctx.Done(): // Hub is shutting down } } // Stop gracefully stops the hub and closes all connections func (h *Hub) Stop() { if h == nil { return } if h.cancel != nil { h.cancel() } } // getCurrentTimestamp returns current Unix timestamp in milliseconds func getCurrentTimestamp() int64 { return time.Now().UnixMilli() }