diff --git a/web/html/inbounds.html b/web/html/inbounds.html
index b945da90..fa6462ee 100644
--- a/web/html/inbounds.html
+++ b/web/html/inbounds.html
@@ -1605,11 +1605,25 @@
}
});
+ // Listen for invalidate signals (sent when payload is too large for WebSocket)
+ // The server sends a lightweight notification and we re-fetch via REST API
+ let invalidateTimer = null;
+ window.wsClient.on('invalidate', (payload) => {
+ if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) {
+ // Debounce to avoid flooding the REST API with multiple invalidate signals
+ if (invalidateTimer) clearTimeout(invalidateTimer);
+ invalidateTimer = setTimeout(() => {
+ invalidateTimer = null;
+ this.getDBInbounds();
+ }, 1000);
+ }
+ });
+
// Listen for traffic updates
window.wsClient.on('traffic', (payload) => {
// Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event
// because clientTraffics contains delta/incremental values, not total accumulated values.
- // Total traffic is updated via the 'inbounds' event which contains accumulated values from database.
+ // Total traffic is updated via the 'inbounds' WebSocket event (or 'invalidate' fallback for large panels).
// Update online clients list in real-time
if (payload && Array.isArray(payload.onlineClients)) {
@@ -1627,22 +1641,27 @@
this.onlineClients = nextOnlineClients;
if (onlineChanged) {
// Recalculate client counts to update online status
+ // Use $set for Vue 2 reactivity — direct array index assignment is not reactive
this.dbInbounds.forEach(dbInbound => {
const inbound = this.inbounds.find(ib => ib.id === dbInbound.id);
if (inbound && this.clientCount[dbInbound.id]) {
- this.clientCount[dbInbound.id] = this.getClientCounts(dbInbound, inbound);
+ this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound));
}
});
+ // Always trigger UI refresh — not just when filter is enabled
if (this.enableFilter) {
this.filterInbounds();
+ } else {
+ this.searchInbounds(this.searchKey);
}
}
}
// Update last online map in real-time
+ // Replace entirely (server sends the full map) to avoid unbounded growth from deleted clients
if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') {
- this.lastOnlineMap = { ...this.lastOnlineMap, ...payload.lastOnlineMap };
+ this.lastOnlineMap = payload.lastOnlineMap;
}
});
diff --git a/web/html/xray.html b/web/html/xray.html
index ebe31f48..d1a12e45 100644
--- a/web/html/xray.html
+++ b/web/html/xray.html
@@ -1075,6 +1075,14 @@
this.$forceUpdate();
}
});
+
+ // Handle invalidate signals (sent when payload is too large for WebSocket,
+ // or when traffic job notifies about data changes)
+ window.wsClient.on('invalidate', (payload) => {
+ if (payload && payload.type === 'outbounds') {
+ this.refreshOutboundTraffic();
+ }
+ });
}
while (true) {
diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go
index 8d2bfbd6..b1a775f0 100644
--- a/web/job/xray_traffic_job.go
+++ b/web/job/xray_traffic_job.go
@@ -50,7 +50,13 @@ func (j *XrayTrafficJob) Run() {
j.xrayService.SetToNeedRestart()
}
- // Get online clients and last online map for real-time status updates
+ // If no frontend client is connected, skip all WebSocket broadcasting routines,
+ // including expensive DB queries for online clients and JSON marshaling.
+ if !websocket.HasClients() {
+ return
+ }
+
+ // Update online clients list and map
onlineClients := j.inboundService.GetOnlineClients()
lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
if err != nil {
@@ -58,8 +64,17 @@ func (j *XrayTrafficJob) Run() {
lastOnlineMap = make(map[string]int64)
}
+ // Broadcast traffic update (deltas and online stats) via WebSocket
+ trafficUpdate := map[string]any{
+ "traffics": traffics,
+ "clientTraffics": clientTraffics,
+ "onlineClients": onlineClients,
+ "lastOnlineMap": lastOnlineMap,
+ }
+ websocket.BroadcastTraffic(trafficUpdate)
+
// Fetch updated inbounds from database with accumulated traffic values
- // This ensures frontend receives the actual total traffic, not just delta values
+ // This ensures frontend receives the actual total traffic for real-time UI refresh.
updatedInbounds, err := j.inboundService.GetAllInbounds()
if err != nil {
logger.Warning("get all inbounds for websocket failed:", err)
@@ -70,16 +85,8 @@ func (j *XrayTrafficJob) Run() {
logger.Warning("get all outbounds for websocket failed:", err)
}
- // Broadcast traffic update via WebSocket with accumulated values from database
- trafficUpdate := map[string]any{
- "traffics": traffics,
- "clientTraffics": clientTraffics,
- "onlineClients": onlineClients,
- "lastOnlineMap": lastOnlineMap,
- }
- websocket.BroadcastTraffic(trafficUpdate)
-
- // Broadcast full inbounds update for real-time UI refresh
+ // The web socket hub will automatically check the payload size.
+ // If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal.
if updatedInbounds != nil {
websocket.BroadcastInbounds(updatedInbounds)
}
@@ -87,7 +94,6 @@ func (j *XrayTrafficJob) Run() {
if updatedOutbounds != nil {
websocket.BroadcastOutbounds(updatedOutbounds)
}
-
}
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
diff --git a/web/websocket/hub.go b/web/websocket/hub.go
index 8aa5903c..e2c0072c 100644
--- a/web/websocket/hub.go
+++ b/web/websocket/hub.go
@@ -21,6 +21,7 @@ const (
MessageTypeNotification MessageType = "notification" // System notification
MessageTypeXrayState MessageType = "xray_state" // Xray state change
MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update
+ MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST
)
// Message represents a WebSocket message
@@ -259,6 +260,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return
}
+ // Skip all work if no clients are connected
+ if h.GetClientCount() == 0 {
+ return
+ }
+
msg := Message{
Type: messageType,
Payload: payload,
@@ -271,10 +277,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return
}
- // Limit message size to prevent memory issues
+ // If message exceeds size limit, send a lightweight invalidate notification
+ // instead of dropping it entirely — the frontend will re-fetch via REST API
const maxMessageSize = 1024 * 1024 // 1MB
if len(data) > maxMessageSize {
- logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
+ h.broadcastInvalidate(messageType)
return
}
@@ -298,6 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return
}
+ // Skip all work if no clients are connected
+ if h.GetClientCount() == 0 {
+ return
+ }
+
msg := Message{
Type: messageType,
Payload: payload,
@@ -310,10 +323,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return
}
- // Limit message size to prevent memory issues
+ // If message exceeds size limit, send a lightweight invalidate notification
const maxMessageSize = 1024 * 1024 // 1MB
if len(data) > maxMessageSize {
- logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
+ h.broadcastInvalidate(messageType)
return
}
@@ -374,6 +388,31 @@ func (h *Hub) Stop() {
}
}
+// broadcastInvalidate sends a lightweight invalidate message to all clients,
+// telling them to re-fetch the specified data type via REST API.
+// This is used when the full payload exceeds the WebSocket message size limit.
+func (h *Hub) broadcastInvalidate(originalType MessageType) {
+ msg := Message{
+ Type: MessageTypeInvalidate,
+ Payload: map[string]string{"type": string(originalType)},
+ Time: getCurrentTimestamp(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ logger.Error("Failed to marshal invalidate message:", err)
+ return
+ }
+
+ // Non-blocking send with timeout
+ select {
+ case h.broadcast <- data:
+ case <-time.After(100 * time.Millisecond):
+ logger.Warning("WebSocket broadcast channel is full, dropping invalidate message")
+ case <-h.ctx.Done():
+ }
+}
+
// getCurrentTimestamp returns current Unix timestamp in milliseconds
func getCurrentTimestamp() int64 {
return time.Now().UnixMilli()
diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go
index 74cf61b2..d271aff6 100644
--- a/web/websocket/notifier.go
+++ b/web/websocket/notifier.go
@@ -24,6 +24,16 @@ func GetHub() *Hub {
return wsHub
}
+// HasClients returns true if there are any WebSocket clients connected.
+// Use this to skip expensive work (DB queries, serialization) when no browser is open.
+func HasClients() bool {
+ hub := GetHub()
+ if hub == nil {
+ return false
+ }
+ return hub.GetClientCount() > 0
+}
+
// BroadcastStatus broadcasts server status update to all connected clients
func BroadcastStatus(status any) {
hub := GetHub()
@@ -80,3 +90,15 @@ func BroadcastXrayState(state string, errorMsg string) {
hub.Broadcast(MessageTypeXrayState, stateUpdate)
}
}
+
+// BroadcastInvalidate sends a lightweight invalidate signal for the given data type,
+// telling connected frontends to re-fetch data via REST API.
+// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload
+// will be too large, to avoid wasting resources on serialization.
+func BroadcastInvalidate(dataType MessageType) {
+ hub := GetHub()
+ if hub != nil {
+ hub.broadcastInvalidate(dataType)
+ }
+}
+