ws/inbounds: realtime fixes + perf for 10k+ client inbounds

- hub: dedup, throttle, panic-restart, deadlock fix, race tests
- client: backoff cap + slow-retry instead of giving up
- broadcast: delta-only payload, count-based invalidate fallback
- filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound)
- perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint
- traffic: monotonic all_time + UI clamp + propagate in delta handler
- session: persist on update/logout (fixes logout-after-password-change)
- ui: protocol tags flex, traffic bar normalize
This commit is contained in:
lolka1333 2026-04-28 05:22:39 +02:00
parent 0b5c239f98
commit d14c54d21d
12 changed files with 1260 additions and 718 deletions

View file

@ -1,150 +1,208 @@
/** /**
* WebSocket client for real-time updates * WebSocket client for real-time panel updates.
*
* Public API (kept stable for index.html / inbounds.html / xray.html):
* - connect() open the connection (idempotent)
* - disconnect() close and stop reconnecting
* - on(event, callback) subscribe to event
* - off(event, callback) unsubscribe
* - send(data) send JSON to the server
* - isConnected boolean, current state
* - reconnectAttempts number, attempts since last success
* - maxReconnectAttempts number, give-up threshold
*
* Built-in events:
* 'connected', 'disconnected', 'error', 'message',
* plus any server-emitted message type (status, traffic, client_stats, ...).
*/ */
class WebSocketClient { class WebSocketClient {
static #MAX_PAYLOAD_BYTES = 10 * 1024 * 1024; // 10 MB, mirrors hub maxMessageSize.
static #BASE_RECONNECT_MS = 1000;
static #MAX_RECONNECT_MS = 30_000;
// After exhausting maxReconnectAttempts we switch to a polite slow-retry
// cadence rather than giving up forever — a panel that recovers an hour
// later should reconnect without a manual page reload.
static #SLOW_RETRY_MS = 60_000;
constructor(basePath = '') { constructor(basePath = '') {
this.basePath = basePath; this.basePath = basePath;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10; this.maxReconnectAttempts = 10;
this.reconnectDelay = 1000; this.reconnectAttempts = 0;
this.listeners = new Map();
this.isConnected = false; this.isConnected = false;
this.ws = null;
this.shouldReconnect = true; this.shouldReconnect = true;
this.reconnectTimer = null;
this.listeners = new Map(); // event → Set<callback>
} }
// Open the connection. Safe to call repeatedly — no-op if already
// open/connecting. Re-enables reconnects if previously disabled. Cancels
// any pending reconnect timer so an external connect() can't race a
// delayed retry into spawning a second socket.
connect() { connect() {
if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) { if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) {
return; return;
} }
this.shouldReconnect = true; this.shouldReconnect = true;
this.#cancelReconnect();
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; this.#openSocket();
// Ensure basePath ends with '/' for proper URL construction
let basePath = this.basePath || '';
if (basePath && !basePath.endsWith('/')) {
basePath += '/';
}
const wsUrl = `${protocol}//${window.location.host}${basePath}ws`;
console.log('WebSocket connecting to:', wsUrl, 'basePath:', this.basePath);
try {
this.ws = new WebSocket(wsUrl);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.isConnected = true;
this.reconnectAttempts = 0;
this.emit('connected');
};
this.ws.onmessage = (event) => {
try {
// Validate message size (prevent memory issues)
const maxMessageSize = 10 * 1024 * 1024; // 10MB
if (event.data && event.data.length > maxMessageSize) {
console.error('WebSocket message too large:', event.data.length, 'bytes');
this.ws.close();
return;
}
const message = JSON.parse(event.data);
if (!message || typeof message !== 'object') {
console.error('Invalid WebSocket message format');
return;
}
this.handleMessage(message);
} catch (e) {
console.error('Failed to parse WebSocket message:', e);
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
this.emit('error', error);
};
this.ws.onclose = () => {
console.log('WebSocket disconnected');
this.isConnected = false;
this.emit('disconnected');
if (this.shouldReconnect && this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
setTimeout(() => this.connect(), delay);
}
};
} catch (e) {
console.error('Failed to create WebSocket connection:', e);
this.emit('error', e);
}
}
handleMessage(message) {
const { type, payload, time } = message;
// Emit to specific type listeners
this.emit(type, payload, time);
// Emit to all listeners
this.emit('message', { type, payload, time });
}
on(event, callback) {
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
const callbacks = this.listeners.get(event);
if (!callbacks.includes(callback)) {
callbacks.push(callback);
}
}
off(event, callback) {
if (!this.listeners.has(event)) {
return;
}
const callbacks = this.listeners.get(event);
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
}
}
emit(event, ...args) {
if (this.listeners.has(event)) {
this.listeners.get(event).forEach(callback => {
try {
callback(...args);
} catch (e) {
console.error('Error in WebSocket event handler:', e);
}
});
}
} }
// Close the connection and stop any pending reconnect attempt. Resets the
// attempt counter so a future connect() starts fresh from the small backoff.
disconnect() { disconnect() {
this.shouldReconnect = false; this.shouldReconnect = false;
this.#cancelReconnect();
this.reconnectAttempts = 0;
if (this.ws) { if (this.ws) {
this.ws.close(); try { this.ws.close(1000, 'client disconnect'); } catch { /* ignore */ }
this.ws = null; this.ws = null;
} }
this.isConnected = false;
} }
// Subscribe to an event. Re-subscribing the same callback is a no-op.
on(event, callback) {
if (typeof callback !== 'function') return;
let set = this.listeners.get(event);
if (!set) {
set = new Set();
this.listeners.set(event, set);
}
set.add(callback);
}
// Unsubscribe from an event.
off(event, callback) {
const set = this.listeners.get(event);
if (!set) return;
set.delete(callback);
if (set.size === 0) this.listeners.delete(event);
}
// Send JSON to the server. Drops silently if not connected — callers
// should rely on connect()/server pushes rather than client-initiated sends.
send(data) { send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) { if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data)); this.ws.send(JSON.stringify(data));
}
}
// ───── internals ─────
#openSocket() {
const url = this.#buildUrl();
let socket;
try {
socket = new WebSocket(url);
} catch (err) {
console.error('WebSocket: failed to construct connection', err);
this.#emit('error', err);
this.#scheduleReconnect();
return;
}
this.ws = socket;
socket.addEventListener('open', () => {
this.isConnected = true;
this.reconnectAttempts = 0;
this.#emit('connected');
});
socket.addEventListener('message', (event) => this.#onMessage(event));
socket.addEventListener('error', (event) => {
// Browsers fire 'error' before 'close' on failure. We surface it for
// consumers (so polling fallbacks can engage) but don't log every blip
// — bad networks would flood the console otherwise.
this.#emit('error', event);
});
socket.addEventListener('close', () => {
this.isConnected = false;
this.ws = null;
this.#emit('disconnected');
if (this.shouldReconnect) this.#scheduleReconnect();
});
}
#buildUrl() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
let basePath = this.basePath || '';
if (basePath && !basePath.endsWith('/')) basePath += '/';
return `${protocol}//${window.location.host}${basePath}ws`;
}
#onMessage(event) {
const data = event.data;
// Reject oversized payloads up front. event.data is a string for
// text frames; .length is the character count which is always ≤ byte
// count, so checking it is a conservative gate.
if (typeof data === 'string' && data.length > WebSocketClient.#MAX_PAYLOAD_BYTES) {
console.error(`WebSocket: payload too large (${data.length} chars), closing`);
try { this.ws?.close(1009, 'message too big'); } catch { /* ignore */ }
return;
}
let message;
try {
message = JSON.parse(data);
} catch (err) {
console.error('WebSocket: invalid JSON message', err);
return;
}
if (!message || typeof message !== 'object' || typeof message.type !== 'string') {
console.error('WebSocket: malformed message envelope');
return;
}
this.#emit(message.type, message.payload, message.time);
this.#emit('message', message);
}
#emit(event, ...args) {
const set = this.listeners.get(event);
if (!set) return;
for (const callback of set) {
try {
callback(...args);
} catch (err) {
console.error(`WebSocket: handler for "${event}" threw`, err);
}
}
}
#scheduleReconnect() {
if (!this.shouldReconnect) return;
this.#cancelReconnect();
let base;
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts += 1;
// Exponential backoff inside the active window.
const exp = WebSocketClient.#BASE_RECONNECT_MS * 2 ** (this.reconnectAttempts - 1);
base = Math.min(WebSocketClient.#MAX_RECONNECT_MS, exp);
} else { } else {
console.warn('WebSocket is not connected'); // Active window exhausted — keep trying once a minute. The page-level
// polling fallback runs in parallel; this just brings WS back when the
// network recovers.
base = WebSocketClient.#SLOW_RETRY_MS;
}
// ±25% jitter so reloads after a panel restart don't reconnect in lockstep.
const delay = base * (0.75 + Math.random() * 0.5);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.#openSocket();
}, delay);
}
#cancelReconnect() {
if (this.reconnectTimer !== null) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
} }
} }
} }
// Create global WebSocket client instance // Global instance — basePath is set by page.html before this script loads.
// Safely get basePath from global scope (defined in page.html)
window.wsClient = new WebSocketClient(typeof basePath !== 'undefined' ? basePath : ''); window.wsClient = new WebSocketClient(typeof basePath !== 'undefined' ? basePath : '');

View file

