3x-ui/web/job/xray_traffic_job.go
MHSanaei 7cd26a0583
Some checks are pending
Release 3X-UI / build (386) (push) Waiting to run
Release 3X-UI / build (amd64) (push) Waiting to run
Release 3X-UI / build (arm64) (push) Waiting to run
Release 3X-UI / build (armv5) (push) Waiting to run
Release 3X-UI / build (armv6) (push) Waiting to run
Release 3X-UI / build (armv7) (push) Waiting to run
Release 3X-UI / build (s390x) (push) Waiting to run
Release 3X-UI / Build for Windows (push) Waiting to run
v3
2026-05-10 02:13:42 +02:00

170 lines
6.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package job
import (
	"encoding/json"
	"time"

	"github.com/mhsanaei/3x-ui/v3/logger"
	"github.com/mhsanaei/3x-ui/v3/web/service"
	"github.com/mhsanaei/3x-ui/v3/web/websocket"
	"github.com/mhsanaei/3x-ui/v3/xray"
	"github.com/valyala/fasthttp"
)
// XrayTrafficJob is the periodic job that pulls traffic statistics out of
// Xray, stores them through the inbound/outbound services and, when enabled
// in the settings, forwards them to an external API.
//
// All dependencies are held by value; NewXrayTrafficJob leaves them at their
// zero values.
type XrayTrafficJob struct {
	// settingService supplies the restart-on-disable flag and the external
	// traffic-inform switch and URI.
	settingService service.SettingService
	// xrayService reports whether Xray is running, returns the traffic
	// counters, and performs or schedules restarts.
	xrayService service.XrayService
	// inboundService records inbound/client traffic and serves the online
	// and per-client data pushed over WebSocket.
	inboundService service.InboundService
	// outboundService records outbound traffic and serves the outbound
	// traffic snapshot.
	outboundService service.OutboundService
}
// NewXrayTrafficJob returns a pointer to a zero-valued XrayTrafficJob; none of
// its service fields are initialised explicitly.
func NewXrayTrafficJob() *XrayTrafficJob {
	return &XrayTrafficJob{}
}
// Run performs one traffic-collection cycle: it reads the counters from Xray,
// records them for inbounds and outbounds, optionally notifies the external
// API, and then pushes real-time updates to connected WebSocket clients as
// compact delta payloads. There is no REST fallback; the design is meant to
// scale to 10k-20k+ clients per inbound.
//
// Nothing happens when Xray is not running or when fetching the traffic
// fails (that fetch error is not logged here).
func (j *XrayTrafficJob) Run() {
	if running := j.xrayService.IsXrayRunning(); !running {
		return
	}

	traffics, clientTraffics, fetchErr := j.xrayService.GetXrayTraffic()
	if fetchErr != nil {
		return
	}

	// Record the counters. A failure on either side is only logged so the
	// rest of the cycle still runs.
	inboundNeedsRestart, clientsDisabled, inboundErr := j.inboundService.AddTraffic(traffics, clientTraffics)
	if inboundErr != nil {
		logger.Warning("add inbound traffic failed:", inboundErr)
	}
	outboundErr, outboundNeedsRestart := j.outboundService.AddTraffic(traffics, clientTraffics)
	if outboundErr != nil {
		logger.Warning("add outbound traffic failed:", outboundErr)
	}

	// Clients were disabled during this cycle: restart Xray right away when
	// the setting asks for it, and fall back to flagging a pending restart if
	// the immediate restart fails.
	if clientsDisabled {
		restartNow, settingErr := j.settingService.GetRestartXrayOnClientDisable()
		if settingErr != nil {
			logger.Warning("get RestartXrayOnClientDisable failed:", settingErr)
		}
		if restartNow {
			restartErr := j.xrayService.RestartXray(true)
			if restartErr != nil {
				logger.Warning("restart xray after disabling clients failed:", restartErr)
				j.xrayService.SetToNeedRestart()
			}
		}
	}

	// The enabled flag wins over the error: the lookup error is reported only
	// when the flag came back false.
	informEnabled, informErr := j.settingService.GetExternalTrafficInformEnable()
	switch {
	case informEnabled:
		j.informTrafficToExternalAPI(traffics, clientTraffics)
	case informErr != nil:
		logger.Warning("get ExternalTrafficInformEnable failed:", informErr)
	}

	if inboundNeedsRestart || outboundNeedsRestart {
		j.xrayService.SetToNeedRestart()
	}

	// With no frontend connected, skip every WebSocket routine below,
	// including the active-client query and the payload construction.
	if !websocket.HasClients() {
		return
	}

	// Online presence + traffic deltas: a small payload that always fits in
	// a WebSocket message. The slice and the map are forced to be non-nil so
	// they serialise as [] / {} rather than null; the frontend's traffic
	// handler treats a missing/null onlineClients field as "no update", which
	// would otherwise drop the "everyone went offline" transition and leave
	// stale users in the online list.
	online := j.inboundService.GetOnlineClients()
	if online == nil {
		online = []string{}
	}
	lastOnline, lastOnlineErr := j.inboundService.GetClientsLastOnline()
	if lastOnlineErr != nil {
		logger.Warning("get clients last online failed:", lastOnlineErr)
	}
	if lastOnline == nil {
		lastOnline = make(map[string]int64)
	}
	trafficPayload := map[string]any{
		"traffics":       traffics,
		"clientTraffics": clientTraffics,
		"onlineClients":  online,
		"lastOnlineMap":  lastOnline,
	}
	websocket.BroadcastTraffic(trafficPayload)

	// Compact delta payload: absolute per-client counters for the clients
	// that were active this cycle, plus absolute inbound-level totals, which
	// the frontend applies in place. Typical size is ~10-50KB even for 10k+
	// client deployments. This replaces the old full-inbound-list broadcast,
	// which hit WebSocket size limits (5-10MB) and pushed the frontend into a
	// REST refetch.
	statsPayload := make(map[string]any)
	if emails := activeEmails(clientTraffics); len(emails) > 0 {
		stats, statsErr := j.inboundService.GetActiveClientTraffics(emails)
		switch {
		case statsErr != nil:
			logger.Warning("get active client traffics for websocket failed:", statsErr)
		case len(stats) > 0:
			statsPayload["clients"] = stats
		}
	}
	summary, summaryErr := j.inboundService.GetInboundsTrafficSummary()
	switch {
	case summaryErr != nil:
		logger.Warning("get inbounds traffic summary for websocket failed:", summaryErr)
	case len(summary) > 0:
		statsPayload["inbounds"] = summary
	}
	if len(statsPayload) != 0 {
		websocket.BroadcastClientStats(statsPayload)
	}

	// The outbound list is small (one row per outbound, no per-client
	// expansion), so the full snapshot is still sent as-is.
	outbounds, outboundsErr := j.outboundService.GetOutboundsTraffic()
	switch {
	case outboundsErr != nil:
		logger.Warning("get all outbounds for websocket failed:", outboundsErr)
	case outbounds != nil:
		websocket.BroadcastOutbounds(outbounds)
	}
}
// activeEmails lists the emails of the clients whose entry carries non-zero
// upload or download traffic. Nil entries, entries without an email and idle
// entries (Up and Down both zero) are left out, since their counters are
// unchanged and need not be pushed to the frontend.
//
// It returns nil for empty input; otherwise it returns a non-nil slice that
// may be empty, in input order.
func activeEmails(clientTraffics []*xray.ClientTraffic) []string {
	total := len(clientTraffics)
	if total == 0 {
		return nil
	}

	active := make([]string, 0, total)
	for i := range clientTraffics {
		entry := clientTraffics[i]
		identified := entry != nil && entry.Email != ""
		if identified && (entry.Up != 0 || entry.Down != 0) {
			active = append(active, entry.Email)
		}
	}
	return active
}
// informTrafficToExternalAPI POSTs the collected traffic to the URI configured
// in the settings as a JSON object with the keys "clientTraffics" and
// "inboundTraffics".
//
// The call is best-effort: every failure (reading the URI, encoding the body,
// sending the request, or a non-2xx reply) is logged as a warning and nothing
// is returned to the caller. The request is bounded by a timeout so an
// unresponsive endpoint cannot block the traffic job indefinitely.
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
	// Upper bound on how long a single notification may block the job.
	const informTimeout = 10 * time.Second

	informURL, err := j.settingService.GetExternalTrafficInformURI()
	if err != nil {
		logger.Warning("get ExternalTrafficInformURI failed:", err)
		return
	}

	requestBody, err := json.Marshal(map[string]any{"clientTraffics": clientTraffics, "inboundTraffics": inboundTraffics})
	if err != nil {
		logger.Warning("parse client/inbound traffic failed:", err)
		return
	}

	request := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(request)
	request.Header.SetMethod(fasthttp.MethodPost)
	request.Header.SetContentType("application/json; charset=UTF-8")
	// json.Marshal already returns []byte, so no conversion is needed.
	request.SetBody(requestBody)
	request.SetRequestURI(informURL)

	response := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(response)

	if err := fasthttp.DoTimeout(request, response, informTimeout); err != nil {
		logger.Warning("POST ExternalTrafficInformURI failed:", err)
		return
	}

	// A completed round trip is not necessarily a success: surface replies
	// outside the 2xx range instead of treating them as delivered.
	if status := response.StatusCode(); status < fasthttp.StatusOK || status >= fasthttp.StatusMultipleChoices {
		logger.Warning("POST ExternalTrafficInformURI returned unexpected status:", status)
	}
}