3x-ui/web/job/xray_traffic_job.go
MHSanaei 3af2da0142
fix(online): scope online status per node instead of a global union
The inbounds page and Nodes page checked each client's email against a
single deduped union of every node's online clients, so a client connected
to one node showed as online on every inbound across every node. The local
online set was also derived from the email-keyed client_traffics.last_online
column, which remote-node syncs bump too, leaking remote-only clients onto
local inbounds.

Track online clients per node: the local panel's own xray clients under key
0 (derived from live traffic-poll deltas via RefreshLocalOnline, kept in
memory and independent of the shared last_online column) and each remote
node under its id. Add GetOnlineClientsByNode plus a /clients/onlinesByNode
endpoint and onlineByNode WS field; node.go and the inbounds rollup now scope
online by node. The flat GetOnlineClients union is kept for client-centric and
total-count views (Clients page, dashboard, telegram).

Closes #4809
2026-06-02 18:33:21 +02:00

152 lines
5.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package job
import (
	"encoding/json"
	"time"

	"github.com/mhsanaei/3x-ui/v3/logger"
	"github.com/mhsanaei/3x-ui/v3/web/service"
	"github.com/mhsanaei/3x-ui/v3/web/websocket"
	"github.com/mhsanaei/3x-ui/v3/xray"
	"github.com/valyala/fasthttp"
)
// XrayTrafficJob collects and processes traffic statistics from Xray, updating the database and optionally informing external APIs.
//
// The job carries no state of its own beyond the four services below, which
// are embedded by value; NewXrayTrafficJob hands out a zero-valued instance.
type XrayTrafficJob struct {
// settingService supplies the panel settings the job consults: the
// restart-on-client-disable switch and the external traffic inform
// switch and URI.
settingService service.SettingService
// xrayService reports whether Xray is running, returns the traffic
// counters for the current poll, and restarts Xray or flags it for restart.
xrayService service.XrayService
// inboundService receives inbound/client traffic via AddTraffic and serves
// the online-client, last-online and client/inbound summary lookups that
// feed the WebSocket payloads.
inboundService service.InboundService
// outboundService receives outbound traffic via AddTraffic and serves the
// outbound traffic list that is broadcast to WebSocket clients.
outboundService service.OutboundService
}
// NewXrayTrafficJob returns a pointer to a freshly allocated, zero-valued
// XrayTrafficJob.
func NewXrayTrafficJob() *XrayTrafficJob {
	return &XrayTrafficJob{}
}
// Run performs one traffic-collection cycle. It returns immediately when Xray
// is not running or the traffic query fails. Otherwise it:
//
//  1. passes the collected traffic to the inbound and outbound services,
//  2. restarts Xray (or flags it for restart) when clients were disabled and
//     the RestartXrayOnClientDisable setting is on,
//  3. forwards the traffic to the external inform API when that is enabled,
//  4. flags Xray for restart if either AddTraffic call asked for it,
//  5. refreshes the local online-client set from this poll's deltas, and
//  6. when at least one WebSocket client is connected, pushes real-time
//     updates over WebSocket using compact delta payloads — no REST
//     fallback, scales to 10k–20k+ clients per inbound.
//
// Every failure after the initial traffic query is logged as a warning and
// the cycle carries on; Run itself reports nothing to its caller.
func (j *XrayTrafficJob) Run() {
	if !j.xrayService.IsXrayRunning() {
		return
	}
	traffics, clientTraffics, err := j.xrayService.GetXrayTraffic()
	if err != nil {
		return
	}

	needRestart0, clientsDisabled, err := j.inboundService.AddTraffic(traffics, clientTraffics)
	if err != nil {
		logger.Warning("add inbound traffic failed:", err)
	}
	err, needRestart1 := j.outboundService.AddTraffic(traffics, clientTraffics)
	if err != nil {
		logger.Warning("add outbound traffic failed:", err)
	}

	if clientsDisabled {
		restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable()
		if settingErr != nil {
			logger.Warning("get RestartXrayOnClientDisable failed:", settingErr)
		}
		if restartOnDisable {
			if err := j.xrayService.RestartXray(true); err != nil {
				logger.Warning("restart xray after disabling clients failed:", err)
				// The immediate restart failed; leave a flag so it is retried later.
				j.xrayService.SetToNeedRestart()
			}
		}
		// Tell connected frontends that their inbound data is stale.
		websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
	}

	if informEnabled, err := j.settingService.GetExternalTrafficInformEnable(); informEnabled {
		j.informTrafficToExternalAPI(traffics, clientTraffics)
	} else if err != nil {
		logger.Warning("get ExternalTrafficInformEnable failed:", err)
	}

	if needRestart0 || needRestart1 {
		j.xrayService.SetToNeedRestart()
	}

	// Derive the local online set from this poll's per-email deltas rather
	// than the shared last_online column, which remote-node syncs also bump
	// and would otherwise make a client active only on a remote node appear
	// online on local inbounds.
	activeEmails := make([]string, 0, len(clientTraffics))
	for _, ct := range clientTraffics {
		if ct != nil && ct.Up+ct.Down > 0 {
			activeEmails = append(activeEmails, ct.Email)
		}
	}
	j.inboundService.RefreshLocalOnlineClients(activeEmails)

	// Everything below only builds WebSocket payloads; skip it when nobody
	// is connected.
	if !websocket.HasClients() {
		return
	}

	// lastOnlineMap is consumed solely by the traffic broadcast below, so it
	// is loaded only once a listener is known to exist.
	lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
	if err != nil {
		logger.Warning("get clients last online failed:", err)
	}
	if lastOnlineMap == nil {
		lastOnlineMap = make(map[string]int64)
	}

	// Substitute an empty slice for nil so the payload always carries a
	// non-nil value (presumably so it serializes as [] rather than null).
	onlineClients := j.inboundService.GetOnlineClients()
	if onlineClients == nil {
		onlineClients = []string{}
	}

	websocket.BroadcastTraffic(map[string]any{
		"traffics":       traffics,
		"clientTraffics": clientTraffics,
		"onlineClients":  onlineClients,
		"onlineByNode":   j.inboundService.GetOnlineClientsByNode(),
		"lastOnlineMap":  lastOnlineMap,
	})

	// Client and inbound summaries are added independently; a failed or
	// empty lookup just omits its key, and nothing is sent when both are
	// missing.
	clientStatsPayload := map[string]any{}
	if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
		logger.Warning("get all client traffics for websocket failed:", err)
	} else if len(stats) > 0 {
		clientStatsPayload["clients"] = stats
	}
	if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
		logger.Warning("get inbounds traffic summary for websocket failed:", err)
	} else if len(inboundSummary) > 0 {
		clientStatsPayload["inbounds"] = inboundSummary
	}
	if len(clientStatsPayload) > 0 {
		websocket.BroadcastClientStats(clientStatsPayload)
	}

	if updatedOutbounds, err := j.outboundService.GetOutboundsTraffic(); err == nil && updatedOutbounds != nil {
		websocket.BroadcastOutbounds(updatedOutbounds)
	} else if err != nil {
		logger.Warning("get all outbounds for websocket failed:", err)
	}
}
// informTrafficToExternalAPI POSTs the given inbound and client traffic as a
// JSON object ({"clientTraffics": ..., "inboundTraffics": ...}) to the
// configured ExternalTrafficInformURI.
//
// Delivery is best-effort. Each failure is logged as a warning and the call
// returns without reporting anything to the caller: the setting cannot be
// read, service.SanitizePublicHTTPURL rejects the URL, marshalling fails, the
// request errors or times out, or the endpoint answers with a non-2xx status.
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
	// Upper bound on the whole request so an unresponsive endpoint cannot
	// stall the traffic job indefinitely.
	const informTimeout = 10 * time.Second

	informURL, err := j.settingService.GetExternalTrafficInformURI()
	if err != nil {
		logger.Warning("get ExternalTrafficInformURI failed:", err)
		return
	}
	informURL, err = service.SanitizePublicHTTPURL(informURL, false)
	if err != nil {
		logger.Warning("ExternalTrafficInformURI blocked:", err)
		return
	}

	requestBody, err := json.Marshal(map[string]any{"clientTraffics": clientTraffics, "inboundTraffics": inboundTraffics})
	if err != nil {
		logger.Warning("parse client/inbound traffic failed:", err)
		return
	}

	request := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(request)
	request.Header.SetMethod(fasthttp.MethodPost)
	request.Header.SetContentType("application/json; charset=UTF-8")
	request.SetBody(requestBody)
	request.SetRequestURI(informURL)

	response := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(response)

	if err := fasthttp.DoTimeout(request, response, informTimeout); err != nil {
		logger.Warning("POST ExternalTrafficInformURI failed:", err)
		return
	}
	// A completed round trip is not proof of delivery: surface any reply
	// outside the 2xx range instead of treating it as success.
	if code := response.StatusCode(); code < fasthttp.StatusOK || code >= fasthttp.StatusMultipleChoices {
		logger.Warning("POST ExternalTrafficInformURI returned status:", code)
	}
}