@ -27,6 +27,34 @@ func NewInboundController(g *gin.RouterGroup) *InboundController {
return a return a
} }
// broadcastInboundsUpdateClientLimit is the threshold past which we skip the
// full-list push over WebSocket and signal the frontend to re-fetch via REST.
// Mirrors the same heuristic used by the periodic traffic job.
const broadcastInboundsUpdateClientLimit = 5000
// broadcastInboundsUpdate fetches and broadcasts the inbound list for userId.
// At scale (10k+ clients) the marshaled JSON exceeds the WS payload ceiling,
// so we send an invalidate signal instead — frontend re-fetches via REST.
// Skipped entirely when no WebSocket clients are connected.
func (a *InboundController) broadcastInboundsUpdate(userId int) {
if !websocket.HasClients() {
return
}
inbounds, err := a.inboundService.GetInbounds(userId)
if err != nil {
return
}
totalClients := 0
for _, ib := range inbounds {
totalClients += len(ib.ClientStats)
}
if totalClients > broadcastInboundsUpdateClientLimit {
websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
return
}
websocket.BroadcastInbounds(inbounds)
}
// initRouter initializes the routes for inbound-related operations. // initRouter initializes the routes for inbound-related operations.
func (a *InboundController) initRouter(g *gin.RouterGroup) { func (a *InboundController) initRouter(g *gin.RouterGroup) {
@ -38,6 +66,7 @@ func (a *InboundController) initRouter(g *gin.RouterGroup) {
g.POST("/add", a.addInbound) g.POST("/add", a.addInbound)
g.POST("/del/:id", a.delInbound) g.POST("/del/:id", a.delInbound)
g.POST("/update/:id", a.updateInbound) g.POST("/update/:id", a.updateInbound)
g.POST("/setEnable/:id", a.setInboundEnable)
g.POST("/clientIps/:email", a.getClientIps) g.POST("/clientIps/:email", a.getClientIps)
g.POST("/clearClientIps/:email", a.clearClientIps) g.POST("/clearClientIps/:email", a.clearClientIps)
g.POST("/addClient", a.addInboundClient) g.POST("/addClient", a.addInboundClient)
@ -134,9 +163,7 @@ func (a *InboundController) addInbound(c *gin.Context) {
if needRestart { if needRestart {
a.xrayService.SetToNeedRestart() a.xrayService.SetToNeedRestart()
} }
// Broadcast inbounds update via WebSocket a.broadcastInboundsUpdate(user.Id)
inbounds, _ := a.inboundService.GetInbounds(user.Id)
websocket.BroadcastInbounds(inbounds)
} }
// delInbound deletes an inbound configuration by its ID. // delInbound deletes an inbound configuration by its ID.
@ -155,10 +182,8 @@ func (a *InboundController) delInbound(c *gin.Context) {
if needRestart { if needRestart {
a.xrayService.SetToNeedRestart() a.xrayService.SetToNeedRestart()
} }
// Broadcast inbounds update via WebSocket
user := session.GetLoginUser(c) user := session.GetLoginUser(c)
inbounds, _ := a.inboundService.GetInbounds(user.Id) a.broadcastInboundsUpdate(user.Id)
websocket.BroadcastInbounds(inbounds)
} }
// updateInbound updates an existing inbound configuration. // updateInbound updates an existing inbound configuration.
@ -185,10 +210,43 @@ func (a *InboundController) updateInbound(c *gin.Context) {
if needRestart { if needRestart {
a.xrayService.SetToNeedRestart() a.xrayService.SetToNeedRestart()
} }
// Broadcast inbounds update via WebSocket
user := session.GetLoginUser(c) user := session.GetLoginUser(c)
inbounds, _ := a.inboundService.GetInbounds(user.Id) a.broadcastInboundsUpdate(user.Id)
websocket.BroadcastInbounds(inbounds) }
// setInboundEnable flips only the enable flag of an inbound. This is a
// dedicated endpoint because the regular update path serialises the entire
// settings JSON (every client) — far too heavy for an interactive switch
// on inbounds with thousands of clients. Frontend optimistically updates
// the UI; we just persist + sync xray + nudge other open admin sessions.
func (a *InboundController) setInboundEnable(c *gin.Context) {
id, err := strconv.Atoi(c.Param("id"))
if err != nil {
jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundUpdateSuccess"), err)
return
}
type form struct {
Enable bool `json:"enable" form:"enable"`
}
var f form
if err := c.ShouldBind(&f); err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
needRestart, err := a.inboundService.SetInboundEnable(id, f.Enable)
if err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundUpdateSuccess"), nil)
if needRestart {
a.xrayService.SetToNeedRestart()
}
// Cross-admin sync: lightweight invalidate signal (a few hundred bytes)
// instead of fetching + serialising the whole inbound list. Other open
// sessions re-fetch via REST. The toggling admin's own UI already
// updated optimistically.
websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
} }
// getClientIps retrieves the IP addresses associated with a client by email. // getClientIps retrieves the IP addresses associated with a client by email.

View file

@ -10,7 +10,6 @@ import (
"github.com/mhsanaei/3x-ui/v2/web/service" "github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/web/session" "github.com/mhsanaei/3x-ui/v2/web/session"
"github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
@ -95,9 +94,8 @@ func (a *IndexController) login(c *gin.Context) {
logger.Infof("%s logged in successfully, Ip Address: %s\n", safeUser, getRemoteIp(c)) logger.Infof("%s logged in successfully, Ip Address: %s\n", safeUser, getRemoteIp(c))
a.tgbot.UserLoginNotify(safeUser, ``, getRemoteIp(c), timeStr, 1) a.tgbot.UserLoginNotify(safeUser, ``, getRemoteIp(c), timeStr, 1)
session.SetLoginUser(c, user) if err := session.SetLoginUser(c, user); err != nil {
if err := sessions.Default(c).Save(); err != nil { logger.Warning("Unable to save session:", err)
logger.Warning("Unable to save session: ", err)
return return
} }
@ -111,9 +109,8 @@ func (a *IndexController) logout(c *gin.Context) {
if user != nil { if user != nil {
logger.Infof("%s logged out successfully", user.Username) logger.Infof("%s logged out successfully", user.Username)
} }
session.ClearSession(c) if err := session.ClearSession(c); err != nil {
if err := sessions.Default(c).Save(); err != nil { logger.Warning("Unable to clear session on logout:", err)
logger.Warning("Unable to save session after clearing:", err)
} }
c.Redirect(http.StatusTemporaryRedirect, c.GetString("base_path")) c.Redirect(http.StatusTemporaryRedirect, c.GetString("base_path"))
} }

View file

@ -99,7 +99,9 @@ func (a *SettingController) updateUser(c *gin.Context) {
if err == nil { if err == nil {
user.Username = form.NewUsername user.Username = form.NewUsername
user.Password, _ = crypto.HashPasswordAsBcrypt(form.NewPassword) user.Password, _ = crypto.HashPasswordAsBcrypt(form.NewPassword)
session.SetLoginUser(c, user) if saveErr := session.SetLoginUser(c, user); saveErr != nil {
err = saveErr
}
} }
jsonMsg(c, I18nWeb(c, "pages.settings.toasts.modifyUser"), err) jsonMsg(c, I18nWeb(c, "pages.settings.toasts.modifyUser"), err)
} }

View file

