mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-04-16 04:25:46 +00:00
Compare commits
1 commit
0414ef17b0
...
edf82cf547
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
edf82cf547 |
2 changed files with 33 additions and 16 deletions
|
|
@ -85,8 +85,8 @@ func (j *XrayTrafficJob) Run() {
|
||||||
logger.Warning("get all outbounds for websocket failed:", err)
|
logger.Warning("get all outbounds for websocket failed:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The WebSocket hub will automatically check the payload size.
|
// The web socket hub will automatically check the payload size.
|
||||||
// If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
|
// If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal.
|
||||||
if updatedInbounds != nil {
|
if updatedInbounds != nil {
|
||||||
websocket.BroadcastInbounds(updatedInbounds)
|
websocket.BroadcastInbounds(updatedInbounds)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,11 +33,10 @@ type Message struct {
|
||||||
|
|
||||||
// Client represents a WebSocket client connection
|
// Client represents a WebSocket client connection
|
||||||
type Client struct {
|
type Client struct {
|
||||||
ID string
|
ID string
|
||||||
Send chan []byte
|
Send chan []byte
|
||||||
Hub *Hub
|
Hub *Hub
|
||||||
Topics map[MessageType]bool // Subscribed topics
|
Topics map[MessageType]bool // Subscribed topics
|
||||||
closeOnce sync.Once // Ensures Send channel is closed exactly once
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hub maintains the set of active clients and broadcasts messages to them
|
// Hub maintains the set of active clients and broadcasts messages to them
|
||||||
|
|
@ -63,6 +62,7 @@ type Hub struct {
|
||||||
|
|
||||||
// Worker pool for parallel broadcasting
|
// Worker pool for parallel broadcasting
|
||||||
workerPoolSize int
|
workerPoolSize int
|
||||||
|
broadcastWg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHub creates a new WebSocket hub
|
// NewHub creates a new WebSocket hub
|
||||||
|
|
@ -105,12 +105,20 @@ func (h *Hub) Run() {
|
||||||
// Graceful shutdown: close all clients
|
// Graceful shutdown: close all clients
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
for client := range h.clients {
|
for client := range h.clients {
|
||||||
client.closeOnce.Do(func() {
|
// Safely close channel (avoid double close panic)
|
||||||
|
select {
|
||||||
|
case _, stillOpen := <-client.Send:
|
||||||
|
if stillOpen {
|
||||||
|
close(client.Send)
|
||||||
|
}
|
||||||
|
default:
|
||||||
close(client.Send)
|
close(client.Send)
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
h.clients = make(map[*Client]bool)
|
h.clients = make(map[*Client]bool)
|
||||||
h.mu.Unlock()
|
h.mu.Unlock()
|
||||||
|
// Wait for all broadcast workers to finish
|
||||||
|
h.broadcastWg.Wait()
|
||||||
logger.Info("WebSocket hub stopped gracefully")
|
logger.Info("WebSocket hub stopped gracefully")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -131,9 +139,19 @@ func (h *Hub) Run() {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
if _, ok := h.clients[client]; ok {
|
if _, ok := h.clients[client]; ok {
|
||||||
delete(h.clients, client)
|
delete(h.clients, client)
|
||||||
client.closeOnce.Do(func() {
|
// Safely close channel (avoid double close panic)
|
||||||
|
// Check if channel is already closed by trying to read from it
|
||||||
|
select {
|
||||||
|
case _, stillOpen := <-client.Send:
|
||||||
|
if stillOpen {
|
||||||
|
// Channel was open and had data, now it's empty, safe to close
|
||||||
|
close(client.Send)
|
||||||
|
}
|
||||||
|
// If stillOpen is false, channel was already closed, do nothing
|
||||||
|
default:
|
||||||
|
// Channel is empty and open, safe to close
|
||||||
close(client.Send)
|
close(client.Send)
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
count := len(h.clients)
|
count := len(h.clients)
|
||||||
h.mu.Unlock()
|
h.mu.Unlock()
|
||||||
|
|
@ -203,12 +221,11 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
|
||||||
}
|
}
|
||||||
close(clientChan)
|
close(clientChan)
|
||||||
|
|
||||||
// Use a local WaitGroup to avoid blocking hub shutdown
|
// Start workers for parallel processing
|
||||||
var wg sync.WaitGroup
|
h.broadcastWg.Add(h.workerPoolSize)
|
||||||
wg.Add(h.workerPoolSize)
|
|
||||||
for i := 0; i < h.workerPoolSize; i++ {
|
for i := 0; i < h.workerPoolSize; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer h.broadcastWg.Done()
|
||||||
for client := range clientChan {
|
for client := range clientChan {
|
||||||
func() {
|
func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
@ -230,7 +247,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all workers to finish
|
// Wait for all workers to finish
|
||||||
wg.Wait()
|
h.broadcastWg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast sends a message to all connected clients
|
// Broadcast sends a message to all connected clients
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue