3x-ui/web/job/xray_traffic_job.go
MHSanaei 8e7d215b4a
feat(nodes): traffic-writer queue, full-mirror sync, WS event fixes
- Traffic-writer single-consumer queue (web/service/traffic_writer.go)
  serialises every DB write that touches up/down/all_time/last_online
  (AddTraffic, SetRemoteTraffic, Reset*, UpdateClientTrafficByEmail) so
  overlapping goroutines can no longer clobber each other's column-scoped
  Updates with a stale tx.Save.

- DB pool: WAL + busy_timeout=10s + synchronous=NORMAL + _txlock=
  immediate, MaxOpenConns=8 / MaxIdleConns=4. The immediate-tx PRAGMA
  fixes residual "database is locked [0ms]" cases where deferred-tx
  writer-upgrade conflicts bypass busy_timeout.

- SetRemoteTraffic full-mirrors node-authoritative state into central:
  settings JSON, remark, listen, port, total, expiry, all_time, enable,
  plus per-client total/expiry/reset/all_time. Inbounds and
  client_traffics rows present on node but missing from central are
  created; rows missing from snap are deleted (with cascading
  client_traffics removal).

- NodeTrafficSyncJob detects structural changes from the mirror and
  broadcasts invalidate(inbounds) so open central UIs re-fetch via REST
  on node-side add/del/edit without manual refresh.

- XrayTrafficJob broadcasts invalidate(inbounds) when auto-disable flips
  client_traffics.enable so the per-client toggle reflects depletion
  without manual refresh.

- Frontend: inbounds page now subscribes to the BroadcastInbounds 'inbounds'
  WS event (full-list pushes from add/del/update controllers were silently
  dropped). Fixes invalidate payload field (dataType -> type). Restart-
  panel modal switched from Promise-wrap to onOk-only so Cancel actually
  cancels.

- Node files trimmed of stale prose-comments; cron cadence dropped
  10s -> 5s to match the inbounds page UX.

- README badges and Go module path bumped v2 -> v3 to match module rename.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 16:25:23 +02:00