@ -1,7 +1,9 @@
package controller package controller
import ( import (
"net"
"net/http" "net/http"
"net/url"
"strings" "strings"
"time" "time"
@ -16,105 +18,74 @@ import (
) )
const ( const (
// Time allowed to write a message to the peer writeWait = 10 * time.Second
writeWait = 10 * time.Second pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
// Time allowed to read the next pong message from the peer clientReadLimit = 512
pongWait = 60 * time.Second
// Send pings to peer with this period (must be less than pongWait)
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer
maxMessageSize = 512
) )
var upgrader = ws.Upgrader{ var upgrader = ws.Upgrader{
ReadBufferSize: 32768, ReadBufferSize: 32768,
WriteBufferSize: 32768, WriteBufferSize: 32768,
EnableCompression: true, // Negotiate permessage-deflate compression if the client supports it EnableCompression: true,
CheckOrigin: checkSameOrigin,
CheckOrigin: func(r *http.Request) bool {
// Check origin for security
origin := r.Header.Get("Origin")
if origin == "" {
// Allow connections without Origin header (same-origin requests)
return true
}
// Get the host from the request
host := r.Host
// Extract scheme and host from origin
originURL := origin
// Simple check: origin should match the request host
// This prevents cross-origin WebSocket hijacking
if strings.HasPrefix(originURL, "http://") || strings.HasPrefix(originURL, "https://") {
// Extract host from origin
originHost := strings.TrimPrefix(strings.TrimPrefix(originURL, "http://"), "https://")
if idx := strings.Index(originHost, "/"); idx != -1 {
originHost = originHost[:idx]
}
if idx := strings.Index(originHost, ":"); idx != -1 {
originHost = originHost[:idx]
}
// Compare hosts (without port)
requestHost := host
if idx := strings.Index(requestHost, ":"); idx != -1 {
requestHost = requestHost[:idx]
}
return originHost == requestHost || originHost == "" || requestHost == ""
}
return false
},
} }
// WebSocketController handles WebSocket connections for real-time updates // checkSameOrigin allows requests with no Origin header (same-origin or non-browser
// clients) and otherwise requires the Origin hostname to match the request hostname.
// Comparison is case-insensitive (RFC 7230 §2.7.3) and ignores port differences
// (the panel often sits behind a reverse proxy on a different port).
func checkSameOrigin(r *http.Request) bool {
origin := r.Header.Get("Origin")
if origin == "" {
return true
}
u, err := url.Parse(origin)
if err != nil || u.Hostname() == "" {
return false
}
host, _, err := net.SplitHostPort(r.Host)
if err != nil {
host = r.Host
}
return strings.EqualFold(u.Hostname(), host)
}
// WebSocketController handles WebSocket connections for real-time updates.
type WebSocketController struct { type WebSocketController struct {
BaseController BaseController
hub *websocket.Hub hub *websocket.Hub
} }
// NewWebSocketController creates a new WebSocket controller // NewWebSocketController creates a new WebSocket controller.
func NewWebSocketController(hub *websocket.Hub) *WebSocketController { func NewWebSocketController(hub *websocket.Hub) *WebSocketController {
return &WebSocketController{ return &WebSocketController{hub: hub}
hub: hub,
}
} }
// HandleWebSocket handles WebSocket connections // HandleWebSocket upgrades the HTTP connection and starts the read/write pumps.
func (w *WebSocketController) HandleWebSocket(c *gin.Context) { func (w *WebSocketController) HandleWebSocket(c *gin.Context) {
// Check authentication
if !session.IsLogin(c) { if !session.IsLogin(c) {
logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c)) logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c))
c.AbortWithStatus(http.StatusUnauthorized) c.AbortWithStatus(http.StatusUnauthorized)
return return
} }
// Upgrade connection to WebSocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil { if err != nil {
logger.Error("Failed to upgrade WebSocket connection:", err) logger.Error("Failed to upgrade WebSocket connection:", err)
return return
} }
// Create client client := websocket.NewClient(uuid.New().String())
clientID := uuid.New().String()
client := &websocket.Client{
ID: clientID,
Hub: w.hub,
Send: make(chan []byte, 512), // Increased from 256 to 512 to prevent overflow
Topics: make(map[websocket.MessageType]bool),
}
// Register client
w.hub.Register(client) w.hub.Register(client)
logger.Debugf("WebSocket client %s registered from %s", clientID, getRemoteIp(c)) logger.Debugf("WebSocket client %s registered from %s", client.ID, getRemoteIp(c))
// Start goroutines for reading and writing
go w.writePump(client, conn) go w.writePump(client, conn)
go w.readPump(client, conn) go w.readPump(client, conn)
} }
// readPump pumps messages from the WebSocket connection to the hub // readPump consumes inbound frames so the gorilla deadline/pong machinery keeps
// running. Clients send no commands today; frames are discarded.
func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) { func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) {
defer func() { defer func() {
if r := common.Recover("WebSocket readPump panic"); r != nil { if r := common.Recover("WebSocket readPump panic"); r != nil {
@ -124,35 +95,23 @@ func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn)
conn.Close() conn.Close()
}() }()
conn.SetReadLimit(clientReadLimit)
conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error { conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait)) return conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
}) })
conn.SetReadLimit(maxMessageSize)
for { for {
_, message, err := conn.ReadMessage() if _, _, err := conn.ReadMessage(); err != nil {
if err != nil {
if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) { if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
logger.Debugf("WebSocket read error for client %s: %v", client.ID, err) logger.Debugf("WebSocket read error for client %s: %v", client.ID, err)
} }
break return
} }
// Validate message size
if len(message) > maxMessageSize {
logger.Warningf("WebSocket message from client %s exceeds max size: %d bytes", client.ID, len(message))
continue
}
// Handle incoming messages (e.g., subscription requests)
// For now, we'll just log them
logger.Debugf("Received WebSocket message from client %s: %s", client.ID, string(message))
} }
} }
// writePump pumps messages from the hub to the WebSocket connection // writePump pushes hub messages to the connection and emits keepalive pings.
func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) { func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) {
ticker := time.NewTicker(pingPeriod) ticker := time.NewTicker(pingPeriod)
defer func() { defer func() {
@ -165,17 +124,13 @@ func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn)
for { for {
select { select {
case message, ok := <-client.Send: case msg, ok := <-client.Send:
conn.SetWriteDeadline(time.Now().Add(writeWait)) conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok { if !ok {
// Hub closed the channel
conn.WriteMessage(ws.CloseMessage, []byte{}) conn.WriteMessage(ws.CloseMessage, []byte{})
return return
} }
if err := conn.WriteMessage(ws.TextMessage, msg); err != nil {
// Send each message individually (no batching)
// This ensures each JSON message is sent separately and can be parsed correctly
if err := conn.WriteMessage(ws.TextMessage, message); err != nil {
logger.Debugf("WebSocket write error for client %s: %v", client.ID, err) logger.Debugf("WebSocket write error for client %s: %v", client.ID, err)
return return
} }

View file

@ -318,19 +318,15 @@
</a-dropdown> </a-dropdown>
</template> </template>
<template slot="protocol" slot-scope="text, dbInbound"> <template slot="protocol" slot-scope="text, dbInbound">
<a-tag :style="{ margin: '0' }" color="purple">[[ <div class="protocol-tags">
dbInbound.protocol ]]</a-tag> <a-tag color="purple">[[ dbInbound.protocol ]]</a-tag>
<template <template
v-if="dbInbound.isVMess || dbInbound.isVLess || dbInbound.isTrojan || dbInbound.isSS"> v-if="dbInbound.isVMess || dbInbound.isVLess || dbInbound.isTrojan || dbInbound.isSS">
<a-tag :style="{ margin: '0' }" color="green">[[ <a-tag color="green">[[ dbInbound.toInbound().stream.network ]]</a-tag>
dbInbound.toInbound().stream.network ]]</a-tag> <a-tag v-if="dbInbound.toInbound().stream.isTls" color="blue">TLS</a-tag>
<a-tag :style="{ margin: '0' }" <a-tag v-if="dbInbound.toInbound().stream.isReality" color="blue">Reality</a-tag>
v-if="dbInbound.toInbound().stream.isTls" </template>
color="blue">TLS</a-tag> </div>
<a-tag :style="{ margin: '0' }"
v-if="dbInbound.toInbound().stream.isReality"
color="blue">Reality</a-tag>
</template>
</template> </template>
<template slot="clients" slot-scope="text, dbInbound"> <template slot="clients" slot-scope="text, dbInbound">
<template v-if="clientCount[dbInbound.id]"> <template v-if="clientCount[dbInbound.id]">
@ -1107,7 +1103,7 @@
trafficDiff: 0, trafficDiff: 0,
defaultCert: '', defaultCert: '',
defaultKey: '', defaultKey: '',
clientCount: [], clientCount: {},
onlineClients: [], onlineClients: [],
lastOnlineMap: {}, lastOnlineMap: {},
isRefreshEnabled: localStorage.getItem("isRefreshEnabled") === "true" ? true : false, isRefreshEnabled: localStorage.getItem("isRefreshEnabled") === "true" ? true : false,
@ -1131,6 +1127,71 @@
loading(spinning = true) { loading(spinning = true) {
this.loadingStates.spinning = spinning; this.loadingStates.spinning = spinning;
}, },
// applyClientStatsDelta updates client traffic counters and inbound totals
// in-place from a WebSocket delta payload. Avoids full-list re-fetch and
// re-render — critical at 10k+ client scale.
applyClientStatsDelta(payload) {
if (!payload || typeof payload !== 'object') return;
const inboundsById = new Map();
this.dbInbounds.forEach(ib => inboundsById.set(ib.id, ib));
const touched = new Set();
if (Array.isArray(payload.clients) && payload.clients.length > 0) {
for (const stat of payload.clients) {
const dbInbound = inboundsById.get(stat.inboundId);
if (!dbInbound || !Array.isArray(dbInbound.clientStats)) continue;
const cs = dbInbound.clientStats.find(c => c.email === stat.email);
if (!cs) continue;
cs.up = stat.up;
cs.down = stat.down;
// allTime is the cumulative-historical counter shown in the
// "Общий трафик" column. The previous handler updated up/down/
// total but skipped allTime, so that column stayed frozen at
// its initial-page-load value until a manual refresh.
if (stat.allTime !== undefined) cs.allTime = stat.allTime;
if (stat.total !== undefined) cs.total = stat.total;
if (stat.expiryTime !== undefined) cs.expiryTime = stat.expiryTime;
if (stat.lastOnline !== undefined) cs.lastOnline = stat.lastOnline;
if (stat.enable !== undefined) cs.enable = stat.enable;
touched.add(stat.inboundId);
}
}
if (Array.isArray(payload.inbounds) && payload.inbounds.length > 0) {
for (const summary of payload.inbounds) {
const dbInbound = inboundsById.get(summary.id);
if (!dbInbound) continue;
dbInbound.up = summary.up;
dbInbound.down = summary.down;
if (summary.total !== undefined) dbInbound.total = summary.total;
if (summary.allTime !== undefined) dbInbound.allTime = summary.allTime;
if (summary.enable !== undefined) dbInbound.enable = summary.enable;
}
}
// Recompute clientCount for inbounds whose stats changed. The cached
// parsed Inbound is fetched via dbInbound.toInbound() — earlier
// versions used `this.inbounds.find(ib => ib.id === id)` which
// ALWAYS returned undefined (the Inbound class has no id field), so
// this branch silently never ran and depleted/expiring/online filters
// never refreshed from delta updates.
if (touched.size > 0) {
for (const id of touched) {
const dbInbound = inboundsById.get(id);
if (dbInbound) {
this.$set(this.clientCount, id, this.getClientCounts(dbInbound, dbInbound.toInbound()));
}
}
}
// Re-run filter/search so the displayed slice picks up updated values.
if (this.enableFilter) {
this.filterInbounds();
} else {
this.searchInbounds(this.searchKey);
}
},
async getDBInbounds() { async getDBInbounds() {
this.refreshing = true; this.refreshing = true;
const msg = await HttpUtil.get('/panel/api/inbounds/list'); const msg = await HttpUtil.get('/panel/api/inbounds/list');
@ -1185,7 +1246,11 @@
setInbounds(dbInbounds) { setInbounds(dbInbounds) {
this.inbounds.splice(0); this.inbounds.splice(0);
this.dbInbounds.splice(0); this.dbInbounds.splice(0);
this.clientCount.splice(0); // Drop every existing key — Vue.delete keeps it reactive so any
// template expression watching clientCount[id] re-renders cleanly.
for (const key of Object.keys(this.clientCount)) {
this.$delete(this.clientCount, key);
}
for (const inbound of dbInbounds) { for (const inbound of dbInbounds) {
const dbInbound = new DBInbound(inbound); const dbInbound = new DBInbound(inbound);
to_inbound = dbInbound.toInbound() to_inbound = dbInbound.toInbound()
@ -1195,7 +1260,9 @@
if (dbInbound.isSS && (!to_inbound.isSSMultiUser)) { if (dbInbound.isSS && (!to_inbound.isSSMultiUser)) {
continue; continue;
} }
this.clientCount[inbound.id] = this.getClientCounts(inbound, to_inbound); // Reactive add — direct assignment on the map would not trigger
// template updates in Vue 2.
this.$set(this.clientCount, inbound.id, this.getClientCounts(inbound, to_inbound));
} }
} }
if (!this.loadingStates.fetched) { if (!this.loadingStates.fetched) {
@ -1681,37 +1748,29 @@
newDbInbound = this.checkFallback(dbInbound); newDbInbound = this.checkFallback(dbInbound);
infoModal.show(newDbInbound, index); infoModal.show(newDbInbound, index);
}, },
switchEnable(dbInboundId, state) { // switchEnable toggles inbound.enable through a dedicated lightweight
let dbInbound = this.dbInbounds.find(row => row.id === dbInboundId); // endpoint. The previous implementation re-submitted the entire
// inbound settings JSON (every client) just to flip a boolean — on a
// 7000+ client inbound that meant a multi-MB request, an O(N) traffic
// diff and a full xray-config rebuild for every click of the switch.
async switchEnable(dbInboundId, state) {
const dbInbound = this.dbInbounds.find(row => row.id === dbInboundId);
if (!dbInbound) return; if (!dbInbound) return;
dbInbound.enable = state;
let inbound = dbInbound.toInbound();
const data = {
up: dbInbound.up,
down: dbInbound.down,
total: dbInbound.total,
remark: dbInbound.remark,
enable: dbInbound.enable,
expiryTime: dbInbound.expiryTime,
trafficReset: dbInbound.trafficReset,
lastTrafficResetTime: dbInbound.lastTrafficResetTime,
listen: inbound.listen, const previous = dbInbound.enable;
port: inbound.port, dbInbound.enable = state; // optimistic: UI reflects the click immediately
protocol: inbound.protocol,
settings: inbound.settings.toString(),
};
if (inbound.canEnableStream()) {
data.streamSettings = inbound.stream.toString();
} else if (inbound.stream?.sockopt) {
data.streamSettings = JSON.stringify({ sockopt: inbound.stream.sockopt.toJson() }, null, 2);
}
data.sniffing = inbound.sniffing.toString();
const formData = new FormData(); const formData = new FormData();
Object.keys(data).forEach(key => formData.append(key, data[key])); formData.append('enable', String(state));
this.submit(`/panel/api/inbounds/update/${dbInboundId}`, formData); try {
const msg = await HttpUtil.post(`/panel/api/inbounds/setEnable/${dbInboundId}`, formData);
if (!msg || !msg.success) {
dbInbound.enable = previous;
}
} catch (e) {
dbInbound.enable = previous;
}
}, },
async switchEnableClient(dbInboundId, client, state) { async switchEnableClient(dbInboundId, client, state) {
this.loading(); this.loading();
@ -1820,9 +1879,15 @@
}, },
getAllTimeClient(dbInbound, email) { getAllTimeClient(dbInbound, email) {
if (!email || email.length == 0) return 0; if (!email || email.length == 0) return 0;
let clientStats = this.getClientStats(dbInbound, email); const clientStats = this.getClientStats(dbInbound, email);
if (!clientStats) return 0; if (!clientStats) return 0;
return clientStats.allTime || (clientStats.up + clientStats.down); // allTime represents cumulative historical usage and must never
// appear smaller than the currently-tracked counters. If a stale
// row drifts below up+down (manual edits, partial migrations) we
// surface the live total instead of the misleading historical one.
const current = (clientStats.up || 0) + (clientStats.down || 0);
const allTime = clientStats.allTime || 0;
return allTime > current ? allTime : current;
}, },
getRemStats(dbInbound, email) { getRemStats(dbInbound, email) {
if (!email || email.length == 0) return 0; if (!email || email.length == 0) return 0;
@ -2029,16 +2094,21 @@
} }
this.loading(); this.loading();
this.getDefaultSettings(); this.getDefaultSettings();
// Initial data fetch // Bootstrap from REST first, then attach WebSocket subscriptions.
// Doing this in order eliminates a race where an early `inbounds` push
// fires getClientCounts() before this.onlineClients is populated,
// leaving online[] empty for every inbound and breaking the filter.
this.getDBInbounds().then(() => { this.getDBInbounds().then(() => {
this.loading(false); this.loading(false);
});
// Setup WebSocket for real-time updates if (!window.wsClient) {
if (window.wsClient) { // Fallback to polling if WebSocket is not available
if (this.isRefreshEnabled) this.startDataRefreshLoop();
return;
}
window.wsClient.connect(); window.wsClient.connect();
// Listen for inbounds updates // Listen for inbounds updates
window.wsClient.on('inbounds', (payload) => { window.wsClient.on('inbounds', (payload) => {
if (payload && Array.isArray(payload)) { if (payload && Array.isArray(payload)) {
@ -2047,12 +2117,13 @@
} }
}); });
// Listen for invalidate signals (sent when payload is too large for WebSocket) // Listen for invalidate signals — last-resort safety only.
// The server sends a lightweight notification and we re-fetch via REST API // Under normal operation the server pushes 'client_stats' deltas
// instead, so this fires only when an admin mutation produces an
// oversized full-list payload.
let invalidateTimer = null; let invalidateTimer = null;
window.wsClient.on('invalidate', (payload) => { window.wsClient.on('invalidate', (payload) => {
if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) { if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) {
// Debounce to avoid flooding the REST API with multiple invalidate signals
if (invalidateTimer) clearTimeout(invalidateTimer); if (invalidateTimer) clearTimeout(invalidateTimer);
invalidateTimer = setTimeout(() => { invalidateTimer = setTimeout(() => {
invalidateTimer = null; invalidateTimer = null;
@ -2061,15 +2132,36 @@
} }
}); });
// Listen for traffic updates // Real-time delta updates: per-client absolute counters + inbound
// totals applied in-place. Replaces the periodic full-list refresh
// and scales to 10k+ clients without REST fallback.
window.wsClient.on('client_stats', (payload) => {
if (!payload) return;
this.applyClientStatsDelta(payload);
});
// Listen for traffic updates.
// Note: clientTraffics contains DELTA values (incremental since last
// tick), not absolute totals. Absolute counters are updated through
// the 'client_stats' event in applyClientStatsDelta.
window.wsClient.on('traffic', (payload) => { window.wsClient.on('traffic', (payload) => {
// Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event if (!payload || typeof payload !== 'object') return;
// because clientTraffics contains delta/incremental values, not total accumulated values.
// Total traffic is updated via the 'inbounds' WebSocket event (or 'invalidate' fallback for large panels). // Normalize onlineClients: server marshals a nil []string slice as
// JSON null when nobody is online. Treat null/undefined/missing as
// Update online clients list in real-time // an empty array so the "everyone went offline" transition still
if (payload && Array.isArray(payload.onlineClients)) { // updates the UI — without this fix, the last set of online users
const nextOnlineClients = payload.onlineClients; // stayed visible (and the online filter kept showing them) until
// someone came back online.
const hasOnlinePayload =
'onlineClients' in payload &&
(Array.isArray(payload.onlineClients) || payload.onlineClients == null);
if (hasOnlinePayload) {
const nextOnlineClients = Array.isArray(payload.onlineClients)
? payload.onlineClients
: [];
// Detect change in either direction: length differs OR sets differ.
let onlineChanged = this.onlineClients.length !== nextOnlineClients.length; let onlineChanged = this.onlineClients.length !== nextOnlineClients.length;
if (!onlineChanged) { if (!onlineChanged) {
const prevSet = new Set(this.onlineClients); const prevSet = new Set(this.onlineClients);
@ -2080,18 +2172,24 @@
} }
} }
} }
this.onlineClients = nextOnlineClients; this.onlineClients = nextOnlineClients;
if (onlineChanged) { if (onlineChanged) {
// Recalculate client counts to update online status // Recompute clientCount for every inbound whose stats can host
// Use $set for Vue 2 reactivity — direct array index assignment is not reactive // online clients. `dbInbound.toInbound()` returns the cached
// parsed Inbound (with the .clients array) — using it directly
// avoids a brittle `this.inbounds.find(ib => ib.id === ...)`
// lookup that ALWAYS failed because the Inbound class has no
// `id` field. That silent failure was the real cause of the
// online filter showing an empty list while a client was
// clearly online elsewhere on the page.
this.dbInbounds.forEach(dbInbound => { this.dbInbounds.forEach(dbInbound => {
const inbound = this.inbounds.find(ib => ib.id === dbInbound.id); const inbound = dbInbound.toInbound();
if (inbound && this.clientCount[dbInbound.id]) { this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound));
this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound));
}
}); });
// Always trigger UI refresh — not just when filter is enabled // Re-run filter/search so the UI reflects the new state — both
// when clients come online and when they go offline.
if (this.enableFilter) { if (this.enableFilter) {
this.filterInbounds(); this.filterInbounds();
} else { } else {
@ -2099,10 +2197,10 @@
} }
} }
} }
// Update last online map in real-time // Update last-online map. Server sends the full map (not delta) so
// Replace entirely (server sends the full map) to avoid unbounded growth from deleted clients // we can replace entirely without growing unbounded from deleted clients.
if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') { if (payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') {
this.lastOnlineMap = payload.lastOnlineMap; this.lastOnlineMap = payload.lastOnlineMap;
} }
}); });
@ -2123,12 +2221,7 @@
} }
} }
}); });
} else { });
// Fallback to polling if WebSocket is not available
if (this.isRefreshEnabled) {
this.startDataRefreshLoop();
}
}
}, },
computed: { computed: {
total() { total() {
@ -2171,5 +2264,65 @@
left: 50vw !important; left: 50vw !important;
} }
} }
/* Protocol cell — wrap tags into a flex grid with consistent gap so
vless/xhttp/Reality line up cleanly instead of stacking awkwardly. */
.inbounds-page .protocol-tags {
display: inline-flex;
flex-wrap: wrap;
gap: 4px;
align-items: center;
max-width: 100%;
}
.inbounds-page .protocol-tags .ant-tag {
margin: 0;
line-height: 20px;
}
/* Traffic cell — text on the sides sizes to its content, the progress bar
takes whatever's left. Without this, fixed-width text cells leave gaps
around short values like "∞" and clip long ones like "999.99 GB". */
.inbounds-page .tr-table-box {
display: inline-flex;
gap: 6px;
align-items: center;
width: 100%;
min-width: 0;
}
.inbounds-page .tr-table-rt,
.inbounds-page .tr-table-lt {
flex: 0 0 auto;
white-space: nowrap;
font-variant-numeric: tabular-nums;
}
.inbounds-page .tr-table-rt { text-align: end; }
.inbounds-page .tr-table-lt { text-align: start; }
.inbounds-page .tr-table-bar {
flex: 1 1 auto;
min-width: 60px;
}
/* Make the progress widget fill its flex cell, and align the inner fill
pill with the outer track pill (the "two pills" drift was caused by
box-sizing: content-box plus a 1px border on .ant-progress-bg). */
.inbounds-page .tr-table-bar .ant-progress,
.inbounds-page .tr-table-bar .ant-progress-outer,
.inbounds-page .tr-table-bar .ant-progress-inner {
display: block;
width: 100%;
margin: 0;
padding: 0;
}
.inbounds-page .infinite-bar .ant-progress-inner,
.inbounds-page .tr-table-bar .ant-progress-inner {
box-sizing: border-box;
border-radius: 100px;
overflow: hidden;
}
.inbounds-page .infinite-bar .ant-progress-inner .ant-progress-bg,
.inbounds-page .tr-table-bar .ant-progress-inner .ant-progress-bg {
box-sizing: border-box;
border: 0 !important;
}
</style> </style>
{{ template "page/body_end" .}} {{ template "page/body_end" .}}

