diff --git a/.gitignore b/.gitignore index 0621bf9f..8b43b917 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ docker-compose.override.yml # Ignore local docs directory /docs/ +/docs/* # Ignore local git worktrees /.worktrees/ diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index ed6aab42..7811a4ca 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -44,6 +44,15 @@ func (j *XrayTrafficJob) Run() { if err := j.trafficFlushSvc.Collect(traffics, clientTraffics); err != nil { logger.Warning("collect shared traffic failed:", err) } + // In shared mode, addClientTraffic is bypassed so p.SetOnlineClients + // is never called. Compute and set online clients here instead. + online := make([]string, 0, len(clientTraffics)) + for _, ct := range clientTraffics { + if ct != nil && ct.Up+ct.Down > 0 { + online = append(online, ct.Email) + } + } + j.inboundService.SetOnlineClients(online) } else { err, needRestart0 = j.inboundService.AddTraffic(traffics, clientTraffics) if err != nil { diff --git a/web/service/inbound.go b/web/service/inbound.go index da8f3561..f1e4fdb5 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -2642,6 +2642,14 @@ func (s *InboundService) GetOnlineClients() []string { return p.GetOnlineClients() } +// SetOnlineClients updates the in-memory online clients list used by websocket broadcasts. +// This is needed in shared mode where addClientTraffic is bypassed. +func (s *InboundService) SetOnlineClients(clients []string) { + if p != nil { + p.SetOnlineClients(clients) + } +} + func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) { db := database.GetDB() var rows []xray.ClientTraffic diff --git a/web/service/traffic_flush.go b/web/service/traffic_flush.go index 8509717f..bdf29f79 100644 --- a/web/service/traffic_flush.go +++ b/web/service/traffic_flush.go @@ -32,6 +32,30 @@ func NewTrafficFlushService(store *TrafficPendingStore) *TrafficFlushService { } func (s *TrafficFlushService) Collect(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) error { + // Resolve email → InboundId mapping from database, since Xray API only + // returns email for client traffic without the InboundId. + emailToInboundID := map[string]int{} + if len(clientTraffics) > 0 { + emails := make([]string, 0, len(clientTraffics)) + for _, ct := range clientTraffics { + if ct != nil && ct.Email != "" { + emails = append(emails, ct.Email) + } + } + if len(emails) > 0 { + var rows []xray.ClientTraffic + if err := database.GetDB().Model(&xray.ClientTraffic{}). + Select("inbound_id, email"). + Where("email IN (?)", emails). + Find(&rows).Error; err != nil { + logger.Warning("resolve email to inbound_id failed:", err) + } + for _, r := range rows { + emailToInboundID[r.Email] = r.InboundId + } + } + } + deltas := make([]TrafficDelta, 0, len(clientTraffics)+len(inboundTraffics)) clientTotals := map[int]TrafficDelta{} @@ -39,18 +63,23 @@ func (s *TrafficFlushService) Collect(inboundTraffics []*xray.Traffic, clientTra if traffic == nil || (traffic.Up == 0 && traffic.Down == 0) { continue } + resolvedID := emailToInboundID[traffic.Email] + if resolvedID == 0 { + logger.Warningf("skip client traffic for unknown email %q (no inbound_id in DB)", traffic.Email) + continue + } delta := TrafficDelta{ Kind: TrafficDeltaKindClient, - InboundID: traffic.InboundId, + InboundID: resolvedID, Email: traffic.Email, UpDelta: traffic.Up, DownDelta: traffic.Down, } deltas = append(deltas, delta) - total := clientTotals[traffic.InboundId] + total := clientTotals[resolvedID] total.UpDelta += traffic.Up total.DownDelta += traffic.Down - clientTotals[traffic.InboundId] = total + clientTotals[resolvedID] = total } for _, traffic := range inboundTraffics { diff --git a/web/service/traffic_flush_test.go b/web/service/traffic_flush_test.go index 9f1f0c2b..7dbb9121 100644 --- a/web/service/traffic_flush_test.go +++ b/web/service/traffic_flush_test.go @@ -40,13 +40,18 @@ func TestCollectPersistsInboundOnlyDeltaFromDifference(t *testing.T) { if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil { t.Fatalf("seed inbound failed: %v", err) } + // Seed client_traffics so Collect can resolve email → InboundId + if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil { + t.Fatalf("seed client traffic failed: %v", err) + } store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) svc := NewTrafficFlushService(store) + // Xray API returns InboundId=0; Collect resolves it from DB err := svc.Collect( []*xray.Traffic{{Tag: "inbound-443", IsInbound: true, Up: 100, Down: 50}}, - []*xray.ClientTraffic{{InboundId: 1, Email: "alice@example.com", Up: 70, Down: 20}}, + []*xray.ClientTraffic{{InboundId: 0, Email: "alice@example.com", Up: 70, Down: 20}}, ) if err != nil { t.Fatalf("Collect error: %v", err) @@ -176,13 +181,18 @@ func TestCollectClampsNegativeResidualAndLogsDetailedWarning(t *testing.T) { if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil { t.Fatalf("seed inbound failed: %v", err) } + // Seed client_traffics so Collect can resolve email → InboundId + if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com", Enable: true}).Error; err != nil { + t.Fatalf("seed client traffic failed: %v", err) + } store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) svc := NewTrafficFlushService(store) + // Xray API returns InboundId=0; Collect resolves it from DB err := svc.Collect( []*xray.Traffic{{Tag: "inbound-443", IsInbound: true, Up: 10, Down: 5}}, - []*xray.ClientTraffic{{InboundId: 1, Email: "alice@example.com", Up: 12, Down: 7}}, + []*xray.ClientTraffic{{InboundId: 0, Email: "alice@example.com", Up: 12, Down: 7}}, ) if err != nil { t.Fatalf("Collect error: %v", err) @@ -267,3 +277,96 @@ func TestFlushOnceMarksRestartWhenReconciliationRequiresIt(t *testing.T) { t.Fatal("expected flush to mark restart when reconciliation requires it") } } + +func TestCollectResolvesInboundIdFromDB(t *testing.T) { + setupTestDB(t) + if err := database.GetDB().Create(&model.Inbound{Id: 5, Tag: "inbound-8443", Enable: true}).Error; err != nil { + t.Fatalf("seed inbound failed: %v", err) + } + if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 5, Email: "bob@example.com", Enable: true}).Error; err != nil { + t.Fatalf("seed client traffic failed: %v", err) + } + + store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) + svc := NewTrafficFlushService(store) + + // Simulate Xray API: InboundId is always 0, only email is set + err := svc.Collect( + []*xray.Traffic{{Tag: "inbound-8443", IsInbound: true, Up: 200, Down: 100}}, + []*xray.ClientTraffic{{InboundId: 0, Email: "bob@example.com", Up: 150, Down: 80}}, + ) + if err != nil { + t.Fatalf("Collect error: %v", err) + } + + deltas, err := store.Load() + if err != nil { + t.Fatalf("Load error: %v", err) + } + + var clientDelta *TrafficDelta + var inboundOnlyDelta *TrafficDelta + for i := range deltas { + switch deltas[i].Kind { + case TrafficDeltaKindClient: + clientDelta = &deltas[i] + case TrafficDeltaKindInboundOnly: + inboundOnlyDelta = &deltas[i] + } + } + + if clientDelta == nil { + t.Fatal("expected client delta") + } + // InboundId must be resolved to 5 from DB, not 0 from Xray API + if clientDelta.InboundID != 5 { + t.Fatalf("expected InboundID=5, got %d", clientDelta.InboundID) + } + if clientDelta.Email != "bob@example.com" || clientDelta.UpDelta != 150 || clientDelta.DownDelta != 80 { + t.Fatalf("unexpected client delta: %+v", *clientDelta) + } + + // Residual: 200-150=50 up, 100-80=20 down + if inboundOnlyDelta == nil { + t.Fatal("expected inbound-only delta") + } + if inboundOnlyDelta.InboundID != 5 || inboundOnlyDelta.UpDelta != 50 || inboundOnlyDelta.DownDelta != 20 { + t.Fatalf("unexpected inbound-only delta: %+v", *inboundOnlyDelta) + } +} + +func TestCollectSkipsUnknownEmail(t *testing.T) { + setupTestDB(t) + if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil { + t.Fatalf("seed inbound failed: %v", err) + } + // No client_traffic seeded → email is unknown + + store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json")) + svc := NewTrafficFlushService(store) + + err := svc.Collect( + []*xray.Traffic{{Tag: "inbound-443", IsInbound: true, Up: 100, Down: 50}}, + []*xray.ClientTraffic{{InboundId: 0, Email: "unknown@example.com", Up: 30, Down: 10}}, + ) + if err != nil { + t.Fatalf("Collect error: %v", err) + } + + deltas, err := store.Load() + if err != nil { + t.Fatalf("Load error: %v", err) + } + + // Unknown email should be skipped; only inbound-only residual remains + if len(deltas) != 1 { + t.Fatalf("expected 1 delta (inbound-only), got %d: %+v", len(deltas), deltas) + } + if deltas[0].Kind != TrafficDeltaKindInboundOnly { + t.Fatalf("expected inbound-only delta, got %+v", deltas[0]) + } + // Full inbound traffic becomes residual since no client traffic matched + if deltas[0].UpDelta != 100 || deltas[0].DownDelta != 50 { + t.Fatalf("unexpected residual: %+v", deltas[0]) + } +} diff --git a/x-ui.sh b/x-ui.sh index 7ac63c60..fb5a056a 100644 --- a/x-ui.sh +++ b/x-ui.sh @@ -2696,7 +2696,6 @@ mariadb_server_override_path() { mariadb_server_config_candidates() { local override_path override_path=$(mariadb_server_override_path) - echo "$override_path" local path="" for path in \ @@ -2712,6 +2711,8 @@ mariadb_server_config_candidates() { echo "$path" fi done + + echo "$override_path" } ensure_mariadb_override_file() { @@ -3050,26 +3051,38 @@ test_mariadb_server_connection() { local host="$1" port="$2" user="$3" pass="$4" local bin local -a cmd + local err_output bin=$(mariadb_cli_bin) || return 1 cmd=("$bin" -h "$host" -P "$port" -u "$user") if [[ -n "$pass" ]]; then cmd+=("-p$pass") fi cmd+=(-e "SELECT 1;") - "${cmd[@]}" >/dev/null 2>&1 + err_output=$("${cmd[@]}" 2>&1) + local rc=$? + if [[ $rc -ne 0 ]]; then + echo -e "${red}MariaDB 连接失败: ${err_output}${plain}" >&2 + return 1 + fi } test_mariadb_database_connection() { local host="$1" port="$2" dbname="$3" user="$4" pass="$5" local bin local -a cmd + local err_output bin=$(mariadb_cli_bin) || return 1 cmd=("$bin" -h "$host" -P "$port" -u "$user" -D "$dbname") if [[ -n "$pass" ]]; then cmd+=("-p$pass") fi cmd+=(-e "SELECT 1;") - "${cmd[@]}" >/dev/null 2>&1 + err_output=$("${cmd[@]}" 2>&1) + local rc=$? + if [[ $rc -ne 0 ]]; then + echo -e "${red}MariaDB 连接失败: ${err_output}${plain}" >&2 + return 1 + fi } is_safe_mariadb_identifier() {