mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2025-12-23 06:42:41 +00:00
379 lines
9.2 KiB
Go
379 lines
9.2 KiB
Go
// Package websocket provides WebSocket hub for real-time updates and notifications.
|
|
package websocket
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mhsanaei/3x-ui/v2/logger"
|
|
)
|
|
|
|
// MessageType represents the type of WebSocket message
|
|
type MessageType string
|
|
|
|
const (
|
|
MessageTypeStatus MessageType = "status" // Server status update
|
|
MessageTypeTraffic MessageType = "traffic" // Traffic statistics update
|
|
MessageTypeInbounds MessageType = "inbounds" // Inbounds list update
|
|
MessageTypeNotification MessageType = "notification" // System notification
|
|
MessageTypeXrayState MessageType = "xray_state" // Xray state change
|
|
)
|
|
|
|
// Message represents a WebSocket message
|
|
type Message struct {
|
|
Type MessageType `json:"type"`
|
|
Payload interface{} `json:"payload"`
|
|
Time int64 `json:"time"`
|
|
}
|
|
|
|
// Client represents a WebSocket client connection
|
|
type Client struct {
|
|
ID string
|
|
Send chan []byte
|
|
Hub *Hub
|
|
Topics map[MessageType]bool // Subscribed topics
|
|
}
|
|
|
|
// Hub maintains the set of active clients and broadcasts messages to them
|
|
type Hub struct {
|
|
// Registered clients
|
|
clients map[*Client]bool
|
|
|
|
// Inbound messages from clients
|
|
broadcast chan []byte
|
|
|
|
// Register requests from clients
|
|
register chan *Client
|
|
|
|
// Unregister requests from clients
|
|
unregister chan *Client
|
|
|
|
// Mutex for thread-safe operations
|
|
mu sync.RWMutex
|
|
|
|
// Context for graceful shutdown
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
// Worker pool for parallel broadcasting
|
|
workerPoolSize int
|
|
broadcastWg sync.WaitGroup
|
|
}
|
|
|
|
// NewHub creates a new WebSocket hub
|
|
func NewHub() *Hub {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
// Calculate optimal worker pool size (CPU cores * 2, but max 100)
|
|
workerPoolSize := runtime.NumCPU() * 2
|
|
if workerPoolSize > 100 {
|
|
workerPoolSize = 100
|
|
}
|
|
if workerPoolSize < 10 {
|
|
workerPoolSize = 10
|
|
}
|
|
|
|
return &Hub{
|
|
clients: make(map[*Client]bool),
|
|
broadcast: make(chan []byte, 2048), // Increased from 256 to 2048 for high load
|
|
register: make(chan *Client, 100), // Buffered channel for fast registration
|
|
unregister: make(chan *Client, 100), // Buffered channel for fast unregistration
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
workerPoolSize: workerPoolSize,
|
|
}
|
|
}
|
|
|
|
// Run starts the hub's main loop
|
|
func (h *Hub) Run() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
logger.Error("WebSocket hub panic recovered:", r)
|
|
// Restart the hub loop
|
|
go h.Run()
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-h.ctx.Done():
|
|
// Graceful shutdown: close all clients
|
|
h.mu.Lock()
|
|
for client := range h.clients {
|
|
// Safely close channel (avoid double close panic)
|
|
select {
|
|
case _, stillOpen := <-client.Send:
|
|
if stillOpen {
|
|
close(client.Send)
|
|
}
|
|
default:
|
|
close(client.Send)
|
|
}
|
|
}
|
|
h.clients = make(map[*Client]bool)
|
|
h.mu.Unlock()
|
|
// Wait for all broadcast workers to finish
|
|
h.broadcastWg.Wait()
|
|
logger.Info("WebSocket hub stopped gracefully")
|
|
return
|
|
|
|
case client := <-h.register:
|
|
if client == nil {
|
|
continue
|
|
}
|
|
h.mu.Lock()
|
|
h.clients[client] = true
|
|
count := len(h.clients)
|
|
h.mu.Unlock()
|
|
logger.Infof("WebSocket client connected: %s (total: %d)", client.ID, count)
|
|
|
|
case client := <-h.unregister:
|
|
if client == nil {
|
|
continue
|
|
}
|
|
h.mu.Lock()
|
|
if _, ok := h.clients[client]; ok {
|
|
delete(h.clients, client)
|
|
// Safely close channel (avoid double close panic)
|
|
// Check if channel is already closed by trying to read from it
|
|
select {
|
|
case _, stillOpen := <-client.Send:
|
|
if stillOpen {
|
|
// Channel was open and had data, now it's empty, safe to close
|
|
close(client.Send)
|
|
}
|
|
// If stillOpen is false, channel was already closed, do nothing
|
|
default:
|
|
// Channel is empty and open, safe to close
|
|
close(client.Send)
|
|
}
|
|
}
|
|
count := len(h.clients)
|
|
h.mu.Unlock()
|
|
logger.Infof("WebSocket client disconnected: %s (total: %d)", client.ID, count)
|
|
|
|
case message := <-h.broadcast:
|
|
if message == nil {
|
|
continue
|
|
}
|
|
// Optimization: quickly copy client list and release lock
|
|
h.mu.RLock()
|
|
clientCount := len(h.clients)
|
|
if clientCount == 0 {
|
|
h.mu.RUnlock()
|
|
continue
|
|
}
|
|
|
|
// Pre-allocate memory for client list
|
|
clients := make([]*Client, 0, clientCount)
|
|
for client := range h.clients {
|
|
clients = append(clients, client)
|
|
}
|
|
h.mu.RUnlock()
|
|
|
|
// Parallel broadcast using worker pool
|
|
h.broadcastParallel(clients, message)
|
|
}
|
|
}
|
|
}
|
|
|
|
// broadcastParallel sends message to all clients in parallel for maximum performance
|
|
func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
|
|
if len(clients) == 0 {
|
|
return
|
|
}
|
|
|
|
// For small number of clients, use simple parallel sending
|
|
if len(clients) < h.workerPoolSize {
|
|
var wg sync.WaitGroup
|
|
for _, client := range clients {
|
|
wg.Add(1)
|
|
go func(c *Client) {
|
|
defer wg.Done()
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// Channel may be closed, safely ignore
|
|
logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", c.ID, r)
|
|
}
|
|
}()
|
|
select {
|
|
case c.Send <- message:
|
|
default:
|
|
// Client's send buffer is full, disconnect
|
|
logger.Debugf("WebSocket client %s send buffer full, disconnecting", c.ID)
|
|
h.Unregister(c)
|
|
}
|
|
}(client)
|
|
}
|
|
wg.Wait()
|
|
return
|
|
}
|
|
|
|
// For large number of clients, use worker pool for optimal performance
|
|
clientChan := make(chan *Client, len(clients))
|
|
for _, client := range clients {
|
|
clientChan <- client
|
|
}
|
|
close(clientChan)
|
|
|
|
// Start workers for parallel processing
|
|
h.broadcastWg.Add(h.workerPoolSize)
|
|
for i := 0; i < h.workerPoolSize; i++ {
|
|
go func() {
|
|
defer h.broadcastWg.Done()
|
|
for client := range clientChan {
|
|
func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// Channel may be closed, safely ignore
|
|
logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", client.ID, r)
|
|
}
|
|
}()
|
|
select {
|
|
case client.Send <- message:
|
|
default:
|
|
// Client's send buffer is full, disconnect
|
|
logger.Debugf("WebSocket client %s send buffer full, disconnecting", client.ID)
|
|
h.Unregister(client)
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Wait for all workers to finish
|
|
h.broadcastWg.Wait()
|
|
}
|
|
|
|
// Broadcast sends a message to all connected clients
|
|
func (h *Hub) Broadcast(messageType MessageType, payload interface{}) {
|
|
if h == nil {
|
|
return
|
|
}
|
|
if payload == nil {
|
|
logger.Warning("Attempted to broadcast nil payload")
|
|
return
|
|
}
|
|
|
|
msg := Message{
|
|
Type: messageType,
|
|
Payload: payload,
|
|
Time: getCurrentTimestamp(),
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
logger.Error("Failed to marshal WebSocket message:", err)
|
|
return
|
|
}
|
|
|
|
// Limit message size to prevent memory issues
|
|
const maxMessageSize = 1024 * 1024 // 1MB
|
|
if len(data) > maxMessageSize {
|
|
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
|
|
return
|
|
}
|
|
|
|
// Non-blocking send with timeout to prevent delays
|
|
select {
|
|
case h.broadcast <- data:
|
|
case <-time.After(100 * time.Millisecond):
|
|
logger.Warning("WebSocket broadcast channel is full, dropping message")
|
|
case <-h.ctx.Done():
|
|
// Hub is shutting down
|
|
}
|
|
}
|
|
|
|
// BroadcastToTopic sends a message only to clients subscribed to the specific topic
|
|
func (h *Hub) BroadcastToTopic(messageType MessageType, payload interface{}) {
|
|
if h == nil {
|
|
return
|
|
}
|
|
if payload == nil {
|
|
logger.Warning("Attempted to broadcast nil payload to topic")
|
|
return
|
|
}
|
|
|
|
msg := Message{
|
|
Type: messageType,
|
|
Payload: payload,
|
|
Time: getCurrentTimestamp(),
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
logger.Error("Failed to marshal WebSocket message:", err)
|
|
return
|
|
}
|
|
|
|
// Limit message size to prevent memory issues
|
|
const maxMessageSize = 1024 * 1024 // 1MB
|
|
if len(data) > maxMessageSize {
|
|
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
|
|
return
|
|
}
|
|
|
|
h.mu.RLock()
|
|
// Filter clients by topics and quickly release lock
|
|
subscribedClients := make([]*Client, 0)
|
|
for client := range h.clients {
|
|
if len(client.Topics) == 0 || client.Topics[messageType] {
|
|
subscribedClients = append(subscribedClients, client)
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
|
|
// Parallel send to subscribed clients
|
|
if len(subscribedClients) > 0 {
|
|
h.broadcastParallel(subscribedClients, data)
|
|
}
|
|
}
|
|
|
|
// GetClientCount returns the number of connected clients
|
|
func (h *Hub) GetClientCount() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|
|
// Register registers a new client with the hub
|
|
func (h *Hub) Register(client *Client) {
|
|
if h == nil || client == nil {
|
|
return
|
|
}
|
|
select {
|
|
case h.register <- client:
|
|
case <-h.ctx.Done():
|
|
// Hub is shutting down
|
|
}
|
|
}
|
|
|
|
// Unregister unregisters a client from the hub
|
|
func (h *Hub) Unregister(client *Client) {
|
|
if h == nil || client == nil {
|
|
return
|
|
}
|
|
select {
|
|
case h.unregister <- client:
|
|
case <-h.ctx.Done():
|
|
// Hub is shutting down
|
|
}
|
|
}
|
|
|
|
// Stop gracefully stops the hub and closes all connections
|
|
func (h *Hub) Stop() {
|
|
if h == nil {
|
|
return
|
|
}
|
|
if h.cancel != nil {
|
|
h.cancel()
|
|
}
|
|
}
|
|
|
|
// getCurrentTimestamp returns current Unix timestamp in milliseconds
|
|
func getCurrentTimestamp() int64 {
|
|
return time.Now().UnixMilli()
|
|
}
|