mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-04-14 19:45:47 +00:00
fix(websocket): resolve channel race condition and graceful shutdown deadlock
This commit is contained in:
parent
7e6e66ebfc
commit
93436bccba
2 changed files with 16 additions and 33 deletions
|
|
@ -85,8 +85,8 @@ func (j *XrayTrafficJob) Run() {
|
||||||
logger.Warning("get all outbounds for websocket failed:", err)
|
logger.Warning("get all outbounds for websocket failed:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The web socket hub will automatically check the payload size.
|
// The WebSocket hub will automatically check the payload size.
|
||||||
// If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal.
|
// If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
|
||||||
if updatedInbounds != nil {
|
if updatedInbounds != nil {
|
||||||
websocket.BroadcastInbounds(updatedInbounds)
|
websocket.BroadcastInbounds(updatedInbounds)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,10 +33,11 @@ type Message struct {
|
||||||
|
|
||||||
// Client represents a WebSocket client connection
|
// Client represents a WebSocket client connection
|
||||||
type Client struct {
|
type Client struct {
|
||||||
ID string
|
ID string
|
||||||
Send chan []byte
|
Send chan []byte
|
||||||
Hub *Hub
|
Hub *Hub
|
||||||
Topics map[MessageType]bool // Subscribed topics
|
Topics map[MessageType]bool // Subscribed topics
|
||||||
|
closeOnce sync.Once // Ensures Send channel is closed exactly once
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hub maintains the set of active clients and broadcasts messages to them
|
// Hub maintains the set of active clients and broadcasts messages to them
|
||||||
|
|
@ -62,7 +63,6 @@ type Hub struct {
|
||||||
|
|
||||||
// Worker pool for parallel broadcasting
|
// Worker pool for parallel broadcasting
|
||||||
workerPoolSize int
|
workerPoolSize int
|
||||||
broadcastWg sync.WaitGroup
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHub creates a new WebSocket hub
|
// NewHub creates a new WebSocket hub
|
||||||
|
|
@ -105,20 +105,12 @@ func (h *Hub) Run() {
|
||||||
// Graceful shutdown: close all clients
|
// Graceful shutdown: close all clients
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
for client := range h.clients {
|
for client := range h.clients {
|
||||||
// Safely close channel (avoid double close panic)
|
client.closeOnce.Do(func() {
|
||||||
select {
|
|
||||||
case _, stillOpen := <-client.Send:
|
|
||||||
if stillOpen {
|
|
||||||
close(client.Send)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
close(client.Send)
|
close(client.Send)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
h.clients = make(map[*Client]bool)
|
h.clients = make(map[*Client]bool)
|
||||||
h.mu.Unlock()
|
h.mu.Unlock()
|
||||||
// Wait for all broadcast workers to finish
|
|
||||||
h.broadcastWg.Wait()
|
|
||||||
logger.Info("WebSocket hub stopped gracefully")
|
logger.Info("WebSocket hub stopped gracefully")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -139,19 +131,9 @@ func (h *Hub) Run() {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
if _, ok := h.clients[client]; ok {
|
if _, ok := h.clients[client]; ok {
|
||||||
delete(h.clients, client)
|
delete(h.clients, client)
|
||||||
// Safely close channel (avoid double close panic)
|
client.closeOnce.Do(func() {
|
||||||
// Check if channel is already closed by trying to read from it
|
|
||||||
select {
|
|
||||||
case _, stillOpen := <-client.Send:
|
|
||||||
if stillOpen {
|
|
||||||
// Channel was open and had data, now it's empty, safe to close
|
|
||||||
close(client.Send)
|
|
||||||
}
|
|
||||||
// If stillOpen is false, channel was already closed, do nothing
|
|
||||||
default:
|
|
||||||
// Channel is empty and open, safe to close
|
|
||||||
close(client.Send)
|
close(client.Send)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
count := len(h.clients)
|
count := len(h.clients)
|
||||||
h.mu.Unlock()
|
h.mu.Unlock()
|
||||||
|
|
@ -221,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
|
||||||
}
|
}
|
||||||
close(clientChan)
|
close(clientChan)
|
||||||
|
|
||||||
// Start workers for parallel processing
|
// Use a local WaitGroup to avoid blocking hub shutdown
|
||||||
h.broadcastWg.Add(h.workerPoolSize)
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(h.workerPoolSize)
|
||||||
for i := 0; i < h.workerPoolSize; i++ {
|
for i := 0; i < h.workerPoolSize; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer h.broadcastWg.Done()
|
defer wg.Done()
|
||||||
for client := range clientChan {
|
for client := range clientChan {
|
||||||
func() {
|
func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
@ -247,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all workers to finish
|
// Wait for all workers to finish
|
||||||
h.broadcastWg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast sends a message to all connected clients
|
// Broadcast sends a message to all connected clients
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue