feat: add durable traffic deltas and shared flush loop

This commit is contained in:
Sora39831 2026-04-10 15:25:16 +08:00
parent 3cfa554786
commit 87282dde33
7 changed files with 399 additions and 7 deletions

View file

@ -3,6 +3,7 @@ package job
import (
"encoding/json"
"github.com/mhsanaei/3x-ui/v2/config"
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/web/websocket"
@ -17,11 +18,16 @@ type XrayTrafficJob struct {
xrayService service.XrayService
inboundService service.InboundService
outboundService service.OutboundService
trafficFlushSvc *service.TrafficFlushService
}
// NewXrayTrafficJob creates a new traffic collection job instance.
func NewXrayTrafficJob() *XrayTrafficJob {
return new(XrayTrafficJob)
return &XrayTrafficJob{
trafficFlushSvc: service.NewTrafficFlushService(
service.NewTrafficPendingStore(config.GetTrafficPendingPath()),
),
}
}
// Run collects traffic statistics from Xray and updates the database, triggering restart if needed.
@ -33,9 +39,16 @@ func (j *XrayTrafficJob) Run() {
if err != nil {
return
}
err, needRestart0 := j.inboundService.AddTraffic(traffics, clientTraffics)
if err != nil {
logger.Warning("add inbound traffic failed:", err)
needRestart0 := false
if service.IsSharedModeEnabled() {
if err := j.trafficFlushSvc.Collect(clientTraffics); err != nil {
logger.Warning("collect shared traffic failed:", err)
}
} else {
err, needRestart0 = j.inboundService.AddTraffic(traffics, clientTraffics)
if err != nil {
logger.Warning("add inbound traffic failed:", err)
}
}
err, needRestart1 := j.outboundService.AddTraffic(traffics, clientTraffics)
if err != nil {

View file

@ -1165,6 +1165,14 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff
return err, false
}
needRestart, err := s.ReconcileSharedTrafficState(tx)
if err != nil {
return err, false
}
return nil, needRestart
}
func (s *InboundService) ReconcileSharedTrafficState(tx *gorm.DB) (bool, error) {
needRestart0, count, err := s.autoRenewClients(tx)
if err != nil {
logger.Warning("Error in renew clients:", err)
@ -1185,7 +1193,7 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff
} else if count > 0 {
logger.Debugf("%v inbounds disabled", count)
}
return nil, (needRestart0 || needRestart1 || needRestart2)
return needRestart0 || needRestart1 || needRestart2, nil
}
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {

View file

@ -0,0 +1,127 @@
package service
import (
"context"
"time"
"github.com/mhsanaei/3x-ui/v2/config"
"github.com/mhsanaei/3x-ui/v2/database"
"github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/util/common"
"github.com/mhsanaei/3x-ui/v2/xray"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type TrafficFlushService struct {
store *TrafficPendingStore
inbounds InboundService
xrayService XrayService
flushFn func([]TrafficDelta) error
reconcileFn func(*gorm.DB) (bool, error)
markRestart func()
}
func NewTrafficFlushService(store *TrafficPendingStore) *TrafficFlushService {
svc := &TrafficFlushService{store: store}
svc.flushFn = svc.flushToDatabase
svc.reconcileFn = svc.inbounds.ReconcileSharedTrafficState
svc.markRestart = svc.xrayService.SetToNeedRestart
return svc
}
func (s *TrafficFlushService) Collect(clientTraffics []*xray.ClientTraffic) error {
deltas := make([]TrafficDelta, 0, len(clientTraffics))
for _, traffic := range clientTraffics {
if traffic == nil || (traffic.Up == 0 && traffic.Down == 0) {
continue
}
deltas = append(deltas, TrafficDelta{
InboundID: traffic.InboundId,
Email: traffic.Email,
UpDelta: traffic.Up,
DownDelta: traffic.Down,
})
}
if len(deltas) == 0 {
return nil
}
return s.store.Merge(deltas)
}
func (s *TrafficFlushService) flushToDatabase(deltas []TrafficDelta) error {
now := time.Now().UnixMilli()
return database.GetDB().Transaction(func(tx *gorm.DB) error {
for _, delta := range deltas {
if err := tx.Model(&model.Inbound{}).
Where("id = ?", delta.InboundID).
Updates(map[string]any{
"up": gorm.Expr("up + ?", delta.UpDelta),
"down": gorm.Expr("down + ?", delta.DownDelta),
"all_time": gorm.Expr("COALESCE(all_time, 0) + ?", delta.UpDelta+delta.DownDelta),
}).Error; err != nil {
return err
}
row := xray.ClientTraffic{
InboundId: delta.InboundID,
Email: delta.Email,
Enable: true,
Up: delta.UpDelta,
Down: delta.DownDelta,
AllTime: delta.UpDelta + delta.DownDelta,
LastOnline: now,
}
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "inbound_id"}, {Name: "email"}},
DoUpdates: clause.Assignments(map[string]any{
"up": gorm.Expr("up + ?", delta.UpDelta),
"down": gorm.Expr("down + ?", delta.DownDelta),
"all_time": gorm.Expr("COALESCE(all_time, 0) + ?", delta.UpDelta+delta.DownDelta),
"last_online": now,
}),
}).Create(&row).Error; err != nil {
return err
}
}
if IsMaster() {
needRestart, err := s.reconcileFn(tx)
if err != nil {
return err
}
if needRestart && s.markRestart != nil {
s.markRestart()
}
}
return nil
})
}
func (s *TrafficFlushService) FlushOnce() error {
deltas, err := s.store.Take()
if err != nil || len(deltas) == 0 {
return err
}
if err := s.flushFn(deltas); err != nil {
return common.Combine(err, s.store.Merge(deltas))
}
return nil
}
func (s *TrafficFlushService) Run(ctx context.Context) {
interval := time.Duration(config.GetNodeConfigFromJSON().TrafficFlushSeconds) * time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
_ = s.FlushOnce()
return
case <-ticker.C:
_ = s.FlushOnce()
}
}
}

