Compare commits

..

1 commit

Author SHA1 Message Date
lolka1333
edf82cf547
Merge 7e6e66ebfc into 169b216d7e 2026-04-06 14:13:59 +00:00
2 changed files with 33 additions and 16 deletions

View file

@ -85,8 +85,8 @@ func (j *XrayTrafficJob) Run() {
logger.Warning("get all outbounds for websocket failed:", err)
}
// The WebSocket hub will automatically check the payload size.
// If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
// The web socket hub will automatically check the payload size.
// If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal.
if updatedInbounds != nil {
websocket.BroadcastInbounds(updatedInbounds)
}

View file

@ -33,11 +33,10 @@ type Message struct {
// Client represents a WebSocket client connection
type Client struct {
ID string
Send chan []byte
Hub *Hub
Topics map[MessageType]bool // Subscribed topics
closeOnce sync.Once // Ensures Send channel is closed exactly once
ID string
Send chan []byte
Hub *Hub
Topics map[MessageType]bool // Subscribed topics
}
// Hub maintains the set of active clients and broadcasts messages to them
@ -63,6 +62,7 @@ type Hub struct {
// Worker pool for parallel broadcasting
workerPoolSize int
broadcastWg sync.WaitGroup
}
// NewHub creates a new WebSocket hub
@ -105,12 +105,20 @@ func (h *Hub) Run() {
// Graceful shutdown: close all clients
h.mu.Lock()
for client := range h.clients {
client.closeOnce.Do(func() {
// Safely close channel (avoid double close panic)
select {
case _, stillOpen := <-client.Send:
if stillOpen {
close(client.Send)
}
default:
close(client.Send)
})
}
}
h.clients = make(map[*Client]bool)
h.mu.Unlock()
// Wait for all broadcast workers to finish
h.broadcastWg.Wait()
logger.Info("WebSocket hub stopped gracefully")
return
@ -131,9 +139,19 @@ func (h *Hub) Run() {
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
client.closeOnce.Do(func() {
// Safely close channel (avoid double close panic)
// Check if channel is already closed by trying to read from it
select {
case _, stillOpen := <-client.Send:
if stillOpen {
// Channel was open and had data, now it's empty, safe to close
close(client.Send)
}
// If stillOpen is false, channel was already closed, do nothing
default:
// Channel is empty and open, safe to close
close(client.Send)
})
}
}
count := len(h.clients)
h.mu.Unlock()
@ -203,12 +221,11 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
close(clientChan)
// Use a local WaitGroup to avoid blocking hub shutdown
var wg sync.WaitGroup
wg.Add(h.workerPoolSize)
// Start workers for parallel processing
h.broadcastWg.Add(h.workerPoolSize)
for i := 0; i < h.workerPoolSize; i++ {
go func() {
defer wg.Done()
defer h.broadcastWg.Done()
for client := range clientChan {
func() {
defer func() {
@ -230,7 +247,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
// Wait for all workers to finish
wg.Wait()
h.broadcastWg.Wait()
}
// Broadcast sends a message to all connected clients