3x-ui/web/job/xray_traffic_job.go
root 99e2e6c2e2 fix: resolve client traffic InboundId from DB in shared mode, set online clients
In shared mode the Xray API returns InboundId=0 for client traffic.
Collect() now looks up the real InboundId from the client_traffics table
by email, and skips unknown emails with a warning. Also computes and
sets online clients in XrayTrafficJob since addClientTraffic is bypassed.
2026-04-24 02:29:02 +08:00

137 lines
4.5 KiB
Go

package job
import (
"encoding/json"
"github.com/mhsanaei/3x-ui/v2/config"
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/mhsanaei/3x-ui/v2/xray"
"github.com/valyala/fasthttp"
)
// XrayTrafficJob collects and processes traffic statistics from Xray, updating the database and optionally informing external APIs.
//
// NewXrayTrafficJob only populates trafficFlushSvc; the remaining service
// fields are used at their zero value.
type XrayTrafficJob struct {
// settingService supplies the ExternalTrafficInformEnable flag and the
// ExternalTrafficInformURI used when reporting traffic to an external API.
settingService service.SettingService
// xrayService reports whether Xray is running, provides the traffic
// counters for each run, and is told when a restart is needed.
xrayService service.XrayService
// inboundService records inbound/client traffic when shared mode is off,
// holds the online-client list, and supplies the last-online map and the
// inbounds that are broadcast over WebSocket.
inboundService service.InboundService
// outboundService records outbound traffic and supplies the outbound
// traffic totals that are broadcast over WebSocket.
outboundService service.OutboundService
// trafficFlushSvc receives the collected traffic via Collect when shared
// mode is enabled, in place of inboundService.AddTraffic.
trafficFlushSvc *service.TrafficFlushService
}
// NewXrayTrafficJob builds a traffic collection job whose flush service is
// backed by a pending store located at config.GetTrafficPendingPath().
// All other service fields keep their zero value.
func NewXrayTrafficJob() *XrayTrafficJob {
	pendingStore := service.NewTrafficPendingStore(config.GetTrafficPendingPath())

	job := new(XrayTrafficJob)
	job.trafficFlushSvc = service.NewTrafficFlushService(pendingStore)
	return job
}
// Run performs one traffic collection cycle.
//
// It returns immediately when Xray is not running or when the traffic counters
// cannot be fetched. Otherwise it records the inbound traffic (through the
// flush service in shared mode, through the inbound service otherwise),
// records the outbound traffic, optionally reports the numbers to an external
// API, flags Xray for restart when either traffic update asks for it, and
// finally broadcasts the fresh state over WebSocket.
func (j *XrayTrafficJob) Run() {
	if !j.xrayService.IsXrayRunning() {
		return
	}

	traffics, clientTraffics, err := j.xrayService.GetXrayTraffic()
	if err != nil {
		return
	}

	var inboundRestart bool
	if !service.IsSharedModeEnabled() {
		if err, inboundRestart = j.inboundService.AddTraffic(traffics, clientTraffics); err != nil {
			logger.Warning("add inbound traffic failed:", err)
		}
	} else {
		if collectErr := j.trafficFlushSvc.Collect(traffics, clientTraffics); collectErr != nil {
			logger.Warning("collect shared traffic failed:", collectErr)
		}

		// Shared mode skips addClientTraffic, so nothing else refreshes the
		// online-client list: every client that moved traffic in this cycle
		// is reported as online from here.
		active := make([]string, 0, len(clientTraffics))
		for i := range clientTraffics {
			if entry := clientTraffics[i]; entry != nil && entry.Up+entry.Down > 0 {
				active = append(active, entry.Email)
			}
		}
		j.inboundService.SetOnlineClients(active)
	}

	err, outboundRestart := j.outboundService.AddTraffic(traffics, clientTraffics)
	if err != nil {
		logger.Warning("add outbound traffic failed:", err)
	}

	// An enabled flag wins over a lookup error; the error is only reported
	// when the flag came back false.
	informEnabled, err := j.settingService.GetExternalTrafficInformEnable()
	switch {
	case informEnabled:
		j.informTrafficToExternalAPI(traffics, clientTraffics)
	case err != nil:
		logger.Warning("get ExternalTrafficInformEnable failed:", err)
	}

	if inboundRestart || outboundRestart {
		j.xrayService.SetToNeedRestart()
	}

	// Real-time status: who is online now and when each client was last seen.
	onlineClients := j.inboundService.GetOnlineClients()
	lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
	if err != nil {
		logger.Warning("get clients last online failed:", err)
		lastOnlineMap = map[string]int64{}
	}

	// Re-read inbounds and outbounds so the frontend receives the accumulated
	// totals rather than only the deltas gathered in this cycle.
	updatedInbounds, err := j.inboundService.GetAllInbounds()
	if err != nil {
		logger.Warning("get all inbounds for websocket failed:", err)
	}
	updatedOutbounds, err := j.outboundService.GetOutboundsTraffic()
	if err != nil {
		logger.Warning("get all outbounds for websocket failed:", err)
	}

	// Push the per-cycle traffic update to WebSocket subscribers.
	websocket.BroadcastTraffic(map[string]any{
		"traffics":       traffics,
		"clientTraffics": clientTraffics,
		"onlineClients":  onlineClients,
		"lastOnlineMap":  lastOnlineMap,
	})

	// Full inbound/outbound snapshots are only sent when they were obtained.
	if updatedInbounds != nil {
		websocket.BroadcastInbounds(updatedInbounds)
	}
	if updatedOutbounds != nil {
		websocket.BroadcastOutbounds(updatedOutbounds)
	}
}
// informTrafficToExternalAPI POSTs the given inbound and client traffic as a
// JSON object (keys "clientTraffics" and "inboundTraffics") to the URI stored
// in the ExternalTrafficInformURI setting.
//
// The call is best-effort: every failure — setting lookup, empty URI, JSON
// encoding, transport error, or a non-2xx reply — is logged as a warning and
// nothing is returned to the caller.
//
// NOTE(review): fasthttp.Do is called without an explicit deadline, so a
// stalled endpoint may hold this call open; consider fasthttp.DoTimeout once
// an acceptable timeout has been agreed on.
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
	informURL, err := j.settingService.GetExternalTrafficInformURI()
	if err != nil {
		logger.Warning("get ExternalTrafficInformURI failed:", err)
		return
	}
	// Without a target there is nothing to send; say so explicitly instead of
	// letting the HTTP client fail with an unrelated transport error.
	if informURL == "" {
		logger.Warning("ExternalTrafficInformURI is empty, skip informing external API")
		return
	}

	requestBody, err := json.Marshal(map[string]any{"clientTraffics": clientTraffics, "inboundTraffics": inboundTraffics})
	if err != nil {
		logger.Warning("parse client/inbound traffic failed:", err)
		return
	}

	request := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(request)
	request.Header.SetMethod(fasthttp.MethodPost)
	request.Header.SetContentType("application/json; charset=UTF-8")
	request.SetBody(requestBody)
	request.SetRequestURI(informURL)

	response := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(response)

	if err := fasthttp.Do(request, response); err != nil {
		logger.Warning("POST ExternalTrafficInformURI failed:", err)
		return
	}
	// A completed round trip does not mean the endpoint accepted the data:
	// surface rejections (and unfollowed redirects) instead of dropping them.
	if code := response.StatusCode(); code < fasthttp.StatusOK || code >= fasthttp.StatusMultipleChoices {
		logger.Warning("POST ExternalTrafficInformURI returned unexpected status:", code)
	}
}