diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 8d2bfbd6..bcf0bee0 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -3,6 +3,7 @@ package job import ( "encoding/json" + "github.com/mhsanaei/3x-ui/v2/config" "github.com/mhsanaei/3x-ui/v2/logger" "github.com/mhsanaei/3x-ui/v2/web/service" "github.com/mhsanaei/3x-ui/v2/web/websocket" @@ -17,11 +18,16 @@ type XrayTrafficJob struct { xrayService service.XrayService inboundService service.InboundService outboundService service.OutboundService + trafficFlushSvc *service.TrafficFlushService } // NewXrayTrafficJob creates a new traffic collection job instance. func NewXrayTrafficJob() *XrayTrafficJob { - return new(XrayTrafficJob) + return &XrayTrafficJob{ + trafficFlushSvc: service.NewTrafficFlushService( + service.NewTrafficPendingStore(config.GetTrafficPendingPath()), + ), + } } // Run collects traffic statistics from Xray and updates the database, triggering restart if needed. @@ -33,9 +39,16 @@ func (j *XrayTrafficJob) Run() { if err != nil { return } - err, needRestart0 := j.inboundService.AddTraffic(traffics, clientTraffics) - if err != nil { - logger.Warning("add inbound traffic failed:", err) + needRestart0 := false + if service.IsSharedModeEnabled() { + if err := j.trafficFlushSvc.Collect(clientTraffics); err != nil { + logger.Warning("collect shared traffic failed:", err) + } + } else { + err, needRestart0 = j.inboundService.AddTraffic(traffics, clientTraffics) + if err != nil { + logger.Warning("add inbound traffic failed:", err) + } } err, needRestart1 := j.outboundService.AddTraffic(traffics, clientTraffics) if err != nil { diff --git a/web/service/inbound.go b/web/service/inbound.go index bfa52c3d..da8f3561 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -1165,6 +1165,14 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff return err, false } + needRestart, err := s.ReconcileSharedTrafficState(tx) + if err != nil { + return err, false + } + return nil, needRestart +} + +func (s *InboundService) ReconcileSharedTrafficState(tx *gorm.DB) (bool, error) { needRestart0, count, err := s.autoRenewClients(tx) if err != nil { logger.Warning("Error in renew clients:", err) @@ -1185,7 +1193,7 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff } else if count > 0 { logger.Debugf("%v inbounds disabled", count) } - return nil, (needRestart0 || needRestart1 || needRestart2) + return needRestart0 || needRestart1 || needRestart2, nil } func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error { diff --git a/web/service/traffic_flush.go b/web/service/traffic_flush.go new file mode 100644 index 00000000..7a53b282 --- /dev/null +++ b/web/service/traffic_flush.go @@ -0,0 +1,127 @@ +package service + +import ( + "context" + "time" + + "github.com/mhsanaei/3x-ui/v2/config" + "github.com/mhsanaei/3x-ui/v2/database" + "github.com/mhsanaei/3x-ui/v2/database/model" + "github.com/mhsanaei/3x-ui/v2/util/common" + "github.com/mhsanaei/3x-ui/v2/xray" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type TrafficFlushService struct { + store *TrafficPendingStore + inbounds InboundService + xrayService XrayService + flushFn func([]TrafficDelta) error + reconcileFn func(*gorm.DB) (bool, error) + markRestart func() +} + +func NewTrafficFlushService(store *TrafficPendingStore) *TrafficFlushService { + svc := &TrafficFlushService{store: store} + svc.flushFn = svc.flushToDatabase + svc.reconcileFn = svc.inbounds.ReconcileSharedTrafficState + svc.markRestart = svc.xrayService.SetToNeedRestart + return svc +} + +func (s *TrafficFlushService) Collect(clientTraffics []*xray.ClientTraffic) error { + deltas := make([]TrafficDelta, 0, len(clientTraffics)) + for _, traffic := range clientTraffics { + if traffic == nil || (traffic.Up == 0 && traffic.Down == 0) { + continue + } + deltas = append(deltas, TrafficDelta{ + InboundID: traffic.InboundId, + Email: traffic.Email, + UpDelta: traffic.Up, + DownDelta: traffic.Down, + }) + } + if len(deltas) == 0 { + return nil + } + return s.store.Merge(deltas) +} + +func (s *TrafficFlushService) flushToDatabase(deltas []TrafficDelta) error { + now := time.Now().UnixMilli() + + return database.GetDB().Transaction(func(tx *gorm.DB) error { + for _, delta := range deltas { + if err := tx.Model(&model.Inbound{}). + Where("id = ?", delta.InboundID). + Updates(map[string]any{ + "up": gorm.Expr("up + ?", delta.UpDelta), + "down": gorm.Expr("down + ?", delta.DownDelta), + "all_time": gorm.Expr("COALESCE(all_time, 0) + ?", delta.UpDelta+delta.DownDelta), + }).Error; err != nil { + return err + } + + row := xray.ClientTraffic{ + InboundId: delta.InboundID, + Email: delta.Email, + Enable: true, + Up: delta.UpDelta, + Down: delta.DownDelta, + AllTime: delta.UpDelta + delta.DownDelta, + LastOnline: now, + } + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "inbound_id"}, {Name: "email"}}, + DoUpdates: clause.Assignments(map[string]any{ + "up": gorm.Expr("up + ?", delta.UpDelta), + "down": gorm.Expr("down + ?", delta.DownDelta), + "all_time": gorm.Expr("COALESCE(all_time, 0) + ?", delta.UpDelta+delta.DownDelta), + "last_online": now, + }), + }).Create(&row).Error; err != nil { + return err + } + } + + if IsMaster() { + needRestart, err := s.reconcileFn(tx) + if err != nil { + return err + } + if needRestart && s.markRestart != nil { + s.markRestart() + } + } + return nil + }) +} + +func (s *TrafficFlushService) FlushOnce() error { + deltas, err := s.store.Take() + if err != nil || len(deltas) == 0 { + return err + } + if err := s.flushFn(deltas); err != nil { + return common.Combine(err, s.store.Merge(deltas)) + } + return nil +} + +func (s *TrafficFlushService) Run(ctx context.Context) { + interval := time.Duration(config.GetNodeConfigFromJSON().TrafficFlushSeconds) * time.Second + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + _ = s.FlushOnce() + return + case <-ticker.C: + _ = s.FlushOnce() + } + } +} diff --git a/web/service/traffic_flush_test.go b/web/service/traffic_flush_test.go new file mode 100644 index 00000000..42df07a4 --- /dev/null +++ b/web/service/traffic_flush_test.go @@ -0,0 +1,121 @@ +package service + +import ( + "errors" + "path/filepath" + "testing" + + "github.com/mhsanaei/3x-ui/v2/database" + "github.com/mhsanaei/3x-ui/v2/database/model" + "github.com/mhsanaei/3x-ui/v2/xray" + "gorm.io/gorm" +) + +func TestTrafficPendingStoreMerge(t *testing.T) { + store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) + + if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 7}}); err != nil { + t.Fatalf("Merge error: %v", err) + } + if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", DownDelta: 9}}); err != nil { + t.Fatalf("Merge error: %v", err) + } + + deltas, err := store.Load() + if err != nil { + t.Fatalf("Load error: %v", err) + } + if len(deltas) != 1 { + t.Fatalf("expected one merged delta, got %d", len(deltas)) + } + if deltas[0].UpDelta != 7 || deltas[0].DownDelta != 9 { + t.Fatalf("unexpected merged delta: %+v", deltas[0]) + } +} + +func TestFlushOnceClearsPendingOnSuccess(t *testing.T) { + setupTestDB(t) + + if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil { + t.Fatalf("seed inbound failed: %v", err) + } + if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil { + t.Fatalf("seed client traffic failed: %v", err) + } + + store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) + if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 7, DownDelta: 9}}); err != nil { + t.Fatalf("Merge error: %v", err) + } + + svc := NewTrafficFlushService(store) + if err := svc.FlushOnce(); err != nil { + t.Fatalf("FlushOnce error: %v", err) + } + + var clientTraffic xray.ClientTraffic + if err := database.GetDB().First(&clientTraffic, "inbound_id = ? AND email = ?", 1, "alice@example.com").Error; err != nil { + t.Fatalf("lookup client traffic failed: %v", err) + } + if clientTraffic.Up != 7 || clientTraffic.Down != 9 { + t.Fatalf("unexpected flushed traffic: %+v", clientTraffic) + } + + deltas, err := store.Load() + if err != nil { + t.Fatalf("Load error: %v", err) + } + if len(deltas) != 0 { + t.Fatalf("expected pending deltas to be cleared, got %+v", deltas) + } +} + +func TestFlushOnceKeepsPendingOnFailure(t *testing.T) { + store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) + if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 3}}); err != nil { + t.Fatalf("Merge error: %v", err) + } + + svc := NewTrafficFlushService(store) + svc.flushFn = func([]TrafficDelta) error { return errors.New("boom") } + + if err := svc.FlushOnce(); err == nil { + t.Fatal("expected flush failure") + } + + deltas, err := store.Load() + if err != nil { + t.Fatalf("Load error: %v", err) + } + if len(deltas) != 1 { + t.Fatalf("expected pending delta to remain, got %+v", deltas) + } +} + +func TestFlushOnceMarksRestartWhenReconciliationRequiresIt(t *testing.T) { + setupTestDB(t) + + if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil { + t.Fatalf("seed inbound failed: %v", err) + } + if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil { + t.Fatalf("seed client traffic failed: %v", err) + } + + store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) + if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 1}}); err != nil { + t.Fatalf("Merge error: %v", err) + } + + restartMarked := false + svc := NewTrafficFlushService(store) + svc.reconcileFn = func(*gorm.DB) (bool, error) { return true, nil } + svc.markRestart = func() { restartMarked = true } + + if err := svc.FlushOnce(); err != nil { + t.Fatalf("FlushOnce error: %v", err) + } + if !restartMarked { + t.Fatal("expected flush to mark restart when reconciliation requires it") + } +} diff --git a/web/service/traffic_pending.go b/web/service/traffic_pending.go new file mode 100644 index 00000000..85167424 --- /dev/null +++ b/web/service/traffic_pending.go @@ -0,0 +1,113 @@ +package service + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" +) + +type TrafficDelta struct { + InboundID int `json:"inboundId"` + Email string `json:"email"` + UpDelta int64 `json:"upDelta"` + DownDelta int64 `json:"downDelta"` +} + +type TrafficPendingStore struct { + path string + mu sync.Mutex +} + +func NewTrafficPendingStore(path string) *TrafficPendingStore { + return &TrafficPendingStore{path: path} +} + +func (s *TrafficPendingStore) Load() ([]TrafficDelta, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.loadUnlocked() +} + +func (s *TrafficPendingStore) Save(deltas []TrafficDelta) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.saveUnlocked(deltas) +} + +func (s *TrafficPendingStore) Merge(newDeltas []TrafficDelta) error { + s.mu.Lock() + defer s.mu.Unlock() + + current, err := s.loadUnlocked() + if err != nil { + return err + } + + index := map[string]int{} + for i, delta := range current { + index[deltaKey(delta.InboundID, delta.Email)] = i + } + + for _, delta := range newDeltas { + key := deltaKey(delta.InboundID, delta.Email) + if idx, ok := index[key]; ok { + current[idx].UpDelta += delta.UpDelta + current[idx].DownDelta += delta.DownDelta + continue + } + index[key] = len(current) + current = append(current, delta) + } + + return s.saveUnlocked(current) +} + +func (s *TrafficPendingStore) Take() ([]TrafficDelta, error) { + s.mu.Lock() + defer s.mu.Unlock() + + deltas, err := s.loadUnlocked() + if err != nil || len(deltas) == 0 { + return deltas, err + } + if err := s.saveUnlocked([]TrafficDelta{}); err != nil { + return nil, err + } + return deltas, nil +} + +func (s *TrafficPendingStore) loadUnlocked() ([]TrafficDelta, error) { + data, err := os.ReadFile(s.path) + if os.IsNotExist(err) { + return []TrafficDelta{}, nil + } + if err != nil { + return nil, err + } + + var deltas []TrafficDelta + if err := json.Unmarshal(data, &deltas); err != nil { + return nil, err + } + if deltas == nil { + return []TrafficDelta{}, nil + } + return deltas, nil +} + +func (s *TrafficPendingStore) saveUnlocked(deltas []TrafficDelta) error { + if err := os.MkdirAll(filepath.Dir(s.path), 0755); err != nil { + return err + } + data, err := json.MarshalIndent(deltas, "", " ") + if err != nil { + return err + } + return os.WriteFile(s.path, data, 0644) +} + +func deltaKey(inboundID int, email string) string { + return fmt.Sprintf("%d:%s", inboundID, email) +} diff --git a/web/web.go b/web/web.go index 1facb260..fcd6628f 100644 --- a/web/web.go +++ b/web/web.go @@ -424,6 +424,15 @@ func (s *Server) startNodeLoops() { } } +func (s *Server) startTrafficFlushLoop() { + if !service.IsSharedModeEnabled() { + return + } + store := service.NewTrafficPendingStore(config.GetTrafficPendingPath()) + flushService := service.NewTrafficFlushService(store) + go flushService.Run(s.ctx) +} + // Start initializes and starts the web server with configured settings, routes, and background jobs. func (s *Server) Start() (err error) { // This is an anonymous function, no function name @@ -494,6 +503,7 @@ func (s *Server) Start() (err error) { s.startTask() s.startNodeLoops() + s.startTrafficFlushLoop() isTgbotenabled, err := s.settingService.GetTgbotEnabled() if (err == nil) && (isTgbotenabled) { diff --git a/xray/client_traffic.go b/xray/client_traffic.go index cab4bdc3..97e92bae 100644 --- a/xray/client_traffic.go +++ b/xray/client_traffic.go @@ -4,9 +4,9 @@ package xray // It tracks upload/download usage, expiry times, and online status for inbound clients. type ClientTraffic struct { Id int `json:"id" form:"id" gorm:"primaryKey;autoIncrement"` - InboundId int `json:"inboundId" form:"inboundId"` + InboundId int `json:"inboundId" form:"inboundId" gorm:"uniqueIndex:idx_client_traffics_inbound_email"` Enable bool `json:"enable" form:"enable"` - Email string `json:"email" form:"email"` + Email string `json:"email" form:"email" gorm:"uniqueIndex:idx_client_traffics_inbound_email"` UUID string `json:"uuid" form:"uuid" gorm:"-"` SubId string `json:"subId" form:"subId" gorm:"-"` Up int64 `json:"up" form:"up"`