Compare commits

...

2 commits

Author SHA1 Message Date
lolka1333
0414ef17b0
Merge 93436bccba into 169b216d7e 2026-04-07 01:52:34 +00:00
test999
93436bccba fix(websocket): resolve channel race condition and graceful shutdown deadlock 2026-04-07 04:52:28 +03:00
2 changed files with 16 additions and 33 deletions

View file

@ -85,8 +85,8 @@ func (j *XrayTrafficJob) Run() {
logger.Warning("get all outbounds for websocket failed:", err)
}
// The web socket hub will automatically check the payload size.
// If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal.
// The WebSocket hub will automatically check the payload size.
// If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
if updatedInbounds != nil {
websocket.BroadcastInbounds(updatedInbounds)
}

View file

@ -33,10 +33,11 @@ type Message struct {
// Client represents a WebSocket client connection
type Client struct {
ID string
Send chan []byte
Hub *Hub
Topics map[MessageType]bool // Subscribed topics
ID string
Send chan []byte
Hub *Hub
Topics map[MessageType]bool // Subscribed topics
closeOnce sync.Once // Ensures Send channel is closed exactly once
}
// Hub maintains the set of active clients and broadcasts messages to them
@ -62,7 +63,6 @@ type Hub struct {
// Worker pool for parallel broadcasting
workerPoolSize int
broadcastWg sync.WaitGroup
}
// NewHub creates a new WebSocket hub
@ -105,20 +105,12 @@ func (h *Hub) Run() {
// Graceful shutdown: close all clients
h.mu.Lock()
for client := range h.clients {
// Safely close channel (avoid double close panic)
select {
case _, stillOpen := <-client.Send:
if stillOpen {
close(client.Send)
}
default:
client.closeOnce.Do(func() {
close(client.Send)
}
})
}
h.clients = make(map[*Client]bool)
h.mu.Unlock()
// Wait for all broadcast workers to finish
h.broadcastWg.Wait()
logger.Info("WebSocket hub stopped gracefully")
return
@ -139,19 +131,9 @@ func (h *Hub) Run() {
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
// Safely close channel (avoid double close panic)
// Check if channel is already closed by trying to read from it
select {
case _, stillOpen := <-client.Send:
if stillOpen {
// Channel was open and had data, now it's empty, safe to close
close(client.Send)
}
// If stillOpen is false, channel was already closed, do nothing
default:
// Channel is empty and open, safe to close
client.closeOnce.Do(func() {
close(client.Send)
}
})
}
count := len(h.clients)
h.mu.Unlock()
@ -221,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
close(clientChan)
// Start workers for parallel processing
h.broadcastWg.Add(h.workerPoolSize)
// Use a local WaitGroup to avoid blocking hub shutdown
var wg sync.WaitGroup
wg.Add(h.workerPoolSize)
for i := 0; i < h.workerPoolSize; i++ {
go func() {
defer h.broadcastWg.Done()
defer wg.Done()
for client := range clientChan {
func() {
defer func() {
@ -247,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
// Wait for all workers to finish
h.broadcastWg.Wait()
wg.Wait()
}
// Broadcast sends a message to all connected clients