View file

@ -0,0 +1,121 @@
package service
import (
"errors"
"path/filepath"
"testing"
"github.com/mhsanaei/3x-ui/v2/database"
"github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/xray"
"gorm.io/gorm"
)
func TestTrafficPendingStoreMerge(t *testing.T) {
store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json"))
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 7}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", DownDelta: 9}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
deltas, err := store.Load()
if err != nil {
t.Fatalf("Load error: %v", err)
}
if len(deltas) != 1 {
t.Fatalf("expected one merged delta, got %d", len(deltas))
}
if deltas[0].UpDelta != 7 || deltas[0].DownDelta != 9 {
t.Fatalf("unexpected merged delta: %+v", deltas[0])
}
}
func TestFlushOnceClearsPendingOnSuccess(t *testing.T) {
setupTestDB(t)
if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil {
t.Fatalf("seed inbound failed: %v", err)
}
if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil {
t.Fatalf("seed client traffic failed: %v", err)
}
store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json"))
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 7, DownDelta: 9}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
svc := NewTrafficFlushService(store)
if err := svc.FlushOnce(); err != nil {
t.Fatalf("FlushOnce error: %v", err)
}
var clientTraffic xray.ClientTraffic
if err := database.GetDB().First(&clientTraffic, "inbound_id = ? AND email = ?", 1, "alice@example.com").Error; err != nil {
t.Fatalf("lookup client traffic failed: %v", err)
}
if clientTraffic.Up != 7 || clientTraffic.Down != 9 {
t.Fatalf("unexpected flushed traffic: %+v", clientTraffic)
}
deltas, err := store.Load()
if err != nil {
t.Fatalf("Load error: %v", err)
}
if len(deltas) != 0 {
t.Fatalf("expected pending deltas to be cleared, got %+v", deltas)
}
}
func TestFlushOnceKeepsPendingOnFailure(t *testing.T) {
store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json"))
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 3}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
svc := NewTrafficFlushService(store)
svc.flushFn = func([]TrafficDelta) error { return errors.New("boom") }
if err := svc.FlushOnce(); err == nil {
t.Fatal("expected flush failure")
}
deltas, err := store.Load()
if err != nil {
t.Fatalf("Load error: %v", err)
}
if len(deltas) != 1 {
t.Fatalf("expected pending delta to remain, got %+v", deltas)
}
}
func TestFlushOnceMarksRestartWhenReconciliationRequiresIt(t *testing.T) {
setupTestDB(t)
if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil {
t.Fatalf("seed inbound failed: %v", err)
}
if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil {
t.Fatalf("seed client traffic failed: %v", err)
}
store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json"))
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 1}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
restartMarked := false
svc := NewTrafficFlushService(store)
svc.reconcileFn = func(*gorm.DB) (bool, error) { return true, nil }
svc.markRestart = func() { restartMarked = true }
if err := svc.FlushOnce(); err != nil {
t.Fatalf("FlushOnce error: %v", err)
}
if !restartMarked {
t.Fatal("expected flush to mark restart when reconciliation requires it")
}
}

