3x-ui/web/job/xray_traffic_job.go
lolka1333 fec714a243
fix: enhance WebSocket stability, resolve XHTTP configurations and fix UI loading shifts (#3997)
* feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system

* feat: add bulk client management support and improve inbound data handling

* Fix bug

* **Fixes & Changes:**
1. **Fixed XPadding Placement Dropdown**:
   - Added the missing `cookie` and `query` options to `xPaddingPlacement` (`stream_xhttp.html`).
   - *Why:* Previously, users wanting `cookie` obfuscation were forced to use the `header` placement string. This caused Xray-core to blindly intercept the entire monolithic HTTP Cookie header, failing internal padding-length validations and causing the inbound to silently drop the connection.
2. **Fixed Uplink Data Placement Validation**:
   - Replaced the unsupported `query` option with `cookie` in `uplinkDataPlacement`.
   - *Why:* Xray-core's `transport_internet.go` explicitly forbids `query` as an uplink placement option. Selecting it from the UI previously sent a payload that would cause Xray-core to instantly throw an `unsupported uplink data placement: query` panic. Adding `cookie` perfectly aligns the UI with Xray-core restrictions.
### Related Issues
- Resolves #3992

* This commit fixes structural payload issues preventing XHTTP from functioning correctly and eliminates WebSocket log spam.
- **[Fix X-Padding UI]** Added missing `cookie` and `query` options to X-Padding Placement. Fixes the issue where using Cookie fallback triggers whole HTTP Cookie header interception and silent drop in Xray-core. (Resolves [#3992](https://github.com/MHSanaei/3x-ui/issues/3992))
- **[Fix Uplink Data Options]** Replaced the invalid `query` option with `cookie` in Uplink Data Placement dropdown to prevent Xray-core backend panic `unsupported uplink data placement: query`.
- **[Fix WebSockets Spam]** Boosted `maxMessageSize` boundary to 100MB and gracefully handled fallback fetch signals via `broadcastInvalidate` to avoid buffer dropping spam. (Resolves [#3984](https://github.com/MHSanaei/3x-ui/issues/3984))

* Fix

* gofmt

* fix(websocket): resolve channel race condition and graceful shutdown deadlock

* Fix: inbounds switch

* Change max quantity from 10000 to 500

* fix
2026-04-19 21:01:00 +02:00

121 lines
4 KiB
Go

package job
import (
"encoding/json"
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/mhsanaei/3x-ui/v2/xray"
"github.com/valyala/fasthttp"
)
// XrayTrafficJob is the periodic job that pulls traffic statistics from Xray,
// hands them to the inbound and outbound services, and can additionally report
// them to an external API and to connected WebSocket clients.
type XrayTrafficJob struct {
	// settingService supplies the external traffic-inform switch and its target URI.
	settingService service.SettingService

	// xrayService reports whether Xray is running, returns the traffic
	// statistics, and accepts the "needs restart" flag.
	xrayService service.XrayService

	// inboundService records inbound/client traffic and provides the online
	// client data and inbound list used for WebSocket broadcasts.
	inboundService service.InboundService

	// outboundService records outbound traffic and provides the outbound
	// traffic list used for WebSocket broadcasts.
	outboundService service.OutboundService
}
// NewXrayTrafficJob returns a pointer to a zero-valued XrayTrafficJob; all of
// its service fields are left at their zero values.
func NewXrayTrafficJob() *XrayTrafficJob {
	return &XrayTrafficJob{}
}
// Run executes one traffic-collection cycle.
//
// It returns immediately when Xray is not running or when the traffic query
// fails. Otherwise it passes the statistics to the inbound and outbound
// services, forwards them to the external inform API when that setting is
// enabled, marks Xray as needing a restart if either service asks for it, and
// finally broadcasts the results to WebSocket clients when any are connected.
func (j *XrayTrafficJob) Run() {
	if running := j.xrayService.IsXrayRunning(); !running {
		return
	}

	stats, clientStats, err := j.xrayService.GetXrayTraffic()
	if err != nil {
		return
	}

	// Record the statistics on both sides; a failure on one side is only
	// logged, so the other side is still attempted.
	err, inboundRestart := j.inboundService.AddTraffic(stats, clientStats)
	if err != nil {
		logger.Warning("add inbound traffic failed:", err)
	}
	err, outboundRestart := j.outboundService.AddTraffic(stats, clientStats)
	if err != nil {
		logger.Warning("add outbound traffic failed:", err)
	}

	// The enabled flag takes precedence: the error is reported only when the
	// flag came back false.
	informEnabled, err := j.settingService.GetExternalTrafficInformEnable()
	switch {
	case informEnabled:
		j.informTrafficToExternalAPI(stats, clientStats)
	case err != nil:
		logger.Warning("get ExternalTrafficInformEnable failed:", err)
	}

	if inboundRestart || outboundRestart {
		j.xrayService.SetToNeedRestart()
	}

	// Without a connected frontend there is nobody to broadcast to, so the
	// lookups and broadcasts below are skipped entirely.
	if !websocket.HasClients() {
		return
	}

	// Gather the online-client data; fall back to an empty map when the
	// last-online lookup fails.
	online := j.inboundService.GetOnlineClients()
	lastOnline, err := j.inboundService.GetClientsLastOnline()
	if err != nil {
		logger.Warning("get clients last online failed:", err)
		lastOnline = make(map[string]int64)
	}

	// Push the traffic statistics together with the online-client data.
	websocket.BroadcastTraffic(map[string]any{
		"traffics":       stats,
		"clientTraffics": clientStats,
		"onlineClients":  online,
		"lastOnlineMap":  lastOnline,
	})

	// Re-read inbounds and outbounds so the frontend can refresh its totals.
	inbounds, err := j.inboundService.GetAllInbounds()
	if err != nil {
		logger.Warning("get all inbounds for websocket failed:", err)
	}
	outbounds, err := j.outboundService.GetOutboundsTraffic()
	if err != nil {
		logger.Warning("get all outbounds for websocket failed:", err)
	}

	// Only non-nil results are broadcast. Payload-size limiting (an
	// 'invalidate' signal for oversized payloads) is expected to be handled
	// by the WebSocket hub, not here.
	if inbounds != nil {
		websocket.BroadcastInbounds(inbounds)
	}
	if outbounds != nil {
		websocket.BroadcastOutbounds(outbounds)
	}
}
// informTrafficToExternalAPI POSTs the given traffic statistics as JSON to the
// URI returned by the setting service.
//
// The request body is a JSON object with the keys "clientTraffics" and
// "inboundTraffics". Delivery is best-effort: a failure to read the URI, to
// encode the body, or to send the request, as well as a non-2xx response
// status, is logged as a warning and the function returns without retrying.
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
	informURL, err := j.settingService.GetExternalTrafficInformURI()
	if err != nil {
		logger.Warning("get ExternalTrafficInformURI failed:", err)
		return
	}

	requestBody, err := json.Marshal(map[string]any{"clientTraffics": clientTraffics, "inboundTraffics": inboundTraffics})
	if err != nil {
		logger.Warning("parse client/inbound traffic failed:", err)
		return
	}

	request := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(request)
	request.Header.SetMethod(fasthttp.MethodPost)
	request.Header.SetContentType("application/json; charset=UTF-8")
	// json.Marshal already yields []byte, so no conversion is needed.
	request.SetBody(requestBody)
	request.SetRequestURI(informURL)

	response := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(response)

	// NOTE(review): no timeout is set on this request; consider
	// fasthttp.DoTimeout so a stalled endpoint cannot hold up the job —
	// confirm the desired limit with the maintainers.
	if err := fasthttp.Do(request, response); err != nil {
		logger.Warning("POST ExternalTrafficInformURI failed:", err)
		return
	}

	// A completed round trip is not the same as an accepted report: surface
	// error answers from the endpoint instead of treating them as success.
	if status := response.StatusCode(); status < fasthttp.StatusOK || status >= fasthttp.StatusMultipleChoices {
		logger.Warning("POST ExternalTrafficInformURI returned unexpected status:", status)
	}
}