diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index b1a775f0..f443aa77 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -85,8 +85,8 @@ func (j *XrayTrafficJob) Run() { logger.Warning("get all outbounds for websocket failed:", err) } - // The web socket hub will automatically check the payload size. - // If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal. + // The WebSocket hub will automatically check the payload size. + // If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead. if updatedInbounds != nil { websocket.BroadcastInbounds(updatedInbounds) } diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 3de1c81c..646ea9e2 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -33,10 +33,11 @@ type Message struct { // Client represents a WebSocket client connection type Client struct { - ID string - Send chan []byte - Hub *Hub - Topics map[MessageType]bool // Subscribed topics + ID string + Send chan []byte + Hub *Hub + Topics map[MessageType]bool // Subscribed topics + closeOnce sync.Once // Ensures Send channel is closed exactly once } // Hub maintains the set of active clients and broadcasts messages to them @@ -62,7 +63,6 @@ type Hub struct { // Worker pool for parallel broadcasting workerPoolSize int - broadcastWg sync.WaitGroup } // NewHub creates a new WebSocket hub @@ -105,20 +105,12 @@ func (h *Hub) Run() { // Graceful shutdown: close all clients h.mu.Lock() for client := range h.clients { - // Safely close channel (avoid double close panic) - select { - case _, stillOpen := <-client.Send: - if stillOpen { - close(client.Send) - } - default: + client.closeOnce.Do(func() { close(client.Send) - } + }) } h.clients = make(map[*Client]bool) h.mu.Unlock() - // Wait for all broadcast workers to finish - h.broadcastWg.Wait() logger.Info("WebSocket hub stopped gracefully") return @@ -139,19 +131,9 @@ func (h *Hub) Run() { h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) - // Safely close channel (avoid double close panic) - // Check if channel is already closed by trying to read from it - select { - case _, stillOpen := <-client.Send: - if stillOpen { - // Channel was open and had data, now it's empty, safe to close - close(client.Send) - } - // If stillOpen is false, channel was already closed, do nothing - default: - // Channel is empty and open, safe to close + client.closeOnce.Do(func() { close(client.Send) - } + }) } count := len(h.clients) h.mu.Unlock() @@ -221,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } close(clientChan) - // Start workers for parallel processing - h.broadcastWg.Add(h.workerPoolSize) + // Use a local WaitGroup to avoid blocking hub shutdown + var wg sync.WaitGroup + wg.Add(h.workerPoolSize) for i := 0; i < h.workerPoolSize; i++ { go func() { - defer h.broadcastWg.Done() + defer wg.Done() for client := range clientChan { func() { defer func() { @@ -247,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } // Wait for all workers to finish - h.broadcastWg.Wait() + wg.Wait() } // Broadcast sends a message to all connected clients