diff --git a/web/job/client_traffic_sync_job.go b/web/job/client_traffic_sync_job.go new file mode 100644 index 00000000..587c76e1 --- /dev/null +++ b/web/job/client_traffic_sync_job.go @@ -0,0 +1,178 @@ +package job + +import ( + "encoding/json" + "fmt" + "time" + + "x-ui/database" + "x-ui/database/model" + "x-ui/logger" + "x-ui/xray" + + "gorm.io/gorm" +) + +type SyncClientTrafficJob struct { + subClientsCollection map[string][]string +} + +func NewClientTrafficSyncJob() *SyncClientTrafficJob { + return new(SyncClientTrafficJob) +} +func (j *SyncClientTrafficJob) Run() { + // Step 1: Group clients by SubID + subClientsCollection := j.collectClientsGroupedBySubId() + + // Step 2: Sync client traffics for each SubID group + for subId, emails := range subClientsCollection { + err := j.syncClientTraffics(map[string][]string{subId: emails}) + if err != nil { + logger.Error("Failed to sync traffics for SubID ", subId, ": ", err) + } + } +} + +// collectClientsGroupedBySubId groups clients by their SubIDs +func (j *SyncClientTrafficJob) collectClientsGroupedBySubId() map[string][]string { + db := database.GetDB() + result := make(map[string][]string) + + // Fetch all inbounds + var inbounds []*model.Inbound + if err := db.Model(&model.Inbound{}).Find(&inbounds).Error; err != nil { + logger.Error("Error fetching inbounds: ", err) + return result // Return empty map on error + } + + // Process each inbound + for _, inbound := range inbounds { + if inbound.Settings == "" { + continue + } + + settingsMap, err := parseSettings(inbound.Settings, uint(inbound.Id)) + if err != nil { + logger.Error(err) + continue + } + + clients, ok := settingsMap["clients"].([]interface{}) + if !ok { + continue + } + + processClients(clients, result) + } + + // Remove SubIDs with one or fewer emails + filterSingleEmailSubIDs(result) + + return result +} + +// parseSettings unmarshals the JSON settings and returns it as a map +func parseSettings(settings string, inboundID uint) (map[string]interface{}, error) { + if !json.Valid([]byte(settings)) { + return nil, fmt.Errorf("Invalid JSON format in Settings for inbound ID %d", inboundID) + } + + var tempData map[string]interface{} + if err := json.Unmarshal([]byte(settings), &tempData); err != nil { + return nil, fmt.Errorf("Error unmarshalling settings for inbound ID %d: %v", inboundID, err) + } + + return tempData, nil +} + +// processClients extracts SubID and email from the clients and populates the result map +func processClients(clients []interface{}, result map[string][]string) { + for _, client := range clients { + clientMap, ok := client.(map[string]interface{}) + if !ok { + continue + } + + subId, ok := clientMap["subId"].(string) + if !ok || subId == "" { + continue + } + + email, ok := clientMap["email"].(string) + if !ok || email == "" { + continue + } + + result[subId] = append(result[subId], email) + } +} + +// filterSingleEmailSubIDs removes SubIDs with one or fewer emails from the result map +func filterSingleEmailSubIDs(result map[string][]string) { + for subId, emails := range result { + if len(emails) <= 1 { + delete(result, subId) + } + } +} + +// syncClientTraffics synchronizes traffic data for each SubID group +func (j *SyncClientTrafficJob) syncClientTraffics(result map[string][]string) error { + for subId, emails := range result { + db := database.GetDB() + + // Step 1: Calculate maxUp and maxDown (outside transaction) + var maxUp, maxDown int64 + err := calculateMaxTraffic(db, emails, &maxUp, &maxDown) + if err != nil { + logger.Error("Failed to calculate max traffic for SubID ", subId, ": ", err) + continue + } + + // Step 2: Update traffic data with retry mechanism + err = retryOperation(func() error { + return updateTraffic(db, emails, maxUp, maxDown) + }, 5, 100*time.Millisecond) + + if err != nil { + logger.Error("Failed to update client traffics for SubID ", subId, ": ", err) + } + } + return nil +} + +// calculateMaxTraffic calculates max up and down traffic for a group of emails +func calculateMaxTraffic(db *gorm.DB, emails []string, maxUp, maxDown *int64) error { + return db.Model(&xray.ClientTraffic{}). + Where("email IN ?", emails). + Select("MAX(up) AS max_up, MAX(down) AS max_down"). + Row(). + Scan(maxUp, maxDown) +} + +// updateTraffic updates the traffic data in the database within a transaction +func updateTraffic(db *gorm.DB, emails []string, maxUp, maxDown int64) error { + return db.Transaction(func(tx *gorm.DB) error { + return tx.Model(&xray.ClientTraffic{}). + Where("email IN ?", emails). + Updates(map[string]interface{}{ + "up": maxUp, + "down": maxDown, + }).Error + }) +} + +// retryOperation retries an operation multiple times with a delay +func retryOperation(operation func() error, maxRetries int, delay time.Duration) error { + var err error + for i := 0; i < maxRetries; i++ { + err = operation() + if err == nil { + return nil + } + logger.Info(fmt.Sprintf("Retry %d/%d failed: %v", i+1, maxRetries, err)) + time.Sleep(delay) + } + return err +} + diff --git a/web/web.go b/web/web.go index 35ccec70..867e4357 100644 --- a/web/web.go +++ b/web/web.go @@ -260,6 +260,9 @@ func (s *Server) startTask() { s.cron.AddJob("@every 10s", job.NewXrayTrafficJob()) }() + // Sync the client traffic with same SubId every 10 seconds + s.cron.AddJob("@every 10s", job.NewClientTrafficSyncJob()) + // check client ips from log file every 10 sec s.cron.AddJob("@every 10s", job.NewCheckClientIpJob())