fix: resolve shared-mode traffic flush blocked by stale inboundId=0 delta

The traffic-pending.json file could contain a stale client traffic delta
with inboundId=0 (created before the InboundId resolution fix). When
flushToDatabase tried to INSERT this into client_traffics, it violated
the foreign key constraint fk_inbounds_client_stats, causing the entire
transaction to roll back and blocking ALL traffic from being written to
MariaDB.

- Skip deltas with InboundID==0 in flushToDatabase with a warning log
- Share a single TrafficPendingStore between XrayTrafficJob and the
  flush loop to eliminate a race condition from dual file instances
- Add test for zero InboundID skip behavior
This commit is contained in:
root 2026-04-24 02:56:23 +08:00
parent 157fb93f24
commit 51f17922fa
5 changed files with 57 additions and 9 deletions

View file

@ -1 +1 @@
v1.4.5-beta
v1.4.6-beta

View file

@ -3,7 +3,6 @@ package job
import (
"encoding/json"
"github.com/mhsanaei/3x-ui/v2/config"
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/web/websocket"
@ -22,11 +21,9 @@ type XrayTrafficJob struct {
}
// NewXrayTrafficJob creates a new traffic collection job instance.
func NewXrayTrafficJob() *XrayTrafficJob {
func NewXrayTrafficJob(store *service.TrafficPendingStore) *XrayTrafficJob {
return &XrayTrafficJob{
trafficFlushSvc: service.NewTrafficFlushService(
service.NewTrafficPendingStore(config.GetTrafficPendingPath()),
),
trafficFlushSvc: service.NewTrafficFlushService(store),
}
}

View file

@ -143,6 +143,11 @@ func (s *TrafficFlushService) flushToDatabase(deltas []TrafficDelta) error {
kind = TrafficDeltaKindClient
}
if delta.InboundID == 0 {
logger.Warningf("skip traffic delta with inbound_id=0: kind=%s email=%s", kind, delta.Email)
continue
}
if err := tx.Model(&model.Inbound{}).
Where("id = ?", delta.InboundID).
Updates(map[string]any{

View file

@ -370,3 +370,46 @@ func TestCollectSkipsUnknownEmail(t *testing.T) {
t.Fatalf("unexpected residual: %+v", deltas[0])
}
}
func TestFlushOnceSkipsZeroInboundIdDelta(t *testing.T) {
setupTestDB(t)
if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil {
t.Fatalf("seed inbound failed: %v", err)
}
if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil {
t.Fatalf("seed client traffic failed: %v", err)
}
store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json"))
// Simulate stale delta with inbound_id=0 (from before fix) mixed with valid delta
if err := store.Merge([]TrafficDelta{
{Kind: TrafficDeltaKindClient, InboundID: 0, Email: "alice@example.com", UpDelta: 100, DownDelta: 200},
{Kind: TrafficDeltaKindClient, InboundID: 1, Email: "alice@example.com", UpDelta: 7, DownDelta: 9},
}); err != nil {
t.Fatalf("Merge error: %v", err)
}
svc := NewTrafficFlushService(store)
if err := svc.FlushOnce(); err != nil {
t.Fatalf("FlushOnce error: %v", err)
}
// Verify valid delta was flushed
var clientTraffic xray.ClientTraffic
if err := database.GetDB().First(&clientTraffic, "inbound_id = ? AND email = ?", 1, "alice@example.com").Error; err != nil {
t.Fatalf("lookup client traffic failed: %v", err)
}
if clientTraffic.Up != 7 || clientTraffic.Down != 9 {
t.Fatalf("unexpected flushed traffic (should only include valid delta): %+v", clientTraffic)
}
// Verify pending is cleared (zero InboundID delta was skipped, not re-queued)
deltas, err := store.Load()
if err != nil {
t.Fatalf("Load error: %v", err)
}
if len(deltas) != 0 {
t.Fatalf("expected pending deltas to be cleared, got %+v", deltas)
}
}

View file

@ -139,6 +139,8 @@ type Server struct {
cron *cron.Cron
trafficStore *service.TrafficPendingStore
ctx context.Context
cancel context.CancelFunc
}
@ -351,7 +353,7 @@ func (s *Server) startTask() {
go func() {
time.Sleep(time.Second * 5)
// Statistics every 10 seconds, start the delay for 5 seconds for the first time, and staggered with the time to restart xray
s.cron.AddJob("@every 10s", job.NewXrayTrafficJob())
s.cron.AddJob("@every 10s", job.NewXrayTrafficJob(s.trafficStore))
}()
// check client ips from log file every 10 sec
@ -428,8 +430,7 @@ func (s *Server) startTrafficFlushLoop() {
if !service.IsSharedModeEnabled() {
return
}
store := service.NewTrafficPendingStore(config.GetTrafficPendingPath())
flushService := service.NewTrafficFlushService(store)
flushService := service.NewTrafficFlushService(s.trafficStore)
go flushService.Run(s.ctx)
}
@ -449,6 +450,8 @@ func (s *Server) Start() (err error) {
s.cron = cron.New(cron.WithLocation(loc), cron.WithSeconds())
s.cron.Start()
s.trafficStore = service.NewTrafficPendingStore(config.GetTrafficPendingPath())
engine, err := s.initRouter()
if err != nil {
return err