View file

@ -0,0 +1,113 @@
package service
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
)
type TrafficDelta struct {
InboundID int `json:"inboundId"`
Email string `json:"email"`
UpDelta int64 `json:"upDelta"`
DownDelta int64 `json:"downDelta"`
}
type TrafficPendingStore struct {
path string
mu sync.Mutex
}
func NewTrafficPendingStore(path string) *TrafficPendingStore {
return &TrafficPendingStore{path: path}
}
func (s *TrafficPendingStore) Load() ([]TrafficDelta, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.loadUnlocked()
}
func (s *TrafficPendingStore) Save(deltas []TrafficDelta) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.saveUnlocked(deltas)
}
func (s *TrafficPendingStore) Merge(newDeltas []TrafficDelta) error {
s.mu.Lock()
defer s.mu.Unlock()
current, err := s.loadUnlocked()
if err != nil {
return err
}
index := map[string]int{}
for i, delta := range current {
index[deltaKey(delta.InboundID, delta.Email)] = i
}
for _, delta := range newDeltas {
key := deltaKey(delta.InboundID, delta.Email)
if idx, ok := index[key]; ok {
current[idx].UpDelta += delta.UpDelta
current[idx].DownDelta += delta.DownDelta
continue
}
index[key] = len(current)
current = append(current, delta)
}
return s.saveUnlocked(current)
}
func (s *TrafficPendingStore) Take() ([]TrafficDelta, error) {
s.mu.Lock()
defer s.mu.Unlock()
deltas, err := s.loadUnlocked()
if err != nil || len(deltas) == 0 {
return deltas, err
}
if err := s.saveUnlocked([]TrafficDelta{}); err != nil {
return nil, err
}
return deltas, nil
}
func (s *TrafficPendingStore) loadUnlocked() ([]TrafficDelta, error) {
data, err := os.ReadFile(s.path)
if os.IsNotExist(err) {
return []TrafficDelta{}, nil
}
if err != nil {
return nil, err
}
var deltas []TrafficDelta
if err := json.Unmarshal(data, &deltas); err != nil {
return nil, err
}
if deltas == nil {
return []TrafficDelta{}, nil
}
return deltas, nil
}
func (s *TrafficPendingStore) saveUnlocked(deltas []TrafficDelta) error {
if err := os.MkdirAll(filepath.Dir(s.path), 0755); err != nil {
return err
}
data, err := json.MarshalIndent(deltas, "", " ")
if err != nil {
return err
}
return os.WriteFile(s.path, data, 0644)
}
func deltaKey(inboundID int, email string) string {
return fmt.Sprintf("%d:%s", inboundID, email)
}

View file

@ -424,6 +424,15 @@ func (s *Server) startNodeLoops() {
}
}
func (s *Server) startTrafficFlushLoop() {
if !service.IsSharedModeEnabled() {
return
}
store := service.NewTrafficPendingStore(config.GetTrafficPendingPath())
flushService := service.NewTrafficFlushService(store)
go flushService.Run(s.ctx)
}
// Start initializes and starts the web server with configured settings, routes, and background jobs.
func (s *Server) Start() (err error) {
// This is an anonymous function, no function name
@ -494,6 +503,7 @@ func (s *Server) Start() (err error) {
s.startTask()
s.startNodeLoops()
s.startTrafficFlushLoop()
isTgbotenabled, err := s.settingService.GetTgbotEnabled()
if (err == nil) && (isTgbotenabled) {

View file

@ -4,9 +4,9 @@ package xray
// It tracks upload/download usage, expiry times, and online status for inbound clients.
type ClientTraffic struct {
Id int `json:"id" form:"id" gorm:"primaryKey;autoIncrement"`
InboundId int `json:"inboundId" form:"inboundId"`
InboundId int `json:"inboundId" form:"inboundId" gorm:"uniqueIndex:idx_client_traffics_inbound_email"`
Enable bool `json:"enable" form:"enable"`
Email string `json:"email" form:"email"`
Email string `json:"email" form:"email" gorm:"uniqueIndex:idx_client_traffics_inbound_email"`
UUID string `json:"uuid" form:"uuid" gorm:"-"`
SubId string `json:"subId" form:"subId" gorm:"-"`
Up int64 `json:"up" form:"up"`