Compare commits

...

2 commits

Author SHA1 Message Date
lolka1333
0414ef17b0
Merge 93436bccba into 169b216d7e 2026-04-07 01:52:34 +00:00
test999
93436bccba fix(websocket): resolve channel race condition and graceful shutdown deadlock 2026-04-07 04:52:28 +03:00
2 changed files with 16 additions and 33 deletions

View file

@ -85,8 +85,8 @@ func (j *XrayTrafficJob) Run() {
logger.Warning("get all outbounds for websocket failed:", err) logger.Warning("get all outbounds for websocket failed:", err)
} }
// The web socket hub will automatically check the payload size. // The WebSocket hub will automatically check the payload size.
// If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal. // If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
if updatedInbounds != nil { if updatedInbounds != nil {
websocket.BroadcastInbounds(updatedInbounds) websocket.BroadcastInbounds(updatedInbounds)
} }

View file

@ -33,10 +33,11 @@ type Message struct {
// Client represents a WebSocket client connection // Client represents a WebSocket client connection
type Client struct { type Client struct {
ID string ID string
Send chan []byte Send chan []byte
Hub *Hub Hub *Hub
Topics map[MessageType]bool // Subscribed topics Topics map[MessageType]bool // Subscribed topics
closeOnce sync.Once // Ensures Send channel is closed exactly once
} }
// Hub maintains the set of active clients and broadcasts messages to them // Hub maintains the set of active clients and broadcasts messages to them
@ -62,7 +63,6 @@ type Hub struct {
// Worker pool for parallel broadcasting // Worker pool for parallel broadcasting
workerPoolSize int workerPoolSize int
broadcastWg sync.WaitGroup
} }
// NewHub creates a new WebSocket hub // NewHub creates a new WebSocket hub
@ -105,20 +105,12 @@ func (h *Hub) Run() {
// Graceful shutdown: close all clients // Graceful shutdown: close all clients
h.mu.Lock() h.mu.Lock()
for client := range h.clients { for client := range h.clients {
// Safely close channel (avoid double close panic) client.closeOnce.Do(func() {
select {
case _, stillOpen := <-client.Send:
if stillOpen {
close(client.Send)
}
default:
close(client.Send) close(client.Send)
} })
} }
h.clients = make(map[*Client]bool) h.clients = make(map[*Client]bool)
h.mu.Unlock() h.mu.Unlock()
// Wait for all broadcast workers to finish
h.broadcastWg.Wait()
logger.Info("WebSocket hub stopped gracefully") logger.Info("WebSocket hub stopped gracefully")
return return
@ -139,19 +131,9 @@ func (h *Hub) Run() {
h.mu.Lock() h.mu.Lock()
if _, ok := h.clients[client]; ok { if _, ok := h.clients[client]; ok {
delete(h.clients, client) delete(h.clients, client)
// Safely close channel (avoid double close panic) client.closeOnce.Do(func() {
// Check if channel is already closed by trying to read from it
select {
case _, stillOpen := <-client.Send:
if stillOpen {
// Channel was open and had data, now it's empty, safe to close
close(client.Send)
}
// If stillOpen is false, channel was already closed, do nothing
default:
// Channel is empty and open, safe to close
close(client.Send) close(client.Send)
} })
} }
count := len(h.clients) count := len(h.clients)
h.mu.Unlock() h.mu.Unlock()
@ -221,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
} }
close(clientChan) close(clientChan)
// Start workers for parallel processing // Use a local WaitGroup to avoid blocking hub shutdown
h.broadcastWg.Add(h.workerPoolSize) var wg sync.WaitGroup
wg.Add(h.workerPoolSize)
for i := 0; i < h.workerPoolSize; i++ { for i := 0; i < h.workerPoolSize; i++ {
go func() { go func() {
defer h.broadcastWg.Done() defer wg.Done()
for client := range clientChan { for client := range clientChan {
func() { func() {
defer func() { defer func() {
@ -247,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
} }
// Wait for all workers to finish // Wait for all workers to finish
h.broadcastWg.Wait() wg.Wait()
} }
// Broadcast sends a message to all connected clients // Broadcast sends a message to all connected clients