feat: implement sub-quota propagation and automatic re-enablement for clients sharing the same SubID

This commit is contained in:
SadeghKalami 2026-05-16 06:17:20 +03:30
parent 2f3243a2cf
commit 1c299578aa

View file

@ -1632,6 +1632,45 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
logger.Debug("Client old email not found")
needRestart = true
}
// Propagate subTotalGB changes to all siblings sharing the same SubID.
newClient := clients[0]
if strings.TrimSpace(newClient.SubID) != "" && newClient.SubTotalGB > 0 {
// First: update siblings within THIS inbound's settings.
var localSettings map[string]any
if err2 := json.Unmarshal([]byte(oldInbound.Settings), &localSettings); err2 == nil {
if localClients, ok := localSettings["clients"].([]any); ok {
localChanged := false
for i, ic := range localClients {
cm, ok := ic.(map[string]any)
if !ok {
continue
}
cSubId, _ := cm["subId"].(string)
if cSubId != newClient.SubID {
continue
}
oldVal, _ := cm["subTotalGB"].(float64)
if int64(oldVal) == newClient.SubTotalGB {
continue
}
cm["subTotalGB"] = newClient.SubTotalGB
localClients[i] = cm
localChanged = true
}
if localChanged {
localSettings["clients"] = localClients
if bs, err3 := json.MarshalIndent(localSettings, "", " "); err3 == nil {
oldInbound.Settings = string(bs)
}
}
}
}
// Then: propagate to OTHER inbounds.
if err := s.syncSubTotalGB(tx, newClient.SubID, newClient.SubTotalGB); err != nil {
logger.Warning("syncSubTotalGB from UpdateInboundClient:", err)
}
}
return needRestart, tx.Save(oldInbound).Error
}
@ -1959,13 +1998,20 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
disabledClientsCount += count
}
needRestart4, count, err := s.reEnableSubQuotaClients(tx)
if err != nil {
logger.Warning("Error in re-enabling sub-quota clients:", err)
} else if count > 0 {
logger.Debugf("%v clients re-enabled by sub quota", count)
}
needRestart2, count, err := s.disableInvalidInbounds(tx)
if err != nil {
logger.Warning("Error in disabling invalid inbounds:", err)
} else if count > 0 {
logger.Debugf("%v inbounds disabled", count)
}
return needRestart0 || needRestart1 || needRestart2 || needRestart3, disabledClientsCount > 0, nil
return needRestart0 || needRestart1 || needRestart2 || needRestart3 || needRestart4, disabledClientsCount > 0, nil
}
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
@ -2489,7 +2535,7 @@ func (s *InboundService) GetSubTrafficInfo(subId string) (*SubTrafficInfo, error
}
seen[key] = struct{}{}
emails = append(emails, email)
if subTotal == 0 && r.SubTotalGB > 0 {
if r.SubTotalGB > subTotal {
subTotal = r.SubTotalGB
}
}
@ -2546,7 +2592,7 @@ func (s *InboundService) disableSubQuotaClients(tx *gorm.DB) (bool, int64, error
return false, 0, err
}
// 2. Group by subId: collect emails and find the first non-zero subTotalGB.
// 2. Group by subId: collect emails and use MAX subTotalGB across siblings.
type subGroup struct {
emails map[string]struct{}
subTotalGB int64
@ -2568,7 +2614,7 @@ func (s *InboundService) disableSubQuotaClients(tx *gorm.DB) (bool, int64, error
groups[subId] = g
}
g.emails[strings.ToLower(email)] = struct{}{}
if g.subTotalGB == 0 && r.SubTotalGB > 0 {
if r.SubTotalGB > g.subTotalGB {
g.subTotalGB = r.SubTotalGB
}
g.members = append(g.members, struct{ InboundId int; Tag, Email string }{
@ -2722,6 +2768,284 @@ func (s *InboundService) disableSubQuotaClients(tx *gorm.DB) (bool, int64, error
return needRestart, totalDisabled, nil
}
// syncSubTotalGB propagates a changed subTotalGB value to ALL clients sharing
// the same SubID across ALL inbounds. This is called when the admin explicitly
// sets or changes the quota on any single client.
func (s *InboundService) syncSubTotalGB(tx *gorm.DB, subId string, newValue int64) error {
if strings.TrimSpace(subId) == "" || newValue < 0 {
return nil
}
// Find all inbounds that have clients with this subId.
var inbounds []*model.Inbound
if err := tx.Raw(`
SELECT DISTINCT inbounds.*
FROM inbounds,
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
WHERE REPLACE(JSON_EXTRACT(client.value, '$.subId'), '"', '') = ?
`, subId).Scan(&inbounds).Error; err != nil {
return err
}
now := time.Now().Unix() * 1000
for _, inbound := range inbounds {
settings := map[string]any{}
if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
continue
}
clientsRaw, ok := settings["clients"].([]any)
if !ok {
continue
}
changed := false
for i, ic := range clientsRaw {
cm, ok := ic.(map[string]any)
if !ok {
continue
}
cSubId, _ := cm["subId"].(string)
if cSubId != subId {
continue
}
oldVal, _ := cm["subTotalGB"].(float64)
if int64(oldVal) == newValue {
continue
}
cm["subTotalGB"] = newValue
cm["updated_at"] = now
clientsRaw[i] = cm
changed = true
}
if !changed {
continue
}
settings["clients"] = clientsRaw
bs, err := json.MarshalIndent(settings, "", " ")
if err != nil {
continue
}
inbound.Settings = string(bs)
if err := tx.Save(inbound).Error; err != nil {
logger.Warning("syncSubTotalGB save inbound:", err)
}
}
return nil
}
// reEnableSubQuotaClients checks all SubID groups for clients that were
// disabled by sub-quota enforcement but whose aggregate traffic is now
// below the (possibly increased) quota. Re-enables them in both
// client_traffics and inbound settings JSON.
func (s *InboundService) reEnableSubQuotaClients(tx *gorm.DB) (bool, int64, error) {
// 1. Same scan as disableSubQuotaClients.
var allRows []struct {
InboundId int
Tag string
Email string
SubID string `gorm:"column:sub_id"`
SubTotalGB int64 `gorm:"column:sub_total_gb"`
Enable bool `gorm:"column:enable"`
}
err := tx.Raw(`
SELECT inbounds.id AS inbound_id,
inbounds.tag AS tag,
REPLACE(JSON_EXTRACT(client.value, '$.email'), '"', '') AS email,
REPLACE(JSON_EXTRACT(client.value, '$.subId'), '"', '') AS sub_id,
COALESCE(JSON_EXTRACT(client.value, '$.subTotalGB'), 0) AS sub_total_gb,
JSON_EXTRACT(client.value, '$.enable') AS enable
FROM inbounds,
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
WHERE inbounds.node_id IS NULL
`).Scan(&allRows).Error
if err != nil {
return false, 0, err
}
// 2. Group by subId.
type memberInfo struct {
InboundId int
Tag string
Email string
Enable bool
}
type subGroup struct {
emails map[string]struct{}
subTotalGB int64
members []memberInfo
hasDisabled bool
}
groups := make(map[string]*subGroup)
for _, r := range allRows {
subId := strings.TrimSpace(r.SubID)
if subId == "" {
continue
}
email := strings.TrimSpace(r.Email)
if email == "" {
continue
}
g, ok := groups[subId]
if !ok {
g = &subGroup{emails: make(map[string]struct{})}
groups[subId] = g
}
g.emails[strings.ToLower(email)] = struct{}{}
if r.SubTotalGB > g.subTotalGB {
g.subTotalGB = r.SubTotalGB
}
g.members = append(g.members, memberInfo{
InboundId: r.InboundId, Tag: r.Tag, Email: email, Enable: r.Enable,
})
if !r.Enable {
g.hasDisabled = true
}
}
// 3. For each group: if it has a quota, has disabled members, and
// aggregate traffic is under the quota → re-enable.
type enableTarget struct {
InboundId int
Tag string
Email string
}
var toEnable []enableTarget
for _, g := range groups {
if g.subTotalGB <= 0 || !g.hasDisabled {
continue
}
emails := make([]string, 0, len(g.emails))
for e := range g.emails {
emails = append(emails, e)
}
var totalUsed int64
for _, batch := range chunkStrings(emails, sqliteMaxVars) {
var sum struct{ Total int64 }
if err := tx.Model(xray.ClientTraffic{}).
Select("COALESCE(SUM(up + down), 0) AS total").
Where("LOWER(email) IN ?", batch).
Scan(&sum).Error; err != nil {
continue
}
totalUsed += sum.Total
}
if totalUsed >= g.subTotalGB {
continue // still over quota
}
// Under quota — re-enable disabled members.
for _, m := range g.members {
if !m.Enable {
toEnable = append(toEnable, enableTarget{
InboundId: m.InboundId, Tag: m.Tag, Email: m.Email,
})
}
}
}
if len(toEnable) == 0 {
return false, 0, nil
}
// 4. Re-enable client_traffics rows (only those not individually over
// their own totalGB and not expired — leave those disabled).
enableEmails := make([]string, 0, len(toEnable))
for _, t := range toEnable {
enableEmails = append(enableEmails, t.Email)
}
uniqEnable := uniqueNonEmptyStrings(enableEmails)
var totalEnabled int64
for _, batch := range chunkStrings(uniqEnable, sqliteMaxVars) {
result := tx.Model(xray.ClientTraffic{}).
Where("LOWER(email) IN ? AND enable = ?", batch, false).
// Only re-enable if the client isn't individually over its own totalGB
// and hasn't expired.
Where("(total = 0 OR up + down < total)").
Where("(expiry_time = 0 OR expiry_time < 0 OR expiry_time > ?)", time.Now().UnixMilli()).
Update("enable", true)
if result.Error != nil {
logger.Warning("reEnableSubQuotaClients update client_traffics:", result.Error)
}
totalEnabled += result.RowsAffected
}
// 5. Update inbound settings JSON to set enable=true.
inboundEmailMap := make(map[int]map[string]struct{})
for _, t := range toEnable {
if inboundEmailMap[t.InboundId] == nil {
inboundEmailMap[t.InboundId] = make(map[string]struct{})
}
inboundEmailMap[t.InboundId][strings.ToLower(t.Email)] = struct{}{}
}
inboundIds := make([]int, 0, len(inboundEmailMap))
for id := range inboundEmailMap {
inboundIds = append(inboundIds, id)
}
var inbounds []*model.Inbound
if err = tx.Model(model.Inbound{}).Where("id IN ?", inboundIds).Find(&inbounds).Error; err != nil {
logger.Warning("reEnableSubQuotaClients fetch inbounds:", err)
return false, totalEnabled, nil
}
now := time.Now().Unix() * 1000
dirty := make([]*model.Inbound, 0)
for _, inbound := range inbounds {
settings := map[string]any{}
if jsonErr := json.Unmarshal([]byte(inbound.Settings), &settings); jsonErr != nil {
continue
}
clientsRaw, ok := settings["clients"].([]any)
if !ok {
continue
}
emailSet := inboundEmailMap[inbound.Id]
changed := false
for i := range clientsRaw {
c, ok := clientsRaw[i].(map[string]any)
if !ok {
continue
}
email, _ := c["email"].(string)
if _, shouldEnable := emailSet[strings.ToLower(email)]; !shouldEnable {
continue
}
// Don't re-enable in settings if the client_traffics row wasn't
// re-enabled (due to individual quota or expiry).
var ct xray.ClientTraffic
if err := tx.Where("email = ?", email).First(&ct).Error; err == nil && !ct.Enable {
continue
}
c["enable"] = true
c["updated_at"] = now
clientsRaw[i] = c
changed = true
}
if !changed {
continue
}
settings["clients"] = clientsRaw
modifiedSettings, jsonErr := json.MarshalIndent(settings, "", " ")
if jsonErr != nil {
continue
}
inbound.Settings = string(modifiedSettings)
dirty = append(dirty, inbound)
}
if len(dirty) > 0 {
if err = tx.Save(dirty).Error; err != nil {
logger.Warning("reEnableSubQuotaClients update inbound settings:", err)
}
}
if totalEnabled > 0 {
logger.Infof("Sub-quota: re-enabled %d clients after quota increase", totalEnabled)
}
return totalEnabled > 0, totalEnabled, nil
}
func (s *InboundService) GetInboundTags() (string, error) {
db := database.GetDB()
var inboundTags []string