From 1c299578aa25ea74e93416618c0dcefef69749ba Mon Sep 17 00:00:00 2001 From: SadeghKalami Date: Sat, 16 May 2026 06:17:20 +0330 Subject: [PATCH] feat: implement sub-quota propagation and automatic re-enablement for clients sharing the same SubID --- web/service/inbound.go | 332 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 328 insertions(+), 4 deletions(-) diff --git a/web/service/inbound.go b/web/service/inbound.go index fc3256e2..b7dc2f82 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -1632,6 +1632,45 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin logger.Debug("Client old email not found") needRestart = true } + // Propagate subTotalGB changes to all siblings sharing the same SubID. + newClient := clients[0] + if strings.TrimSpace(newClient.SubID) != "" && newClient.SubTotalGB > 0 { + // First: update siblings within THIS inbound's settings. + var localSettings map[string]any + if err2 := json.Unmarshal([]byte(oldInbound.Settings), &localSettings); err2 == nil { + if localClients, ok := localSettings["clients"].([]any); ok { + localChanged := false + for i, ic := range localClients { + cm, ok := ic.(map[string]any) + if !ok { + continue + } + cSubId, _ := cm["subId"].(string) + if cSubId != newClient.SubID { + continue + } + oldVal, _ := cm["subTotalGB"].(float64) + if int64(oldVal) == newClient.SubTotalGB { + continue + } + cm["subTotalGB"] = newClient.SubTotalGB + localClients[i] = cm + localChanged = true + } + if localChanged { + localSettings["clients"] = localClients + if bs, err3 := json.MarshalIndent(localSettings, "", " "); err3 == nil { + oldInbound.Settings = string(bs) + } + } + } + } + // Then: propagate to OTHER inbounds. + if err := s.syncSubTotalGB(tx, newClient.SubID, newClient.SubTotalGB); err != nil { + logger.Warning("syncSubTotalGB from UpdateInboundClient:", err) + } + } + return needRestart, tx.Save(oldInbound).Error } @@ -1959,13 +1998,20 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien disabledClientsCount += count } + needRestart4, count, err := s.reEnableSubQuotaClients(tx) + if err != nil { + logger.Warning("Error in re-enabling sub-quota clients:", err) + } else if count > 0 { + logger.Debugf("%v clients re-enabled by sub quota", count) + } + needRestart2, count, err := s.disableInvalidInbounds(tx) if err != nil { logger.Warning("Error in disabling invalid inbounds:", err) } else if count > 0 { logger.Debugf("%v inbounds disabled", count) } - return needRestart0 || needRestart1 || needRestart2 || needRestart3, disabledClientsCount > 0, nil + return needRestart0 || needRestart1 || needRestart2 || needRestart3 || needRestart4, disabledClientsCount > 0, nil } func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error { @@ -2489,7 +2535,7 @@ func (s *InboundService) GetSubTrafficInfo(subId string) (*SubTrafficInfo, error } seen[key] = struct{}{} emails = append(emails, email) - if subTotal == 0 && r.SubTotalGB > 0 { + if r.SubTotalGB > subTotal { subTotal = r.SubTotalGB } } @@ -2546,7 +2592,7 @@ func (s *InboundService) disableSubQuotaClients(tx *gorm.DB) (bool, int64, error return false, 0, err } - // 2. Group by subId: collect emails and find the first non-zero subTotalGB. + // 2. Group by subId: collect emails and use MAX subTotalGB across siblings. type subGroup struct { emails map[string]struct{} subTotalGB int64 @@ -2568,7 +2614,7 @@ func (s *InboundService) disableSubQuotaClients(tx *gorm.DB) (bool, int64, error groups[subId] = g } g.emails[strings.ToLower(email)] = struct{}{} - if g.subTotalGB == 0 && r.SubTotalGB > 0 { + if r.SubTotalGB > g.subTotalGB { g.subTotalGB = r.SubTotalGB } g.members = append(g.members, struct{ InboundId int; Tag, Email string }{ @@ -2722,6 +2768,284 @@ func (s *InboundService) disableSubQuotaClients(tx *gorm.DB) (bool, int64, error return needRestart, totalDisabled, nil } +// syncSubTotalGB propagates a changed subTotalGB value to ALL clients sharing +// the same SubID across ALL inbounds. This is called when the admin explicitly +// sets or changes the quota on any single client. +func (s *InboundService) syncSubTotalGB(tx *gorm.DB, subId string, newValue int64) error { + if strings.TrimSpace(subId) == "" || newValue < 0 { + return nil + } + + // Find all inbounds that have clients with this subId. + var inbounds []*model.Inbound + if err := tx.Raw(` + SELECT DISTINCT inbounds.* + FROM inbounds, + JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client + WHERE REPLACE(JSON_EXTRACT(client.value, '$.subId'), '"', '') = ? + `, subId).Scan(&inbounds).Error; err != nil { + return err + } + + now := time.Now().Unix() * 1000 + for _, inbound := range inbounds { + settings := map[string]any{} + if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil { + continue + } + clientsRaw, ok := settings["clients"].([]any) + if !ok { + continue + } + changed := false + for i, ic := range clientsRaw { + cm, ok := ic.(map[string]any) + if !ok { + continue + } + cSubId, _ := cm["subId"].(string) + if cSubId != subId { + continue + } + oldVal, _ := cm["subTotalGB"].(float64) + if int64(oldVal) == newValue { + continue + } + cm["subTotalGB"] = newValue + cm["updated_at"] = now + clientsRaw[i] = cm + changed = true + } + if !changed { + continue + } + settings["clients"] = clientsRaw + bs, err := json.MarshalIndent(settings, "", " ") + if err != nil { + continue + } + inbound.Settings = string(bs) + if err := tx.Save(inbound).Error; err != nil { + logger.Warning("syncSubTotalGB save inbound:", err) + } + } + return nil +} + +// reEnableSubQuotaClients checks all SubID groups for clients that were +// disabled by sub-quota enforcement but whose aggregate traffic is now +// below the (possibly increased) quota. Re-enables them in both +// client_traffics and inbound settings JSON. +func (s *InboundService) reEnableSubQuotaClients(tx *gorm.DB) (bool, int64, error) { + // 1. Same scan as disableSubQuotaClients. + var allRows []struct { + InboundId int + Tag string + Email string + SubID string `gorm:"column:sub_id"` + SubTotalGB int64 `gorm:"column:sub_total_gb"` + Enable bool `gorm:"column:enable"` + } + err := tx.Raw(` + SELECT inbounds.id AS inbound_id, + inbounds.tag AS tag, + REPLACE(JSON_EXTRACT(client.value, '$.email'), '"', '') AS email, + REPLACE(JSON_EXTRACT(client.value, '$.subId'), '"', '') AS sub_id, + COALESCE(JSON_EXTRACT(client.value, '$.subTotalGB'), 0) AS sub_total_gb, + JSON_EXTRACT(client.value, '$.enable') AS enable + FROM inbounds, + JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client + WHERE inbounds.node_id IS NULL + `).Scan(&allRows).Error + if err != nil { + return false, 0, err + } + + // 2. Group by subId. + type memberInfo struct { + InboundId int + Tag string + Email string + Enable bool + } + type subGroup struct { + emails map[string]struct{} + subTotalGB int64 + members []memberInfo + hasDisabled bool + } + groups := make(map[string]*subGroup) + for _, r := range allRows { + subId := strings.TrimSpace(r.SubID) + if subId == "" { + continue + } + email := strings.TrimSpace(r.Email) + if email == "" { + continue + } + g, ok := groups[subId] + if !ok { + g = &subGroup{emails: make(map[string]struct{})} + groups[subId] = g + } + g.emails[strings.ToLower(email)] = struct{}{} + if r.SubTotalGB > g.subTotalGB { + g.subTotalGB = r.SubTotalGB + } + g.members = append(g.members, memberInfo{ + InboundId: r.InboundId, Tag: r.Tag, Email: email, Enable: r.Enable, + }) + if !r.Enable { + g.hasDisabled = true + } + } + + // 3. For each group: if it has a quota, has disabled members, and + // aggregate traffic is under the quota → re-enable. + type enableTarget struct { + InboundId int + Tag string + Email string + } + var toEnable []enableTarget + + for _, g := range groups { + if g.subTotalGB <= 0 || !g.hasDisabled { + continue + } + + emails := make([]string, 0, len(g.emails)) + for e := range g.emails { + emails = append(emails, e) + } + + var totalUsed int64 + for _, batch := range chunkStrings(emails, sqliteMaxVars) { + var sum struct{ Total int64 } + if err := tx.Model(xray.ClientTraffic{}). + Select("COALESCE(SUM(up + down), 0) AS total"). + Where("LOWER(email) IN ?", batch). + Scan(&sum).Error; err != nil { + continue + } + totalUsed += sum.Total + } + + if totalUsed >= g.subTotalGB { + continue // still over quota + } + + // Under quota — re-enable disabled members. + for _, m := range g.members { + if !m.Enable { + toEnable = append(toEnable, enableTarget{ + InboundId: m.InboundId, Tag: m.Tag, Email: m.Email, + }) + } + } + } + + if len(toEnable) == 0 { + return false, 0, nil + } + + // 4. Re-enable client_traffics rows (only those not individually over + // their own totalGB and not expired — leave those disabled). + enableEmails := make([]string, 0, len(toEnable)) + for _, t := range toEnable { + enableEmails = append(enableEmails, t.Email) + } + uniqEnable := uniqueNonEmptyStrings(enableEmails) + var totalEnabled int64 + for _, batch := range chunkStrings(uniqEnable, sqliteMaxVars) { + result := tx.Model(xray.ClientTraffic{}). + Where("LOWER(email) IN ? AND enable = ?", batch, false). + // Only re-enable if the client isn't individually over its own totalGB + // and hasn't expired. + Where("(total = 0 OR up + down < total)"). + Where("(expiry_time = 0 OR expiry_time < 0 OR expiry_time > ?)", time.Now().UnixMilli()). + Update("enable", true) + if result.Error != nil { + logger.Warning("reEnableSubQuotaClients update client_traffics:", result.Error) + } + totalEnabled += result.RowsAffected + } + + // 5. Update inbound settings JSON to set enable=true. + inboundEmailMap := make(map[int]map[string]struct{}) + for _, t := range toEnable { + if inboundEmailMap[t.InboundId] == nil { + inboundEmailMap[t.InboundId] = make(map[string]struct{}) + } + inboundEmailMap[t.InboundId][strings.ToLower(t.Email)] = struct{}{} + } + inboundIds := make([]int, 0, len(inboundEmailMap)) + for id := range inboundEmailMap { + inboundIds = append(inboundIds, id) + } + var inbounds []*model.Inbound + if err = tx.Model(model.Inbound{}).Where("id IN ?", inboundIds).Find(&inbounds).Error; err != nil { + logger.Warning("reEnableSubQuotaClients fetch inbounds:", err) + return false, totalEnabled, nil + } + now := time.Now().Unix() * 1000 + dirty := make([]*model.Inbound, 0) + for _, inbound := range inbounds { + settings := map[string]any{} + if jsonErr := json.Unmarshal([]byte(inbound.Settings), &settings); jsonErr != nil { + continue + } + clientsRaw, ok := settings["clients"].([]any) + if !ok { + continue + } + emailSet := inboundEmailMap[inbound.Id] + changed := false + for i := range clientsRaw { + c, ok := clientsRaw[i].(map[string]any) + if !ok { + continue + } + email, _ := c["email"].(string) + if _, shouldEnable := emailSet[strings.ToLower(email)]; !shouldEnable { + continue + } + // Don't re-enable in settings if the client_traffics row wasn't + // re-enabled (due to individual quota or expiry). + var ct xray.ClientTraffic + if err := tx.Where("email = ?", email).First(&ct).Error; err == nil && !ct.Enable { + continue + } + c["enable"] = true + c["updated_at"] = now + clientsRaw[i] = c + changed = true + } + if !changed { + continue + } + settings["clients"] = clientsRaw + modifiedSettings, jsonErr := json.MarshalIndent(settings, "", " ") + if jsonErr != nil { + continue + } + inbound.Settings = string(modifiedSettings) + dirty = append(dirty, inbound) + } + if len(dirty) > 0 { + if err = tx.Save(dirty).Error; err != nil { + logger.Warning("reEnableSubQuotaClients update inbound settings:", err) + } + } + + if totalEnabled > 0 { + logger.Infof("Sub-quota: re-enabled %d clients after quota increase", totalEnabled) + } + + return totalEnabled > 0, totalEnabled, nil +} + func (s *InboundService) GetInboundTags() (string, error) { db := database.GetDB() var inboundTags []string