171 lines
6.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package job
import (
"encoding/json"
"github.com/mhsanaei/3x-ui/v3/logger"
"github.com/mhsanaei/3x-ui/v3/web/service"
"github.com/mhsanaei/3x-ui/v3/web/websocket"
"github.com/mhsanaei/3x-ui/v3/xray"
"github.com/valyala/fasthttp"
)
// XrayTrafficJob collects and processes traffic statistics from Xray, updating the database and optionally informing external APIs.
type XrayTrafficJob struct {
settingService service.SettingService
xrayService service.XrayService
inboundService service.InboundService
outboundService service.OutboundService
}
// NewXrayTrafficJob creates a new traffic collection job instance.
func NewXrayTrafficJob() *XrayTrafficJob {
return new(XrayTrafficJob)
}
// Run collects traffic statistics from Xray, updates the database, and pushes
// real-time updates over WebSocket using compact delta payloads — no REST
// fallback, scales to 10k20k+ clients per inbound.
func (j *XrayTrafficJob) Run() {
if !j.xrayService.IsXrayRunning() {
return
}
traffics, clientTraffics, err := j.xrayService.GetXrayTraffic()
if err != nil {
return
}
needRestart0, clientsDisabled, err := j.inboundService.AddTraffic(traffics, clientTraffics)
if err != nil {
logger.Warning("add inbound traffic failed:", err)
}
err, needRestart1 := j.outboundService.AddTraffic(traffics, clientTraffics)
if err != nil {
logger.Warning("add outbound traffic failed:", err)
}
if clientsDisabled {
restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable()
if settingErr != nil {
logger.Warning("get RestartXrayOnClientDisable failed:", settingErr)
}
if restartOnDisable {
if err := j.xrayService.RestartXray(true); err != nil {
logger.Warning("restart xray after disabling clients failed:", err)
j.xrayService.SetToNeedRestart()
}
}
websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
}
if ExternalTrafficInformEnable, err := j.settingService.GetExternalTrafficInformEnable(); ExternalTrafficInformEnable {
j.informTrafficToExternalAPI(traffics, clientTraffics)
} else if err != nil {
logger.Warning("get ExternalTrafficInformEnable failed:", err)
}
if needRestart0 || needRestart1 {
j.xrayService.SetToNeedRestart()
}
// If no frontend client is connected, skip all WebSocket broadcasting
// routines — including the active-client DB query and JSON marshaling.
if !websocket.HasClients() {
return
}
// Online presence + traffic deltas — small payload, always fits in WS.
// Force non-nil slice/map so JSON marshalling produces [] / {} instead of
// `null` when everyone is offline. The frontend's traffic handler treats
// a missing/null onlineClients field as "no update", so without this the
// "everyone went offline" transition was silently dropped — stale online
// users lingered in the list and the online filter kept showing them.
onlineClients := j.inboundService.GetOnlineClients()
if onlineClients == nil {
onlineClients = []string{}
}
lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
if err != nil {
logger.Warning("get clients last online failed:", err)
}
if lastOnlineMap == nil {
lastOnlineMap = make(map[string]int64)
}
websocket.BroadcastTraffic(map[string]any{
"traffics": traffics,
"clientTraffics": clientTraffics,
"onlineClients": onlineClients,
"lastOnlineMap": lastOnlineMap,
})
// Compact delta payload: per-client absolute counters for clients active
// this cycle, plus inbound-level absolute totals. Frontend applies both
// in-place — typical payload ~1050KB even for 10k+ client deployments.
// Replaces the old full-inbound-list broadcast that hit WS size limits
// (510MB) and forced the frontend into a REST refetch.
clientStatsPayload := map[string]any{}
if activeEmails := activeEmails(clientTraffics); len(activeEmails) > 0 {
if stats, err := j.inboundService.GetActiveClientTraffics(activeEmails); err != nil {
logger.Warning("get active client traffics for websocket failed:", err)
} else if len(stats) > 0 {
clientStatsPayload["clients"] = stats
}
}
if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
logger.Warning("get inbounds traffic summary for websocket failed:", err)
} else if len(inboundSummary) > 0 {
clientStatsPayload["inbounds"] = inboundSummary
}
if len(clientStatsPayload) > 0 {
websocket.BroadcastClientStats(clientStatsPayload)
}
// Outbounds list is small (one row per outbound, no per-client expansion)
// so the full snapshot still fits comfortably in WS.
if updatedOutbounds, err := j.outboundService.GetOutboundsTraffic(); err == nil && updatedOutbounds != nil {
websocket.BroadcastOutbounds(updatedOutbounds)
} else if err != nil {
logger.Warning("get all outbounds for websocket failed:", err)
}
}
// activeEmails returns the set of client emails that had non-zero traffic in
// the current collection window. Idle clients are skipped — no need to push
// their (unchanged) counters to the frontend.
func activeEmails(clientTraffics []*xray.ClientTraffic) []string {
if len(clientTraffics) == 0 {
return nil
}
emails := make([]string, 0, len(clientTraffics))
for _, ct := range clientTraffics {
if ct == nil || ct.Email == "" {
continue
}
if ct.Up == 0 && ct.Down == 0 {
continue
}
emails = append(emails, ct.Email)
}
return emails
}
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
informURL, err := j.settingService.GetExternalTrafficInformURI()
if err != nil {
logger.Warning("get ExternalTrafficInformURI failed:", err)
return
}
requestBody, err := json.Marshal(map[string]any{"clientTraffics": clientTraffics, "inboundTraffics": inboundTraffics})
if err != nil {
logger.Warning("parse client/inbound traffic failed:", err)
return
}
request := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(request)
request.Header.SetMethod("POST")
request.Header.SetContentType("application/json; charset=UTF-8")
request.SetBody([]byte(requestBody))
request.SetRequestURI(informURL)
response := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(response)
if err := fasthttp.Do(request, response); err != nil {
logger.Warning("POST ExternalTrafficInformURI failed:", err)
}
}