3x-ui/web/job/xray_traffic_job.go
MHSanaei ef8882a5c0
fix(online): scope per-inbound online to inbounds that carried traffic
Multi-inbound clients showed online on every inbound they were attached to. Xray's user-level traffic stat aggregates across all inbounds a client belongs to, so the email signal alone can't say which inbound was used.

Pair it with the inbound-level traffic signal under the same 20s grace and gate the per-inbound rollup on it: a client only shows online on inbounds that actually moved bytes this window. Remote nodes report no per-inbound activity and stay ungated (no regression). Adds GetActiveInboundsByNode, the activeInbounds WS field and POST /panel/api/clients/activeInbounds.

Fixes #4859
2026-06-03 16:19:00 +02:00

164 lines
5.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package job
import (
	"encoding/json"
	"time"

	"github.com/mhsanaei/3x-ui/v3/logger"
	"github.com/mhsanaei/3x-ui/v3/web/service"
	"github.com/mhsanaei/3x-ui/v3/web/websocket"
	"github.com/mhsanaei/3x-ui/v3/xray"
	"github.com/valyala/fasthttp"
)
// XrayTrafficJob is the job that pulls traffic statistics out of Xray, stores
// them through the panel services, and optionally forwards them to an
// external API.
type XrayTrafficJob struct {
	settingService  service.SettingService  // panel settings read during a run (restart-on-disable, external inform)
	xrayService     service.XrayService     // Xray running state, traffic retrieval and restart requests
	inboundService  service.InboundService  // inbound/client traffic accounting and online-client state
	outboundService service.OutboundService // outbound traffic accounting
}
// NewXrayTrafficJob returns a pointer to a zero-valued XrayTrafficJob.
func NewXrayTrafficJob() *XrayTrafficJob {
	return &XrayTrafficJob{}
}
// Run performs one traffic-collection cycle. It reads the latest counters
// from Xray, records inbound and outbound usage, refreshes the local online
// state and, when WebSocket clients are connected, pushes the results to them
// as compact delta payloads. There is no REST fallback; the design is meant
// to scale to 10k-20k+ clients per inbound.
func (j *XrayTrafficJob) Run() {
	if !j.xrayService.IsXrayRunning() {
		return
	}

	traffics, clientTraffics, err := j.xrayService.GetXrayTraffic()
	if err != nil {
		return
	}

	// Accounting failures are logged but do not abort the cycle.
	inboundNeedsRestart, clientsDisabled, err := j.inboundService.AddTraffic(traffics, clientTraffics)
	if err != nil {
		logger.Warning("add inbound traffic failed:", err)
	}
	err, outboundNeedsRestart := j.outboundService.AddTraffic(traffics, clientTraffics)
	if err != nil {
		logger.Warning("add outbound traffic failed:", err)
	}

	if clientsDisabled {
		restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable()
		if settingErr != nil {
			logger.Warning("get RestartXrayOnClientDisable failed:", settingErr)
		}
		if restartOnDisable {
			restartErr := j.xrayService.RestartXray(true)
			if restartErr != nil {
				logger.Warning("restart xray after disabling clients failed:", restartErr)
				// The immediate restart failed; flag Xray as needing one.
				j.xrayService.SetToNeedRestart()
			}
		}
		websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
	}

	// Forward to the external API when enabled; the lookup error is only
	// reported when the feature did not come back as enabled.
	informEnabled, informErr := j.settingService.GetExternalTrafficInformEnable()
	switch {
	case informEnabled:
		j.informTrafficToExternalAPI(traffics, clientTraffics)
	case informErr != nil:
		logger.Warning("get ExternalTrafficInformEnable failed:", informErr)
	}

	if inboundNeedsRestart || outboundNeedsRestart {
		j.xrayService.SetToNeedRestart()
	}

	lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
	if err != nil {
		logger.Warning("get clients last online failed:", err)
	}
	if lastOnlineMap == nil {
		lastOnlineMap = make(map[string]int64)
	}

	// The local online set is built from this poll's per-email deltas, not
	// from the shared last_online column: remote-node syncs bump that column
	// too, which would make a client that is active only on a remote node
	// look online on local inbounds.
	activeEmails := make([]string, 0, len(clientTraffics))
	for _, stat := range clientTraffics {
		if stat == nil || stat.Up+stat.Down <= 0 {
			continue
		}
		activeEmails = append(activeEmails, stat.Email)
	}

	// Collect the inbound tags that moved bytes this poll. Xray's
	// user>>>email counter aggregates over every inbound a client is attached
	// to, so the email alone cannot tell which inbound was used; gating the
	// per-inbound view on these tags keeps a multi-inbound client off
	// inbounds that saw no traffic. See issue #4859.
	activeInboundTags := make([]string, 0, len(traffics))
	for _, stat := range traffics {
		if stat == nil || !stat.IsInbound || stat.Up+stat.Down <= 0 {
			continue
		}
		activeInboundTags = append(activeInboundTags, stat.Tag)
	}
	j.inboundService.RefreshLocalOnlineClients(activeEmails, activeInboundTags)

	// Everything below only feeds WebSocket subscribers.
	if !websocket.HasClients() {
		return
	}

	onlineClients := j.inboundService.GetOnlineClients()
	if onlineClients == nil {
		onlineClients = make([]string, 0)
	}
	onlineByNode := j.inboundService.GetOnlineClientsByNode()
	activeInbounds := j.inboundService.GetActiveInboundsByNode()
	websocket.BroadcastTraffic(map[string]any{
		"traffics":       traffics,
		"clientTraffics": clientTraffics,
		"onlineClients":  onlineClients,
		"onlineByNode":   onlineByNode,
		"activeInbounds": activeInbounds,
		"lastOnlineMap":  lastOnlineMap,
	})

	// The client-stats message is sent only if at least one section has data.
	clientStatsPayload := make(map[string]any, 2)
	switch stats, statsErr := j.inboundService.GetAllClientTraffics(); {
	case statsErr != nil:
		logger.Warning("get all client traffics for websocket failed:", statsErr)
	case len(stats) > 0:
		clientStatsPayload["clients"] = stats
	}
	switch summary, summaryErr := j.inboundService.GetInboundsTrafficSummary(); {
	case summaryErr != nil:
		logger.Warning("get inbounds traffic summary for websocket failed:", summaryErr)
	case len(summary) > 0:
		clientStatsPayload["inbounds"] = summary
	}
	if len(clientStatsPayload) > 0 {
		websocket.BroadcastClientStats(clientStatsPayload)
	}

	outbounds, outboundsErr := j.outboundService.GetOutboundsTraffic()
	if outboundsErr != nil {
		logger.Warning("get all outbounds for websocket failed:", outboundsErr)
		return
	}
	if outbounds != nil {
		websocket.BroadcastOutbounds(outbounds)
	}
}
// informTrafficToExternalAPI POSTs the collected inbound and client traffic,
// encoded as one JSON object with the keys "clientTraffics" and
// "inboundTraffics", to the configured external inform URI.
//
// The URI is read from the setting service and passed through
// service.SanitizePublicHTTPURL before use; if either step fails the request
// is not sent. Delivery is best-effort: every failure is logged as a warning
// and nothing is reported back to the caller.
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
	// Upper bound for a single delivery attempt, so that a stalled endpoint
	// cannot block the traffic job indefinitely.
	const informTimeout = 10 * time.Second

	informURL, err := j.settingService.GetExternalTrafficInformURI()
	if err != nil {
		logger.Warning("get ExternalTrafficInformURI failed:", err)
		return
	}
	informURL, err = service.SanitizePublicHTTPURL(informURL, false)
	if err != nil {
		logger.Warning("ExternalTrafficInformURI blocked:", err)
		return
	}

	requestBody, err := json.Marshal(map[string]any{
		"clientTraffics":  clientTraffics,
		"inboundTraffics": inboundTraffics,
	})
	if err != nil {
		logger.Warning("parse client/inbound traffic failed:", err)
		return
	}

	request := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(request)
	request.Header.SetMethod("POST")
	request.Header.SetContentType("application/json; charset=UTF-8")
	request.SetBody(requestBody) // json.Marshal already returns []byte
	request.SetRequestURI(informURL)

	response := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(response)

	if err := fasthttp.DoTimeout(request, response, informTimeout); err != nil {
		logger.Warning("POST ExternalTrafficInformURI failed:", err)
		return
	}
	// A completed round trip is not a successful delivery; report any
	// non-2xx reply instead of silently dropping it.
	if status := response.StatusCode(); status < fasthttp.StatusOK || status >= fasthttp.StatusMultipleChoices {
		logger.Warning("POST ExternalTrafficInformURI returned unexpected status:", status)
	}
}