View file

@ -24,7 +24,9 @@ func NewXrayTrafficJob() *XrayTrafficJob {
return new(XrayTrafficJob) return new(XrayTrafficJob)
} }
// Run collects traffic statistics from Xray and updates the database, triggering restart if needed. // Run collects traffic statistics from Xray, updates the database, and pushes
// real-time updates over WebSocket using compact delta payloads — no REST
// fallback, scales to 10k20k+ clients per inbound.
func (j *XrayTrafficJob) Run() { func (j *XrayTrafficJob) Run() {
if !j.xrayService.IsXrayRunning() { if !j.xrayService.IsXrayRunning() {
return return
@ -50,50 +52,85 @@ func (j *XrayTrafficJob) Run() {
j.xrayService.SetToNeedRestart() j.xrayService.SetToNeedRestart()
} }
// If no frontend client is connected, skip all WebSocket broadcasting routines, // If no frontend client is connected, skip all WebSocket broadcasting
// including expensive DB queries for online clients and JSON marshaling. // routines — including the active-client DB query and JSON marshaling.
if !websocket.HasClients() { if !websocket.HasClients() {
return return
} }
// Update online clients list and map // Online presence + traffic deltas — small payload, always fits in WS.
// Force non-nil slice/map so JSON marshalling produces [] / {} instead of
// `null` when everyone is offline. The frontend's traffic handler treats
// a missing/null onlineClients field as "no update", so without this the
// "everyone went offline" transition was silently dropped — stale online
// users lingered in the list and the online filter kept showing them.
onlineClients := j.inboundService.GetOnlineClients() onlineClients := j.inboundService.GetOnlineClients()
if onlineClients == nil {
onlineClients = []string{}
}
lastOnlineMap, err := j.inboundService.GetClientsLastOnline() lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
if err != nil { if err != nil {
logger.Warning("get clients last online failed:", err) logger.Warning("get clients last online failed:", err)
}
if lastOnlineMap == nil {
lastOnlineMap = make(map[string]int64) lastOnlineMap = make(map[string]int64)
} }
websocket.BroadcastTraffic(map[string]any{
// Broadcast traffic update (deltas and online stats) via WebSocket
trafficUpdate := map[string]any{
"traffics": traffics, "traffics": traffics,
"clientTraffics": clientTraffics, "clientTraffics": clientTraffics,
"onlineClients": onlineClients, "onlineClients": onlineClients,
"lastOnlineMap": lastOnlineMap, "lastOnlineMap": lastOnlineMap,
} })
websocket.BroadcastTraffic(trafficUpdate)
// Fetch updated inbounds from database with accumulated traffic values // Compact delta payload: per-client absolute counters for clients active
// This ensures frontend receives the actual total traffic for real-time UI refresh. // this cycle, plus inbound-level absolute totals. Frontend applies both
updatedInbounds, err := j.inboundService.GetAllInbounds() // in-place — typical payload ~1050KB even for 10k+ client deployments.
if err != nil { // Replaces the old full-inbound-list broadcast that hit WS size limits
logger.Warning("get all inbounds for websocket failed:", err) // (510MB) and forced the frontend into a REST refetch.
clientStatsPayload := map[string]any{}
if activeEmails := activeEmails(clientTraffics); len(activeEmails) > 0 {
if stats, err := j.inboundService.GetActiveClientTraffics(activeEmails); err != nil {
logger.Warning("get active client traffics for websocket failed:", err)
} else if len(stats) > 0 {
clientStatsPayload["clients"] = stats
}
}
if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
logger.Warning("get inbounds traffic summary for websocket failed:", err)
} else if len(inboundSummary) > 0 {
clientStatsPayload["inbounds"] = inboundSummary
}
if len(clientStatsPayload) > 0 {
websocket.BroadcastClientStats(clientStatsPayload)
} }
updatedOutbounds, err := j.outboundService.GetOutboundsTraffic() // Outbounds list is small (one row per outbound, no per-client expansion)
if err != nil { // so the full snapshot still fits comfortably in WS.
if updatedOutbounds, err := j.outboundService.GetOutboundsTraffic(); err == nil && updatedOutbounds != nil {
websocket.BroadcastOutbounds(updatedOutbounds)
} else if err != nil {
logger.Warning("get all outbounds for websocket failed:", err) logger.Warning("get all outbounds for websocket failed:", err)
} }
}
// The WebSocket hub will automatically check the payload size. // activeEmails returns the set of client emails that had non-zero traffic in
// If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead. // the current collection window. Idle clients are skipped — no need to push
if updatedInbounds != nil { // their (unchanged) counters to the frontend.
websocket.BroadcastInbounds(updatedInbounds) func activeEmails(clientTraffics []*xray.ClientTraffic) []string {
if len(clientTraffics) == 0 {
return nil
} }
emails := make([]string, 0, len(clientTraffics))
if updatedOutbounds != nil { for _, ct := range clientTraffics {
websocket.BroadcastOutbounds(updatedOutbounds) if ct == nil || ct.Email == "" {
continue
}
if ct.Up == 0 && ct.Down == 0 {
continue
}
emails = append(emails, ct.Email)
} }
return emails
} }
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) { func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {

View file

@ -366,10 +366,21 @@ func (s *InboundService) DelInbound(id int) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
for _, client := range clients { // Bulk-delete client IPs for every email in this inbound. The previous
err := s.DelClientIPs(db, client.Email) // per-client loop fired one DELETE per row — at 7k+ clients that meant
if err != nil { // thousands of synchronous SQL roundtrips and a multi-second freeze.
return false, err if len(clients) > 0 {
emails := make([]string, 0, len(clients))
for i := range clients {
if clients[i].Email != "" {
emails = append(emails, clients[i].Email)
}
}
if len(emails) > 0 {
if err := db.Where("client_email IN ?", emails).
Delete(model.InboundClientIps{}).Error; err != nil {
return false, err
}
} }
} }
@ -386,6 +397,58 @@ func (s *InboundService) GetInbound(id int) (*model.Inbound, error) {
return inbound, nil return inbound, nil
} }
// SetInboundEnable toggles only the enable flag of an inbound, without
// rewriting the (potentially multi-MB) settings JSON. Used by the UI's
// per-row enable switch — for inbounds with thousands of clients the full
// UpdateInbound path is an order of magnitude too slow for an interactive
// toggle (parses + reserialises every client, runs O(N) traffic diff).
//
// Returns (needRestart, error). needRestart is true when the xray runtime
// could not be re-synced from the cached config and a full restart is
// required to pick up the change.
func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
inbound, err := s.GetInbound(id)
if err != nil {
return false, err
}
if inbound.Enable == enable {
return false, nil
}
db := database.GetDB()
if err := db.Model(model.Inbound{}).Where("id = ?", id).
Update("enable", enable).Error; err != nil {
return false, err
}
inbound.Enable = enable
// Sync xray runtime: drop the live inbound, add it back if we're enabling.
needRestart := false
s.xrayApi.Init(p.GetAPIPort())
defer s.xrayApi.Close()
_ = s.xrayApi.DelInbound(inbound.Tag)
if !enable {
return false, nil
}
runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound)
if err != nil {
logger.Debug("SetInboundEnable: build runtime config failed:", err)
return true, nil
}
inboundJson, err := json.MarshalIndent(runtimeInbound.GenXrayInboundConfig(), "", " ")
if err != nil {
logger.Debug("SetInboundEnable: marshal runtime config failed:", err)
return true, nil
}
if err := s.xrayApi.AddInbound(inboundJson); err != nil {
logger.Debug("SetInboundEnable: AddInbound via api failed:", err)
needRestart = true
}
return needRestart, nil
}
// UpdateInbound modifies an existing inbound configuration. // UpdateInbound modifies an existing inbound configuration.
// It validates changes, updates the database, and syncs with the running Xray instance. // It validates changes, updates the database, and syncs with the running Xray instance.
// Returns the updated inbound, whether Xray needs restart, and any error. // Returns the updated inbound, whether Xray needs restart, and any error.
@ -589,6 +652,11 @@ func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.I
return &runtimeInbound, nil return &runtimeInbound, nil
} }
// updateClientTraffics syncs the ClientTraffic rows with the inbound's clients
// list: removes rows for emails that disappeared, inserts rows for newly-added
// emails. Uses sets for O(N) lookup — the previous nested-loop implementation
// was O(N²) and degraded into multi-second pauses on inbounds with thousands
// of clients (toggling, saving, or deleting any such inbound felt frozen).
func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inbound, newInbound *model.Inbound) error { func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inbound, newInbound *model.Inbound) error {
oldClients, err := s.GetClients(oldInbound) oldClients, err := s.GetClients(oldInbound)
if err != nil { if err != nil {
@ -599,36 +667,31 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb
return err return err
} }
var emailExists bool oldEmails := make(map[string]struct{}, len(oldClients))
for i := range oldClients {
oldEmails[oldClients[i].Email] = struct{}{}
}
newEmails := make(map[string]struct{}, len(newClients))
for i := range newClients {
newEmails[newClients[i].Email] = struct{}{}
}
for _, oldClient := range oldClients { // Removed clients — drop their stats rows.
emailExists = false for i := range oldClients {
for _, newClient := range newClients { if _, kept := newEmails[oldClients[i].Email]; kept {
if oldClient.Email == newClient.Email { continue
emailExists = true
break
}
} }
if !emailExists { if err := s.DelClientStat(tx, oldClients[i].Email); err != nil {
err = s.DelClientStat(tx, oldClient.Email) return err
if err != nil {
return err
}
} }
} }
for _, newClient := range newClients { // Added clients — create their stats rows.
emailExists = false for i := range newClients {
for _, oldClient := range oldClients { if _, existed := oldEmails[newClients[i].Email]; existed {
if newClient.Email == oldClient.Email { continue
emailExists = true
break
}
} }
if !emailExists { if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
err = s.AddClientStat(tx, oldInbound.Id, &newClient) return err
if err != nil {
return err
}
} }
} }
return nil return nil
@ -1320,20 +1383,27 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
return err return err
} }
// Index by email for O(N) merge — the previous nested loop was O(N²)
// and dominated each cron tick on inbounds with thousands of active
// clients (7500 × 7500 = 56M string comparisons every 10 seconds).
trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
for i := range traffics {
if traffics[i] != nil {
trafficByEmail[traffics[i].Email] = traffics[i]
}
}
now := time.Now().UnixMilli()
for dbTraffic_index := range dbClientTraffics { for dbTraffic_index := range dbClientTraffics {
for traffic_index := range traffics { t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email]
if dbClientTraffics[dbTraffic_index].Email == traffics[traffic_index].Email { if !ok {
dbClientTraffics[dbTraffic_index].Up += traffics[traffic_index].Up continue
dbClientTraffics[dbTraffic_index].Down += traffics[traffic_index].Down }
dbClientTraffics[dbTraffic_index].AllTime += (traffics[traffic_index].Up + traffics[traffic_index].Down) dbClientTraffics[dbTraffic_index].Up += t.Up
dbClientTraffics[dbTraffic_index].Down += t.Down
// Add user in onlineUsers array on traffic dbClientTraffics[dbTraffic_index].AllTime += t.Up + t.Down
if traffics[traffic_index].Up+traffics[traffic_index].Down > 0 { if t.Up+t.Down > 0 {
onlineClients = append(onlineClients, traffics[traffic_index].Email) onlineClients = append(onlineClients, t.Email)
dbClientTraffics[dbTraffic_index].LastOnline = time.Now().UnixMilli() dbClientTraffics[dbTraffic_index].LastOnline = now
}
break
}
} }
} }
@ -2318,6 +2388,50 @@ func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffi
return traffics, nil return traffics, nil
} }
// GetActiveClientTraffics returns the absolute ClientTraffic rows for the given
// emails in a single batched query. Used by the WebSocket delta path to push
// per-client absolute counters without re-serializing the full inbound list.
// Empty input or a "record not found" result returns an empty slice.
func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
if len(emails) == 0 {
return nil, nil
}
db := database.GetDB()
var traffics []*xray.ClientTraffic
err := db.Model(xray.ClientTraffic{}).Where("email IN (?)", emails).Find(&traffics).Error
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err
}
return traffics, nil
}
// InboundTrafficSummary is the minimal projection of an inbound's traffic
// counters used by the WebSocket delta path. Excludes Settings/StreamSettings
// blobs so the broadcast stays compact even with many inbounds.
type InboundTrafficSummary struct {
Id int `json:"id"`
Up int64 `json:"up"`
Down int64 `json:"down"`
Total int64 `json:"total"`
AllTime int64 `json:"allTime"`
Enable bool `json:"enable"`
}
// GetInboundsTrafficSummary returns inbound-level absolute traffic counters
// (no per-client expansion). Companion to GetActiveClientTraffics — together
// they replace the heavy "full inbound list" broadcast on each cron tick.
func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
db := database.GetDB()
var summaries []InboundTrafficSummary
err := db.Model(&model.Inbound{}).
Select("id, up, down, total, all_time, enable").
Find(&summaries).Error
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err
}
return summaries, nil
}
func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) { func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
// Prefer retrieving along with client to reflect actual enabled state from inbound settings // Prefer retrieving along with client to reflect actual enabled state from inbound settings
t, client, err := s.GetClientByEmail(email) t, client, err := s.GetClientByEmail(email)
@ -2336,9 +2450,17 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl
func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error { func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
db := database.GetDB() db := database.GetDB()
// Keep all_time monotonic: it represents historical cumulative usage and
// must never be less than the currently-tracked up+down. Without this,
// the UI showed "Общий трафик" (allTime) below the live consumed value
// after admins manually edited a client's counters.
result := db.Model(xray.ClientTraffic{}). result := db.Model(xray.ClientTraffic{}).
Where("email = ?", email). Where("email = ?", email).
Updates(map[string]any{"up": upload, "down": download}) Updates(map[string]any{
"up": upload,
"down": download,
"all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
})
err := result.Error err := result.Error
if err != nil { if err != nil {

View file

@ -7,6 +7,7 @@ import (
"net/http" "net/http"
"github.com/mhsanaei/3x-ui/v2/database/model" "github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/gin-contrib/sessions" "github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -20,14 +21,16 @@ func init() {
gob.Register(model.User{}) gob.Register(model.User{})
} }
// SetLoginUser stores the authenticated user in the session. // SetLoginUser stores the authenticated user in the session and persists it.
// The user object is serialized and stored for subsequent requests. // gin-contrib/sessions does not auto-save; callers that forget Save() leave
func SetLoginUser(c *gin.Context, user *model.User) { // the cookie out of sync with server state — this helper avoids that pitfall.
func SetLoginUser(c *gin.Context, user *model.User) error {
if user == nil { if user == nil {
return return nil
} }
s := sessions.Default(c) s := sessions.Default(c)
s.Set(loginUserKey, *user) s.Set(loginUserKey, *user)
return s.Save()
} }
// GetLoginUser retrieves the authenticated user from the session. // GetLoginUser retrieves the authenticated user from the session.
@ -40,22 +43,26 @@ func GetLoginUser(c *gin.Context) *model.User {
} }
user, ok := obj.(model.User) user, ok := obj.(model.User)
if !ok { if !ok {
// Stale or incompatible session payload — wipe and persist immediately
// so subsequent requests don't keep hitting the same broken cookie.
s.Delete(loginUserKey) s.Delete(loginUserKey)
if err := s.Save(); err != nil {
logger.Warning("session: failed to drop stale user payload:", err)
}
return nil return nil
} }
return &user return &user
} }
// IsLogin checks if a user is currently authenticated in the session. // IsLogin checks if a user is currently authenticated in the session.
// Returns true if a valid user session exists, false otherwise.
func IsLogin(c *gin.Context) bool { func IsLogin(c *gin.Context) bool {
return GetLoginUser(c) != nil return GetLoginUser(c) != nil
} }
// ClearSession removes all session data and invalidates the session. // ClearSession invalidates the session and tells the browser to drop the cookie.
// This effectively logs out the user and clears any stored session information. // The cookie attributes (Path/HttpOnly/SameSite) must mirror those used when
func ClearSession(c *gin.Context) { // the cookie was created or browsers will keep it.
func ClearSession(c *gin.Context) error {
s := sessions.Default(c) s := sessions.Default(c)
s.Clear() s.Clear()
cookiePath := c.GetString("base_path") cookiePath := c.GetString("base_path")
@ -68,4 +75,5 @@ func ClearSession(c *gin.Context) {
HttpOnly: true, HttpOnly: true,
SameSite: http.SameSiteLaxMode, SameSite: http.SameSiteLaxMode,
}) })
return s.Save()
} }

