mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-06-07 05:34:17 +00:00
The traffic-pending.json file could contain a stale client traffic delta with inboundId=0 (created before the InboundId resolution fix). When flushToDatabase tried to INSERT this into client_traffics, it violated the foreign key constraint fk_inbounds_client_stats, causing the entire transaction to roll back and blocking ALL traffic from being written to MariaDB. - Skip deltas with InboundID==0 in flushToDatabase with a warning log - Share a single TrafficPendingStore between XrayTrafficJob and the flush loop to eliminate a race condition from dual file instances - Add test for zero InboundID skip behavior
134 lines
4.5 KiB
Go
134 lines
4.5 KiB
Go
package job
|
|
|
|
import (
|
|
"encoding/json"
|
|
|
|
"github.com/mhsanaei/3x-ui/v2/logger"
|
|
"github.com/mhsanaei/3x-ui/v2/web/service"
|
|
"github.com/mhsanaei/3x-ui/v2/web/websocket"
|
|
"github.com/mhsanaei/3x-ui/v2/xray"
|
|
|
|
"github.com/valyala/fasthttp"
|
|
)
|
|
|
|
// XrayTrafficJob collects and processes traffic statistics from Xray, updating the database and optionally informing external APIs.
|
|
type XrayTrafficJob struct {
|
|
settingService service.SettingService
|
|
xrayService service.XrayService
|
|
inboundService service.InboundService
|
|
outboundService service.OutboundService
|
|
trafficFlushSvc *service.TrafficFlushService
|
|
}
|
|
|
|
// NewXrayTrafficJob creates a new traffic collection job instance.
|
|
func NewXrayTrafficJob(store *service.TrafficPendingStore) *XrayTrafficJob {
|
|
return &XrayTrafficJob{
|
|
trafficFlushSvc: service.NewTrafficFlushService(store),
|
|
}
|
|
}
|
|
|
|
// Run collects traffic statistics from Xray and updates the database, triggering restart if needed.
|
|
func (j *XrayTrafficJob) Run() {
|
|
if !j.xrayService.IsXrayRunning() {
|
|
return
|
|
}
|
|
traffics, clientTraffics, err := j.xrayService.GetXrayTraffic()
|
|
if err != nil {
|
|
return
|
|
}
|
|
needRestart0 := false
|
|
if service.IsSharedModeEnabled() {
|
|
if err := j.trafficFlushSvc.Collect(traffics, clientTraffics); err != nil {
|
|
logger.Warning("collect shared traffic failed:", err)
|
|
}
|
|
// In shared mode, addClientTraffic is bypassed so p.SetOnlineClients
|
|
// is never called. Compute and set online clients here instead.
|
|
online := make([]string, 0, len(clientTraffics))
|
|
for _, ct := range clientTraffics {
|
|
if ct != nil && ct.Up+ct.Down > 0 {
|
|
online = append(online, ct.Email)
|
|
}
|
|
}
|
|
j.inboundService.SetOnlineClients(online)
|
|
} else {
|
|
err, needRestart0 = j.inboundService.AddTraffic(traffics, clientTraffics)
|
|
if err != nil {
|
|
logger.Warning("add inbound traffic failed:", err)
|
|
}
|
|
}
|
|
err, needRestart1 := j.outboundService.AddTraffic(traffics, clientTraffics)
|
|
if err != nil {
|
|
logger.Warning("add outbound traffic failed:", err)
|
|
}
|
|
if ExternalTrafficInformEnable, err := j.settingService.GetExternalTrafficInformEnable(); ExternalTrafficInformEnable {
|
|
j.informTrafficToExternalAPI(traffics, clientTraffics)
|
|
} else if err != nil {
|
|
logger.Warning("get ExternalTrafficInformEnable failed:", err)
|
|
}
|
|
if needRestart0 || needRestart1 {
|
|
j.xrayService.SetToNeedRestart()
|
|
}
|
|
|
|
// Get online clients and last online map for real-time status updates
|
|
onlineClients := j.inboundService.GetOnlineClients()
|
|
lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
|
|
if err != nil {
|
|
logger.Warning("get clients last online failed:", err)
|
|
lastOnlineMap = make(map[string]int64)
|
|
}
|
|
|
|
// Fetch updated inbounds from database with accumulated traffic values
|
|
// This ensures frontend receives the actual total traffic, not just delta values
|
|
updatedInbounds, err := j.inboundService.GetAllInbounds()
|
|
if err != nil {
|
|
logger.Warning("get all inbounds for websocket failed:", err)
|
|
}
|
|
|
|
updatedOutbounds, err := j.outboundService.GetOutboundsTraffic()
|
|
if err != nil {
|
|
logger.Warning("get all outbounds for websocket failed:", err)
|
|
}
|
|
|
|
// Broadcast traffic update via WebSocket with accumulated values from database
|
|
trafficUpdate := map[string]any{
|
|
"traffics": traffics,
|
|
"clientTraffics": clientTraffics,
|
|
"onlineClients": onlineClients,
|
|
"lastOnlineMap": lastOnlineMap,
|
|
}
|
|
websocket.BroadcastTraffic(trafficUpdate)
|
|
|
|
// Broadcast full inbounds update for real-time UI refresh
|
|
if updatedInbounds != nil {
|
|
websocket.BroadcastInbounds(updatedInbounds)
|
|
}
|
|
|
|
if updatedOutbounds != nil {
|
|
websocket.BroadcastOutbounds(updatedOutbounds)
|
|
}
|
|
|
|
}
|
|
|
|
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
|
|
informURL, err := j.settingService.GetExternalTrafficInformURI()
|
|
if err != nil {
|
|
logger.Warning("get ExternalTrafficInformURI failed:", err)
|
|
return
|
|
}
|
|
requestBody, err := json.Marshal(map[string]any{"clientTraffics": clientTraffics, "inboundTraffics": inboundTraffics})
|
|
if err != nil {
|
|
logger.Warning("parse client/inbound traffic failed:", err)
|
|
return
|
|
}
|
|
request := fasthttp.AcquireRequest()
|
|
defer fasthttp.ReleaseRequest(request)
|
|
request.Header.SetMethod("POST")
|
|
request.Header.SetContentType("application/json; charset=UTF-8")
|
|
request.SetBody([]byte(requestBody))
|
|
request.SetRequestURI(informURL)
|
|
response := fasthttp.AcquireResponse()
|
|
defer fasthttp.ReleaseResponse(response)
|
|
if err := fasthttp.Do(request, response); err != nil {
|
|
logger.Warning("POST ExternalTrafficInformURI failed:", err)
|
|
}
|
|
}
|