refactor(node): restart remote xray after tx commit, not inside it

Move the remote RestartXray calls out of the addTraffic write
transaction. disableInvalidClients now returns the affected remote
node IDs instead of restarting their xray while the SQLite write lock
is held; AddTraffic performs the restart after the transaction commits
via restartRemoteNodesOnDisable. Avoids holding the serialized write
lock across slow per-node restart RPCs.
This commit is contained in:
MHSanaei 2026-06-05 00:02:19 +02:00
parent 27829c8414
commit 5df97c9313
No known key found for this signature in database
GPG key ID: 7E4060F2FBE5AB7A

View file

@ -1789,15 +1789,19 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
} }
func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) { func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
var disabledNodeIDs []int
err = submitTrafficWrite(func() error { err = submitTrafficWrite(func() error {
var inner error var inner error
needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics) needRestart, clientsDisabled, disabledNodeIDs, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
return inner return inner
}) })
if err == nil && len(disabledNodeIDs) > 0 {
s.restartRemoteNodesOnDisable(disabledNodeIDs)
}
return return
} }
func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) { func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, []int, error) {
var err error var err error
db := database.GetDB() db := database.GetDB()
tx := db.Begin() tx := db.Begin()
@ -1811,11 +1815,11 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
}() }()
err = s.addInboundTraffic(tx, inboundTraffics) err = s.addInboundTraffic(tx, inboundTraffics)
if err != nil { if err != nil {
return false, false, err return false, false, nil, err
} }
err = s.addClientTraffic(tx, clientTraffics) err = s.addClientTraffic(tx, clientTraffics)
if err != nil { if err != nil {
return false, false, err return false, false, nil, err
} }
needRestart0, count, err := s.autoRenewClients(tx) needRestart0, count, err := s.autoRenewClients(tx)
@ -1826,7 +1830,7 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
} }
disabledClientsCount := int64(0) disabledClientsCount := int64(0)
needRestart1, count, err := s.disableInvalidClients(tx) needRestart1, count, disabledNodeIDs, err := s.disableInvalidClients(tx)
if err != nil { if err != nil {
logger.Warning("Error in disabling invalid clients:", err) logger.Warning("Error in disabling invalid clients:", err)
} else if count > 0 { } else if count > 0 {
@ -1840,7 +1844,7 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
} else if count > 0 { } else if count > 0 {
logger.Debugf("%v inbounds disabled", count) logger.Debugf("%v inbounds disabled", count)
} }
return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, disabledNodeIDs, nil
} }
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error { func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
@ -2196,7 +2200,7 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error
return needRestart, count, err return needRestart, count, err
} }
func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) { func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) {
now := time.Now().Unix() * 1000 now := time.Now().Unix() * 1000
needRestart := false needRestart := false
@ -2205,10 +2209,10 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true). Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
Find(&depletedRows).Error Find(&depletedRows).Error
if err != nil { if err != nil {
return false, 0, err return false, 0, nil, err
} }
if len(depletedRows) == 0 { if len(depletedRows) == 0 {
return false, 0, nil return false, 0, nil, nil
} }
depletedEmails := make([]string, 0, len(depletedRows)) depletedEmails := make([]string, 0, len(depletedRows))
@ -2236,7 +2240,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
WHERE clients.email IN ? WHERE clients.email IN ?
`, depletedEmails).Scan(&targets).Error `, depletedEmails).Scan(&targets).Error
if err != nil { if err != nil {
return false, 0, err return false, 0, nil, err
} }
} }
@ -2283,7 +2287,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
err = result.Error err = result.Error
count := result.RowsAffected count := result.RowsAffected
if err != nil { if err != nil {
return needRestart, count, err return needRestart, count, nil, err
} }
if len(depletedEmails) > 0 { if len(depletedEmails) > 0 {
@ -2294,8 +2298,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
} }
} }
// Track which remote nodes had clients successfully disabled so we can
// restart their xray and kill the existing active connections.
disabledNodeIDs := make(map[int]struct{}) disabledNodeIDs := make(map[int]struct{})
for inboundID, group := range remoteByInbound { for inboundID, group := range remoteByInbound {
emails := make(map[string]struct{}, len(group)) emails := make(map[string]struct{}, len(group))
@ -2314,12 +2316,24 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
} }
} }
// Restart xray on each affected remote node so active connections are nodeIDs := make([]int, 0, len(disabledNodeIDs))
// dropped immediately, not just blocked on the next reconnect attempt.
if len(disabledNodeIDs) > 0 {
restartOnDisable, _ := (&SettingService{}).GetRestartXrayOnClientDisable()
if restartOnDisable {
for nodeID := range disabledNodeIDs { for nodeID := range disabledNodeIDs {
nodeIDs = append(nodeIDs, nodeID)
}
return needRestart, count, nodeIDs, nil
}
func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
if err != nil {
logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
return
}
if !restartOnDisable {
return
}
for _, nodeID := range nodeIDs {
nodeIDCopy := nodeID nodeIDCopy := nodeID
rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy) rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
if rtErr != nil { if rtErr != nil {
@ -2330,10 +2344,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr) logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
} }
} }
}
}
return needRestart, count, nil
} }
// markClientsDisabledInSettings flips client.enable=false in the inbound's // markClientsDisabledInSettings flips client.enable=false in the inbound's