View file

@ -1,402 +1,360 @@
// Package websocket provides WebSocket hub for real-time updates and notifications. // Package websocket provides a WebSocket hub for real-time updates and notifications.
package websocket package websocket
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"runtime"
"sync" "sync"
"time" "time"
"github.com/mhsanaei/3x-ui/v2/logger" "github.com/mhsanaei/3x-ui/v2/logger"
) )
// MessageType represents the type of WebSocket message // MessageType identifies the kind of WebSocket message.
type MessageType string type MessageType string
const ( const (
MessageTypeStatus MessageType = "status" // Server status update MessageTypeStatus MessageType = "status"
MessageTypeTraffic MessageType = "traffic" // Traffic statistics update MessageTypeTraffic MessageType = "traffic"
MessageTypeInbounds MessageType = "inbounds" // Inbounds list update MessageTypeInbounds MessageType = "inbounds"
MessageTypeNotification MessageType = "notification" // System notification MessageTypeOutbounds MessageType = "outbounds"
MessageTypeXrayState MessageType = "xray_state" // Xray state change MessageTypeNotification MessageType = "notification"
MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update MessageTypeXrayState MessageType = "xray_state"
MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST // MessageTypeClientStats carries absolute traffic counters for the clients
// that had activity in the latest collection window. Frontend applies these
// in-place — far smaller than re-broadcasting the full inbound list and
// scales to 10k+ clients without falling back to REST.
MessageTypeClientStats MessageType = "client_stats"
MessageTypeInvalidate MessageType = "invalidate" // Tells frontend to re-fetch via REST (last-resort).
// maxMessageSize caps the WebSocket payload. Beyond this the hub sends a
// lightweight invalidate signal and the frontend re-fetches via REST.
// 10MB lets typical 2k8k-client deployments push directly via WS (low
// latency); larger installs fall back to invalidate.
maxMessageSize = 10 * 1024 * 1024 // 10MB
enqueueTimeout = 100 * time.Millisecond
clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser.
hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts.
// minBroadcastInterval throttles per-type broadcasts so cron storms or
// rapid mutations cannot drown the hub. Bursts collapse to one delivery.
// Status/traffic/notifications/xray_state/invalidate bypass this gate so
// real-time signals are never delayed.
minBroadcastInterval = 250 * time.Millisecond
// hubRestartAttempts caps panic-recovery restarts. After this many
// consecutive failures we stop trying and log; the panel keeps running
// (frontend falls back to REST polling) and the operator can investigate.
hubRestartAttempts = 3
) )
// Message represents a WebSocket message // NewClient builds a Client ready for hub registration.
func NewClient(id string) *Client {
return &Client{
ID: id,
Send: make(chan []byte, clientSendQueue),
}
}
// Message is the wire format sent to clients.
type Message struct { type Message struct {
Type MessageType `json:"type"` Type MessageType `json:"type"`
Payload any `json:"payload"` Payload any `json:"payload"`
Time int64 `json:"time"` Time int64 `json:"time"`
} }
// Client represents a WebSocket client connection // Client represents a single WebSocket connection.
type Client struct { type Client struct {
ID string ID string
Send chan []byte Send chan []byte
Hub *Hub closeOnce sync.Once
Topics map[MessageType]bool // Subscribed topics
closeOnce sync.Once // Ensures Send channel is closed exactly once
} }
// Hub maintains the set of active clients and broadcasts messages to them // Hub fan-outs messages to all connected clients.
type Hub struct { type Hub struct {
// Registered clients clients map[*Client]struct{}
clients map[*Client]bool broadcast chan []byte
register chan *Client
// Inbound messages from clients
broadcast chan []byte
// Register requests from clients
register chan *Client
// Unregister requests from clients
unregister chan *Client unregister chan *Client
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
// Mutex for thread-safe operations throttleMu sync.Mutex
mu sync.RWMutex lastBroadcast map[MessageType]time.Time
// Context for graceful shutdown
ctx context.Context
cancel context.CancelFunc
// Worker pool for parallel broadcasting
workerPoolSize int
} }
// NewHub creates a new WebSocket hub // NewHub creates a hub. Call Run in a goroutine to start its event loop.
func NewHub() *Hub { func NewHub() *Hub {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Calculate optimal worker pool size (CPU cores * 2, but max 100)
workerPoolSize := runtime.NumCPU() * 2
if workerPoolSize > 100 {
workerPoolSize = 100
}
if workerPoolSize < 10 {
workerPoolSize = 10
}
return &Hub{ return &Hub{
clients: make(map[*Client]bool), clients: make(map[*Client]struct{}),
broadcast: make(chan []byte, 2048), // Increased from 256 to 2048 for high load broadcast: make(chan []byte, hubBroadcastQueue),
register: make(chan *Client, 100), // Buffered channel for fast registration register: make(chan *Client, 64),
unregister: make(chan *Client, 100), // Buffered channel for fast unregistration unregister: make(chan *Client, 64),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
workerPoolSize: workerPoolSize, lastBroadcast: make(map[MessageType]time.Time),
} }
} }
// Run starts the hub's main loop // shouldThrottle returns true if a broadcast of msgType happened within
// minBroadcastInterval. Status/traffic/invalidate skip the gate so heartbeats
// and re-fetch signals are never dropped.
func (h *Hub) shouldThrottle(msgType MessageType) bool {
switch msgType {
case MessageTypeStatus, MessageTypeTraffic, MessageTypeClientStats,
MessageTypeInvalidate, MessageTypeNotification, MessageTypeXrayState:
return false
}
h.throttleMu.Lock()
defer h.throttleMu.Unlock()
now := time.Now()
if last, ok := h.lastBroadcast[msgType]; ok && now.Sub(last) < minBroadcastInterval {
return true
}
h.lastBroadcast[msgType] = now
return false
}
// Run drives the hub. The inner loop is wrapped in a panic-recovery harness
// that retries up to hubRestartAttempts times with backoff so a transient
// panic doesn't permanently kill real-time updates for commercial deployments.
// After the cap, the hub stays down and the frontend falls back to REST polling.
func (h *Hub) Run() { func (h *Hub) Run() {
for attempt := 0; attempt < hubRestartAttempts; attempt++ {
stopped := h.runOnce()
if stopped {
return
}
if attempt < hubRestartAttempts-1 {
wait := time.Duration(1<<attempt) * time.Second // 1s, 2s, 4s
logger.Errorf("WebSocket hub crashed, restarting in %s (%d/%d)", wait, attempt+1, hubRestartAttempts-1)
select {
case <-time.After(wait):
case <-h.ctx.Done():
return
}
}
}
logger.Error("WebSocket hub stopped after exhausting restart attempts")
}
// runOnce drives the event loop once and returns true if the hub stopped
// cleanly (context cancelled). On panic, recover logs and returns false so
// Run can decide whether to retry.
func (h *Hub) runOnce() (stopped bool) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
logger.Error("WebSocket hub panic recovered:", r) logger.Errorf("WebSocket hub panic recovered: %v", r)
// Restart the hub loop stopped = false
go h.Run()
} }
}() }()
for { for {
select { select {
case <-h.ctx.Done(): case <-h.ctx.Done():
// Graceful shutdown: close all clients h.shutdown()
h.mu.Lock() return true
for client := range h.clients {
client.closeOnce.Do(func() {
close(client.Send)
})
}
h.clients = make(map[*Client]bool)
h.mu.Unlock()
logger.Info("WebSocket hub stopped gracefully")
return
case client := <-h.register: case c := <-h.register:
if client == nil { if c == nil {
continue continue
} }
h.mu.Lock() h.mu.Lock()
h.clients[client] = true h.clients[c] = struct{}{}
count := len(h.clients) n := len(h.clients)
h.mu.Unlock() h.mu.Unlock()
logger.Debugf("WebSocket client connected: %s (total: %d)", client.ID, count) logger.Debugf("WebSocket client connected: %s (total: %d)", c.ID, n)
case client := <-h.unregister: case c := <-h.unregister:
if client == nil { if c == nil {
continue continue
} }
h.mu.Lock() h.removeClient(c)
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
client.closeOnce.Do(func() {
close(client.Send)
})
}
count := len(h.clients)
h.mu.Unlock()
logger.Debugf("WebSocket client disconnected: %s (total: %d)", client.ID, count)
case message := <-h.broadcast: case msg := <-h.broadcast:
if message == nil { h.fanout(msg)
continue
}
// Optimization: quickly copy client list and release lock
h.mu.RLock()
clientCount := len(h.clients)
if clientCount == 0 {
h.mu.RUnlock()
continue
}
// Pre-allocate memory for client list
clients := make([]*Client, 0, clientCount)
for client := range h.clients {
clients = append(clients, client)
}
h.mu.RUnlock()
// Parallel broadcast using worker pool
h.broadcastParallel(clients, message)
} }
} }
} }
// broadcastParallel sends message to all clients in parallel for maximum performance // shutdown closes all client send channels and clears the registry.
func (h *Hub) broadcastParallel(clients []*Client, message []byte) { func (h *Hub) shutdown() {
if len(clients) == 0 { h.mu.Lock()
return for c := range h.clients {
c.closeOnce.Do(func() { close(c.Send) })
} }
h.clients = make(map[*Client]struct{})
// For small number of clients, use simple parallel sending h.mu.Unlock()
if len(clients) < h.workerPoolSize { logger.Info("WebSocket hub stopped")
var wg sync.WaitGroup
for _, client := range clients {
wg.Add(1)
go func(c *Client) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
// Channel may be closed, safely ignore
logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", c.ID, r)
}
}()
select {
case c.Send <- message:
default:
// Client's send buffer is full, disconnect
logger.Debugf("WebSocket client %s send buffer full, disconnecting", c.ID)
h.Unregister(c)
}
}(client)
}
wg.Wait()
return
}
// For large number of clients, use worker pool for optimal performance
clientChan := make(chan *Client, len(clients))
for _, client := range clients {
clientChan <- client
}
close(clientChan)
// Use a local WaitGroup to avoid blocking hub shutdown
var wg sync.WaitGroup
wg.Add(h.workerPoolSize)
for i := 0; i < h.workerPoolSize; i++ {
go func() {
defer wg.Done()
for client := range clientChan {
func() {
defer func() {
if r := recover(); r != nil {
// Channel may be closed, safely ignore
logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", client.ID, r)
}
}()
select {
case client.Send <- message:
default:
// Client's send buffer is full, disconnect
logger.Debugf("WebSocket client %s send buffer full, disconnecting", client.ID)
h.Unregister(client)
}
}()
}
}()
}
// Wait for all workers to finish
wg.Wait()
} }
// Broadcast sends a message to all connected clients // removeClient deletes a client and closes its send channel exactly once.
func (h *Hub) Broadcast(messageType MessageType, payload any) { func (h *Hub) removeClient(c *Client) {
if h == nil { h.mu.Lock()
return if _, ok := h.clients[c]; ok {
} delete(h.clients, c)
if payload == nil { c.closeOnce.Do(func() { close(c.Send) })
logger.Warning("Attempted to broadcast nil payload")
return
}
// Skip all work if no clients are connected
if h.GetClientCount() == 0 {
return
}
msg := Message{
Type: messageType,
Payload: payload,
Time: getCurrentTimestamp(),
}
data, err := json.Marshal(msg)
if err != nil {
logger.Error("Failed to marshal WebSocket message:", err)
return
}
// If message exceeds size limit, send a lightweight invalidate notification
// instead of dropping it entirely — the frontend will re-fetch via REST API
const maxMessageSize = 10 * 1024 * 1024 // 10MB
if len(data) > maxMessageSize {
logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
h.broadcastInvalidate(messageType)
return
}
// Non-blocking send with timeout to prevent delays
select {
case h.broadcast <- data:
case <-time.After(100 * time.Millisecond):
logger.Warning("WebSocket broadcast channel is full, dropping message")
case <-h.ctx.Done():
// Hub is shutting down
} }
n := len(h.clients)
h.mu.Unlock()
logger.Debugf("WebSocket client disconnected: %s (total: %d)", c.ID, n)
} }
// BroadcastToTopic sends a message only to clients subscribed to the specific topic // fanout delivers msg to every client. Each send is non-blocking — a client
func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { // whose buffer is full is collected for direct removal at the end. We do NOT
if h == nil { // route slow-client unregistrations through the unregister channel: under
// burst load (panel restart, network blip) that channel can fill up while the
// hub itself is the consumer, causing a self-deadlock.
func (h *Hub) fanout(msg []byte) {
if msg == nil {
return return
} }
if payload == nil {
logger.Warning("Attempted to broadcast nil payload to topic")
return
}
// Skip all work if no clients are connected
if h.GetClientCount() == 0 {
return
}
msg := Message{
Type: messageType,
Payload: payload,
Time: getCurrentTimestamp(),
}
data, err := json.Marshal(msg)
if err != nil {
logger.Error("Failed to marshal WebSocket message:", err)
return
}
// If message exceeds size limit, send a lightweight invalidate notification
const maxMessageSize = 10 * 1024 * 1024 // 10MB
if len(data) > maxMessageSize {
logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
h.broadcastInvalidate(messageType)
return
}
h.mu.RLock() h.mu.RLock()
// Filter clients by topics and quickly release lock if len(h.clients) == 0 {
subscribedClients := make([]*Client, 0) h.mu.RUnlock()
for client := range h.clients { return
if len(client.Topics) == 0 || client.Topics[messageType] { }
subscribedClients = append(subscribedClients, client) targets := make([]*Client, 0, len(h.clients))
} for c := range h.clients {
targets = append(targets, c)
} }
h.mu.RUnlock() h.mu.RUnlock()
// Parallel send to subscribed clients var dead []*Client
if len(subscribedClients) > 0 { for _, c := range targets {
h.broadcastParallel(subscribedClients, data) if !trySend(c, msg) {
dead = append(dead, c)
}
}
if len(dead) == 0 {
return
}
h.mu.Lock()
for _, c := range dead {
if _, ok := h.clients[c]; ok {
delete(h.clients, c)
c.closeOnce.Do(func() { close(c.Send) })
logger.Debugf("WebSocket client %s send buffer full, disconnected", c.ID)
}
}
h.mu.Unlock()
}
// trySend performs a non-blocking write to the client's Send channel.
// Returns false if the client should be evicted (full buffer or closed channel).
// A defer-recover guards against the rare race where the channel was closed
// concurrently — sending on a closed channel always panics, even with select+default.
func trySend(c *Client, msg []byte) (ok bool) {
defer func() {
if r := recover(); r != nil {
ok = false
}
}()
select {
case c.Send <- msg:
return true
default:
return false
} }
} }
// GetClientCount returns the number of connected clients // Broadcast serializes payload and queues it for delivery to all clients.
// If the serialized message exceeds maxMessageSize, an invalidate signal is
// queued instead so the frontend re-fetches via REST. Bursts of the same
// message type within minBroadcastInterval collapse to a single delivery.
func (h *Hub) Broadcast(messageType MessageType, payload any) {
if h == nil || payload == nil || h.GetClientCount() == 0 {
return
}
if h.shouldThrottle(messageType) {
return
}
data, err := json.Marshal(Message{
Type: messageType,
Payload: payload,
Time: time.Now().UnixMilli(),
})
if err != nil {
logger.Error("WebSocket marshal failed:", err)
return
}
if len(data) > maxMessageSize {
logger.Debugf("WebSocket payload %d bytes exceeds limit, sending invalidate for %s", len(data), messageType)
h.broadcastInvalidate(messageType)
return
}
h.enqueue(data)
}
// broadcastInvalidate queues a lightweight signal telling clients to re-fetch
// the named data type via REST.
func (h *Hub) broadcastInvalidate(originalType MessageType) {
data, err := json.Marshal(Message{
Type: MessageTypeInvalidate,
Payload: map[string]string{"type": string(originalType)},
Time: time.Now().UnixMilli(),
})
if err != nil {
logger.Error("WebSocket invalidate marshal failed:", err)
return
}
h.enqueue(data)
}
// enqueue submits raw bytes to the broadcast channel. Dropped on backpressure
// (channel full for >100ms) or shutdown.
func (h *Hub) enqueue(data []byte) {
select {
case h.broadcast <- data:
case <-time.After(enqueueTimeout):
logger.Warning("WebSocket broadcast channel full, dropping message")
case <-h.ctx.Done():
}
}
// GetClientCount returns the number of connected clients.
func (h *Hub) GetClientCount() int { func (h *Hub) GetClientCount() int {
if h == nil {
return 0
}
h.mu.RLock() h.mu.RLock()
defer h.mu.RUnlock() defer h.mu.RUnlock()
return len(h.clients) return len(h.clients)
} }
// Register registers a new client with the hub // Register adds a client to the hub.
func (h *Hub) Register(client *Client) { func (h *Hub) Register(c *Client) {
if h == nil || client == nil { if h == nil || c == nil {
return return
} }
select { select {
case h.register <- client: case h.register <- c:
case <-h.ctx.Done(): case <-h.ctx.Done():
// Hub is shutting down
} }
} }
// Unregister unregisters a client from the hub // Unregister removes a client from the hub. Non-blocking: if the unregister
func (h *Hub) Unregister(client *Client) { // channel is full (transient burst), the request is dropped — the client will
if h == nil || client == nil { // be unregistered on its next failed send or when the hub shuts down.
// A blocking send here is unsafe because callers may include the hub goroutine
// itself, which would self-deadlock.
func (h *Hub) Unregister(c *Client) {
if h == nil || c == nil {
return return
} }
select { select {
case h.unregister <- client: case h.unregister <- c:
case <-h.ctx.Done(): default:
// Hub is shutting down
} }
} }
// Stop gracefully stops the hub and closes all connections // Stop signals the hub to shut down and close all client connections.
func (h *Hub) Stop() { func (h *Hub) Stop() {
if h == nil { if h != nil && h.cancel != nil {
return
}
if h.cancel != nil {
h.cancel() h.cancel()
} }
} }
// broadcastInvalidate sends a lightweight invalidate message to all clients,
// telling them to re-fetch the specified data type via REST API.
// This is used when the full payload exceeds the WebSocket message size limit.
func (h *Hub) broadcastInvalidate(originalType MessageType) {
msg := Message{
Type: MessageTypeInvalidate,
Payload: map[string]string{"type": string(originalType)},
Time: getCurrentTimestamp(),
}
data, err := json.Marshal(msg)
if err != nil {
logger.Error("Failed to marshal invalidate message:", err)
return
}
// Non-blocking send with timeout
select {
case h.broadcast <- data:
case <-time.After(100 * time.Millisecond):
logger.Warning("WebSocket broadcast channel is full, dropping invalidate message")
case <-h.ctx.Done():
}
}
// getCurrentTimestamp returns current Unix timestamp in milliseconds
func getCurrentTimestamp() int64 {
return time.Now().UnixMilli()
}

