diff --git a/config/version b/config/version index e8501de6..9deb7ce3 100644 --- a/config/version +++ b/config/version @@ -1 +1 @@ -v1.4.5-beta +v1.4.6-beta diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 7811a4ca..8795c72a 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -3,7 +3,6 @@ package job import ( "encoding/json" - "github.com/mhsanaei/3x-ui/v2/config" "github.com/mhsanaei/3x-ui/v2/logger" "github.com/mhsanaei/3x-ui/v2/web/service" "github.com/mhsanaei/3x-ui/v2/web/websocket" @@ -22,11 +21,9 @@ type XrayTrafficJob struct { } // NewXrayTrafficJob creates a new traffic collection job instance. -func NewXrayTrafficJob() *XrayTrafficJob { +func NewXrayTrafficJob(store *service.TrafficPendingStore) *XrayTrafficJob { return &XrayTrafficJob{ - trafficFlushSvc: service.NewTrafficFlushService( - service.NewTrafficPendingStore(config.GetTrafficPendingPath()), - ), + trafficFlushSvc: service.NewTrafficFlushService(store), } } diff --git a/web/service/traffic_flush.go b/web/service/traffic_flush.go index bdf29f79..290f847b 100644 --- a/web/service/traffic_flush.go +++ b/web/service/traffic_flush.go @@ -143,6 +143,11 @@ func (s *TrafficFlushService) flushToDatabase(deltas []TrafficDelta) error { kind = TrafficDeltaKindClient } + if delta.InboundID == 0 { + logger.Warningf("skip traffic delta with inbound_id=0: kind=%s email=%s", kind, delta.Email) + continue + } + if err := tx.Model(&model.Inbound{}). Where("id = ?", delta.InboundID). Updates(map[string]any{ diff --git a/web/service/traffic_flush_test.go b/web/service/traffic_flush_test.go index 7dbb9121..dad045df 100644 --- a/web/service/traffic_flush_test.go +++ b/web/service/traffic_flush_test.go @@ -370,3 +370,46 @@ func TestCollectSkipsUnknownEmail(t *testing.T) { t.Fatalf("unexpected residual: %+v", deltas[0]) } } + +func TestFlushOnceSkipsZeroInboundIdDelta(t *testing.T) { + setupTestDB(t) + + if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil { + t.Fatalf("seed inbound failed: %v", err) + } + if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil { + t.Fatalf("seed client traffic failed: %v", err) + } + + store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) + // Simulate stale delta with inbound_id=0 (from before fix) mixed with valid delta + if err := store.Merge([]TrafficDelta{ + {Kind: TrafficDeltaKindClient, InboundID: 0, Email: "alice@example.com", UpDelta: 100, DownDelta: 200}, + {Kind: TrafficDeltaKindClient, InboundID: 1, Email: "alice@example.com", UpDelta: 7, DownDelta: 9}, + }); err != nil { + t.Fatalf("Merge error: %v", err) + } + + svc := NewTrafficFlushService(store) + if err := svc.FlushOnce(); err != nil { + t.Fatalf("FlushOnce error: %v", err) + } + + // Verify valid delta was flushed + var clientTraffic xray.ClientTraffic + if err := database.GetDB().First(&clientTraffic, "inbound_id = ? AND email = ?", 1, "alice@example.com").Error; err != nil { + t.Fatalf("lookup client traffic failed: %v", err) + } + if clientTraffic.Up != 7 || clientTraffic.Down != 9 { + t.Fatalf("unexpected flushed traffic (should only include valid delta): %+v", clientTraffic) + } + + // Verify pending is cleared (zero InboundID delta was skipped, not re-queued) + deltas, err := store.Load() + if err != nil { + t.Fatalf("Load error: %v", err) + } + if len(deltas) != 0 { + t.Fatalf("expected pending deltas to be cleared, got %+v", deltas) + } +} diff --git a/web/web.go b/web/web.go index fcd6628f..d86af50c 100644 --- a/web/web.go +++ b/web/web.go @@ -139,6 +139,8 @@ type Server struct { cron *cron.Cron + trafficStore *service.TrafficPendingStore + ctx context.Context cancel context.CancelFunc } @@ -351,7 +353,7 @@ func (s *Server) startTask() { go func() { time.Sleep(time.Second * 5) // Statistics every 10 seconds, start the delay for 5 seconds for the first time, and staggered with the time to restart xray - s.cron.AddJob("@every 10s", job.NewXrayTrafficJob()) + s.cron.AddJob("@every 10s", job.NewXrayTrafficJob(s.trafficStore)) }() // check client ips from log file every 10 sec @@ -428,8 +430,7 @@ func (s *Server) startTrafficFlushLoop() { if !service.IsSharedModeEnabled() { return } - store := service.NewTrafficPendingStore(config.GetTrafficPendingPath()) - flushService := service.NewTrafficFlushService(store) + flushService := service.NewTrafficFlushService(s.trafficStore) go flushService.Run(s.ctx) } @@ -449,6 +450,8 @@ func (s *Server) Start() (err error) { s.cron = cron.New(cron.WithLocation(loc), cron.WithSeconds()) s.cron.Start() + s.trafficStore = service.NewTrafficPendingStore(config.GetTrafficPendingPath()) + engine, err := s.initRouter() if err != nil { return err