192
web/websocket/hub_test.go Normal file
View file

@ -0,0 +1,192 @@
package websocket
import (
"os"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/op/go-logging"
)
func TestMain(m *testing.M) {
// Initialize logger so hub.go calls don't panic on nil global.
logger.InitLogger(logging.CRITICAL)
code := m.Run()
logger.CloseLogger()
// Clean up the log directory created by InitLogger so the test leaves
// no artefacts in the working tree.
os.RemoveAll("log")
os.Exit(code)
}
// TestFanoutNoDeadlockOnSlowClients verifies that the hub does NOT self-deadlock
// when many clients have full Send buffers simultaneously. Regression guard for
// the bug where fanout called Unregister() on each slow client, the unregister
// channel filled (cap 64), and the hub blocked on its own consumer.
func TestFanoutNoDeadlockOnSlowClients(t *testing.T) {
hub := NewHub()
go hub.Run()
defer hub.Stop()
// Spawn 200 clients but never read from their Send channels — all are "slow".
// 200 > unregister channel capacity (64), which would have triggered the
// deadlock in the old code.
const n = 200
clients := make([]*Client, n)
for i := 0; i < n; i++ {
clients[i] = NewClient(string(rune('a' + i%26)))
hub.Register(clients[i])
}
// Wait for registrations to be processed.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) && hub.GetClientCount() < n {
time.Sleep(10 * time.Millisecond)
}
if got := hub.GetClientCount(); got < n {
t.Fatalf("only %d/%d clients registered after 2s", got, n)
}
// Fill every client's send buffer so the next broadcast triggers eviction.
for _, c := range clients {
for i := 0; i < clientSendQueue; i++ {
select {
case c.Send <- []byte("filler"):
default:
}
}
}
// This broadcast should evict ALL clients without deadlocking the hub.
hub.Broadcast(MessageTypeStatus, map[string]string{"x": "y"})
// Wait for eviction with a hard cap — if the hub deadlocked, this hangs
// past the timeout and t.Fatalf fires.
deadline = time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) && hub.GetClientCount() > 0 {
time.Sleep(10 * time.Millisecond)
}
if got := hub.GetClientCount(); got > 0 {
t.Fatalf("deadlock: %d clients still registered after broadcast (expected 0)", got)
}
}
// TestConcurrentBroadcastAndDisconnect stresses the hub with parallel
// Broadcast calls while clients connect and disconnect. Regression guard for
// races between fanout, removeClient, and shutdown.
func TestConcurrentBroadcastAndDisconnect(t *testing.T) {
hub := NewHub()
go hub.Run()
defer hub.Stop()
var wg sync.WaitGroup
stop := make(chan struct{})
// Continuous broadcasters.
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
hub.Broadcast(MessageTypeStatus, map[string]int{"v": 1})
}
}
}()
}
// Continuous register/unregister churn.
var connected int64
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
c := NewClient("churn")
hub.Register(c)
atomic.AddInt64(&connected, 1)
// Drain a few messages so we don't block.
go func() {
for range c.Send {
}
}()
time.Sleep(time.Millisecond)
hub.Unregister(c)
}
}
}()
time.Sleep(500 * time.Millisecond)
close(stop)
wg.Wait()
if atomic.LoadInt64(&connected) == 0 {
t.Fatal("no clients churned through hub")
}
}
// TestThrottlingBlocksBurstButLetsRealtimeThrough verifies that real-time
// message types (status, traffic) are NEVER throttled, while inbounds bursts
// are throttled.
func TestThrottlingBlocksBurstButLetsRealtimeThrough(t *testing.T) {
hub := NewHub()
if hub.shouldThrottle(MessageTypeStatus) {
t.Error("status must never be throttled")
}
if hub.shouldThrottle(MessageTypeTraffic) {
t.Error("traffic must never be throttled")
}
if hub.shouldThrottle(MessageTypeNotification) {
t.Error("notification must never be throttled")
}
if hub.shouldThrottle(MessageTypeInvalidate) {
t.Error("invalidate must never be throttled")
}
// First inbounds broadcast goes through, immediate retry is throttled.
if hub.shouldThrottle(MessageTypeInbounds) {
t.Error("first inbounds broadcast must pass")
}
if !hub.shouldThrottle(MessageTypeInbounds) {
t.Error("second inbounds broadcast within window must throttle")
}
// After the window passes, throttle releases.
time.Sleep(minBroadcastInterval + 10*time.Millisecond)
if hub.shouldThrottle(MessageTypeInbounds) {
t.Error("inbounds broadcast after window must pass")
}
}
// TestHubStopUnblocksWaiters ensures that pending Broadcast/Register/Unregister
// calls don't leak goroutines after Stop().
func TestHubStopUnblocksWaiters(t *testing.T) {
hub := NewHub()
// Don't start Run — leave channels unfeed so any blocking call would hang.
hub.Stop()
done := make(chan struct{})
go func() {
// All these should return promptly since ctx is cancelled.
hub.Register(NewClient("x"))
hub.Unregister(NewClient("x"))
hub.Broadcast(MessageTypeStatus, "data")
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("calls did not return after Stop()")
}
}

View file

@ -6,7 +6,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/web/global" "github.com/mhsanaei/3x-ui/v2/web/global"
) )
// GetHub returns the global WebSocket hub instance // GetHub returns the global WebSocket hub instance.
func GetHub() *Hub { func GetHub() *Hub {
webServer := global.GetWebServer() webServer := global.GetWebServer()
if webServer == nil { if webServer == nil {
@ -24,80 +24,82 @@ func GetHub() *Hub {
return wsHub return wsHub
} }
// HasClients returns true if there are any WebSocket clients connected. // HasClients returns true if any WebSocket client is connected.
// Use this to skip expensive work (DB queries, serialization) when no browser is open. // Use this to skip expensive work (DB queries, serialization) when no browser is open.
func HasClients() bool { func HasClients() bool {
hub := GetHub() hub := GetHub()
if hub == nil { return hub != nil && hub.GetClientCount() > 0
return false
}
return hub.GetClientCount() > 0
} }
// BroadcastStatus broadcasts server status update to all connected clients // BroadcastStatus broadcasts server status update to all connected clients.
func BroadcastStatus(status any) { func BroadcastStatus(status any) {
hub := GetHub() if hub := GetHub(); hub != nil {
if hub != nil {
hub.Broadcast(MessageTypeStatus, status) hub.Broadcast(MessageTypeStatus, status)
} }
} }
// BroadcastTraffic broadcasts traffic statistics update to all connected clients // BroadcastTraffic broadcasts traffic statistics update to all connected clients.
func BroadcastTraffic(traffic any) { func BroadcastTraffic(traffic any) {
hub := GetHub() if hub := GetHub(); hub != nil {
if hub != nil {
hub.Broadcast(MessageTypeTraffic, traffic) hub.Broadcast(MessageTypeTraffic, traffic)
} }
} }
// BroadcastInbounds broadcasts inbounds list update to all connected clients // BroadcastClientStats broadcasts absolute per-client traffic counters for the
// clients that had activity in the latest collection window. Use this instead
// of re-broadcasting the full inbound list — it scales to 10k+ clients because
// the payload only includes active rows (typically a fraction of total).
func BroadcastClientStats(stats any) {
if hub := GetHub(); hub != nil {
hub.Broadcast(MessageTypeClientStats, stats)
}
}
// BroadcastInbounds broadcasts inbounds list update to all connected clients.
func BroadcastInbounds(inbounds any) { func BroadcastInbounds(inbounds any) {
hub := GetHub() if hub := GetHub(); hub != nil {
if hub != nil {
hub.Broadcast(MessageTypeInbounds, inbounds) hub.Broadcast(MessageTypeInbounds, inbounds)
} }
} }
// BroadcastOutbounds broadcasts outbounds list update to all connected clients // BroadcastOutbounds broadcasts outbounds list update to all connected clients.
func BroadcastOutbounds(outbounds any) { func BroadcastOutbounds(outbounds any) {
hub := GetHub() if hub := GetHub(); hub != nil {
if hub != nil {
hub.Broadcast(MessageTypeOutbounds, outbounds) hub.Broadcast(MessageTypeOutbounds, outbounds)
} }
} }
// BroadcastNotification broadcasts a system notification to all connected clients // BroadcastNotification broadcasts a system notification to all connected clients.
func BroadcastNotification(title, message, level string) { func BroadcastNotification(title, message, level string) {
hub := GetHub() hub := GetHub()
if hub != nil { if hub == nil {
notification := map[string]string{ return
"title": title,
"message": message,
"level": level, // info, warning, error, success
}
hub.Broadcast(MessageTypeNotification, notification)
} }
hub.Broadcast(MessageTypeNotification, map[string]string{
"title": title,
"message": message,
"level": level,
})
} }
// BroadcastXrayState broadcasts Xray state change to all connected clients // BroadcastXrayState broadcasts Xray state change to all connected clients.
func BroadcastXrayState(state string, errorMsg string) { func BroadcastXrayState(state string, errorMsg string) {
hub := GetHub() hub := GetHub()
if hub != nil { if hub == nil {
stateUpdate := map[string]string{ return
"state": state,
"errorMsg": errorMsg,
}
hub.Broadcast(MessageTypeXrayState, stateUpdate)
} }
hub.Broadcast(MessageTypeXrayState, map[string]string{
"state": state,
"errorMsg": errorMsg,
})
} }
// BroadcastInvalidate sends a lightweight invalidate signal for the given data type, // BroadcastInvalidate sends a lightweight signal telling clients to re-fetch
// telling connected frontends to re-fetch data via REST API. // the named data type via REST. Use this when the caller already knows the
// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload // payload is too large to push directly (e.g., 10k+ clients) to skip the
// will be too large, to avoid wasting resources on serialization. // JSON-marshal cost on the hot path.
func BroadcastInvalidate(dataType MessageType) { func BroadcastInvalidate(dataType MessageType) {
hub := GetHub() if hub := GetHub(); hub != nil {
if hub != nil {
hub.broadcastInvalidate(dataType) hub.broadcastInvalidate(dataType)
} }
} }