From bef6b45848ea5a07148e4c108b6b169f31d5531b Mon Sep 17 00:00:00 2001 From: WatchDogsDev <29868640+WatchDogsDev@users.noreply.github.com> Date: Sun, 22 Mar 2026 06:20:10 +0330 Subject: [PATCH] fix: prevent multi-GB traffic overages after bandwidth limit is reached Three layered fixes targeting the distinct causes of overage: Bucket C (catastrophic): flush pending Xray stats to DB before every scheduled Xray restart so in-memory counters are never silently zeroed. - web/service/xray.go: add FlushTrafficToDB() - web/web.go: call FlushTrafficToDB() in the 30 s restart cron before RestartXray(false) Bucket A (in-flight gap): drain per-user Xray stats counters immediately after RemoveUser() succeeds, capturing bytes accumulated since the last bulk GetTraffic(reset=true) cycle. - xray/api.go: add DrainUserTraffic(email) using GetStats gRPC with reset - web/service/inbound.go: call DrainUserTraffic and persist delta in disableInvalidClients() Bucket B (active TCP connections survive removal): insert iptables DROP rules for each known client IP on the inbound port so established connections are killed immediately, not just new ones. - util/iptables/iptables.go: new package managing the 3X-UI-BLOCK chain (EnsureChain, FlushChain, BlockIP, UnblockIP, ListRules); gracefully degrades when iptables is unavailable - web/job/unblock_ips_job.go: @every 5m cleanup job removes rules older than maxBlockAgeSecs - web/service/inbound.go: blockClientIPs() called after successful RemoveUser(); unblockClientIPs() called after successful AddUser() in autoRenewClients() so renewed clients can reconnect - web/web.go: EnsureChain + FlushChain on startup; register unblock job --- util/iptables/iptables.go | 192 +++++++++++++++++++++++++++++++++++++ web/job/unblock_ips_job.go | 40 ++++++++ web/service/inbound.go | 101 +++++++++++++++++-- web/service/xray.go | 13 +++ web/web.go | 15 +++ xray/api.go | 22 +++++ 6 files changed, 377 insertions(+), 6 deletions(-) create mode 100644 util/iptables/iptables.go create mode 100644 web/job/unblock_ips_job.go diff --git a/util/iptables/iptables.go b/util/iptables/iptables.go new file mode 100644 index 00000000..b9a2038c --- /dev/null +++ b/util/iptables/iptables.go @@ -0,0 +1,192 @@ +// Package iptables manages a dedicated iptables chain (3X-UI-BLOCK) used to +// drop traffic from clients that have exceeded their bandwidth or time limits. +// All rules are inserted into the custom chain so they are isolated from +// OS/admin firewall rules and can be enumerated or flushed independently. +package iptables + +import ( + "fmt" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/mhsanaei/3x-ui/v2/logger" +) + +const chain = "3X-UI-BLOCK" + +// EnsureChain creates the custom chain if it does not exist and adds a jump +// rule from INPUT so the chain is evaluated for every incoming packet. +func EnsureChain() error { + // Create chain — ignore "already exists" error + out, err := run("iptables", "-N", chain) + if err != nil && !strings.Contains(out+err.Error(), "already exists") { + return fmt.Errorf("iptables -N %s: %w (%s)", chain, err, out) + } + + // Idempotent: only add the jump rule if it is not already present + _, checkErr := run("iptables", "-C", "INPUT", "-j", chain) + if checkErr != nil { + _, err = run("iptables", "-I", "INPUT", "-j", chain) + if err != nil { + return fmt.Errorf("iptables -I INPUT -j %s: %w", chain, err) + } + } + return nil +} + +// FlushChain removes all rules from the custom chain. Used on startup to +// clear any stale rules left over from a previous crash. +func FlushChain() error { + _, err := run("iptables", "-F", chain) + if err != nil { + return fmt.Errorf("iptables -F %s: %w", chain, err) + } + return nil +} + +// BlockIP inserts a DROP rule for the given source IP on the given TCP destination +// port into the custom chain. The comment embeds the current Unix timestamp so +// the rule can be age-checked later. +func BlockIP(ip string, port int) error { + comment := fmt.Sprintf("3xui:block:%d", time.Now().Unix()) + _, err := run("iptables", "-I", chain, + "-s", ip, + "-p", "tcp", "--dport", strconv.Itoa(port), + "-m", "comment", "--comment", comment, + "-j", "DROP") + if err != nil { + return fmt.Errorf("iptables BlockIP %s:%d: %w", ip, port, err) + } + return nil +} + +// UnblockIP removes the DROP rule for the given source IP and TCP destination port. +func UnblockIP(ip string, port int) error { + _, err := run("iptables", "-D", chain, + "-s", ip, + "-p", "tcp", "--dport", strconv.Itoa(port), + "-m", "comment", "--comment", findComment(ip, port), + "-j", "DROP") + if err != nil { + // Fall back: delete without the comment (handles rules added without matching comment) + _, err = run("iptables", "-D", chain, + "-s", ip, + "-p", "tcp", "--dport", strconv.Itoa(port), + "-j", "DROP") + if err != nil { + return fmt.Errorf("iptables UnblockIP %s:%d: %w", ip, port, err) + } + } + return nil +} + +// RuleEntry represents a parsed rule from the custom chain. +type RuleEntry struct { + IP string + Port int + InsertedAt int64 // Unix timestamp from the comment, 0 if not present +} + +// ListRules parses all rules in the custom chain and returns structured entries. +func ListRules() ([]RuleEntry, error) { + out, err := runOutput("iptables", "-S", chain) + if err != nil { + return nil, fmt.Errorf("iptables -S %s: %w", chain, err) + } + var rules []RuleEntry + for _, line := range strings.Split(out, "\n") { + entry, ok := parseLine(line) + if ok { + rules = append(rules, entry) + } + } + return rules, nil +} + +// parseLine extracts IP, port, and timestamp from a single `-S` output line. +// Example line: +// +// -A 3X-UI-BLOCK -s 1.2.3.4/32 -p tcp -m tcp --dport 443 -m comment --comment 3xui:block:1700000000 -j DROP +func parseLine(line string) (RuleEntry, bool) { + if !strings.Contains(line, "-j DROP") { + return RuleEntry{}, false + } + var entry RuleEntry + + parts := strings.Fields(line) + for i, p := range parts { + switch p { + case "-s": + if i+1 < len(parts) { + entry.IP = strings.TrimSuffix(parts[i+1], "/32") + } + case "--dport": + if i+1 < len(parts) { + if v, err := strconv.Atoi(parts[i+1]); err == nil { + entry.Port = v + } + } + case "--comment": + if i+1 < len(parts) { + comment := parts[i+1] + // format: 3xui:block: + if strings.HasPrefix(comment, "3xui:block:") { + ts, err := strconv.ParseInt(strings.TrimPrefix(comment, "3xui:block:"), 10, 64) + if err == nil { + entry.InsertedAt = ts + } + } + } + } + } + if entry.IP == "" || entry.Port == 0 { + return RuleEntry{}, false + } + return entry, true +} + +// findComment retrieves the --comment value for an existing rule matching ip:port. +// Returns an empty string if not found (caller will delete without comment). +func findComment(ip string, port int) string { + out, err := runOutput("iptables", "-S", chain) + if err != nil { + return "" + } + needle := fmt.Sprintf("-s %s", ip) + dport := fmt.Sprintf("--dport %d", port) + for _, line := range strings.Split(out, "\n") { + if strings.Contains(line, needle) && strings.Contains(line, dport) { + parts := strings.Fields(line) + for i, p := range parts { + if p == "--comment" && i+1 < len(parts) { + return parts[i+1] + } + } + } + } + return "" +} + +// run executes an iptables command and returns combined output and error. +func run(name string, args ...string) (string, error) { + path, err := exec.LookPath(name) + if err != nil { + logger.Warning("iptables not found in PATH:", err) + return "", err + } + cmd := exec.Command(path, args...) + out, err := cmd.CombinedOutput() + return string(out), err +} + +// runOutput executes a command and returns stdout output. +func runOutput(name string, args ...string) (string, error) { + path, err := exec.LookPath(name) + if err != nil { + return "", err + } + out, err := exec.Command(path, args...).Output() + return string(out), err +} diff --git a/web/job/unblock_ips_job.go b/web/job/unblock_ips_job.go new file mode 100644 index 00000000..61c46327 --- /dev/null +++ b/web/job/unblock_ips_job.go @@ -0,0 +1,40 @@ +package job + +import ( + "time" + + "github.com/mhsanaei/3x-ui/v2/logger" + "github.com/mhsanaei/3x-ui/v2/util/iptables" +) + +const maxBlockAgeSecs int64 = 600 // 10 minutes + +// UnblockIPsJob removes expired iptables DROP rules from the 3X-UI-BLOCK chain. +// Rules older than maxBlockAgeSecs are removed to prevent the firewall table +// from growing unbounded and to unblock IPs that may have been re-assigned. +type UnblockIPsJob struct{} + +// NewUnblockIPsJob creates a new instance of the IP unblock cleanup job. +func NewUnblockIPsJob() *UnblockIPsJob { + return &UnblockIPsJob{} +} + +// Run enumerates all rules in the 3X-UI-BLOCK chain and removes any that are +// older than maxBlockAgeSecs. +func (j *UnblockIPsJob) Run() { + rules, err := iptables.ListRules() + if err != nil { + logger.Debug("UnblockIPsJob: failed to list iptables rules:", err) + return + } + now := time.Now().Unix() + for _, rule := range rules { + if rule.InsertedAt > 0 && (now-rule.InsertedAt) > maxBlockAgeSecs { + if err := iptables.UnblockIP(rule.IP, rule.Port); err != nil { + logger.Warning("UnblockIPsJob: failed to unblock", rule.IP, rule.Port, err) + } else { + logger.Debug("UnblockIPsJob: unblocked expired rule", rule.IP, rule.Port) + } + } + } +} diff --git a/web/service/inbound.go b/web/service/inbound.go index 8a3a4ae2..3bf81cde 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -14,6 +14,7 @@ import ( "github.com/mhsanaei/3x-ui/v2/database/model" "github.com/mhsanaei/3x-ui/v2/logger" "github.com/mhsanaei/3x-ui/v2/util/common" + "github.com/mhsanaei/3x-ui/v2/util/iptables" "github.com/mhsanaei/3x-ui/v2/xray" "gorm.io/gorm" @@ -1212,6 +1213,12 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client) if err1 != nil { needRestart = true + } else { + // Client was re-added to Xray; remove any iptables block rules for it + email, _ := clientToAdd.client["email"].(string) + if email != "" { + s.unblockClientIPs(email) + } } } s.xrayApi.Close() @@ -1276,16 +1283,23 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) err1 := s.xrayApi.RemoveUser(result.Tag, result.Email) if err1 == nil { logger.Debug("Client disabled by api:", result.Email) + // Drain any traffic accumulated since the last GetTraffic(reset=true) call + up, down, derr := s.xrayApi.DrainUserTraffic(result.Email) + if derr == nil && (up > 0 || down > 0) { + tx.Model(xray.ClientTraffic{}).Where("email = ?", result.Email). + Updates(map[string]any{ + "up": gorm.Expr("up + ?", up), + "down": gorm.Expr("down + ?", down), + }) + } + // Block active TCP connections for this client + s.blockClientIPs(result.Email) } else { if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", result.Email)) { logger.Debug("User is already disabled. Nothing to do more...") } else { - if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", result.Email)) { - logger.Debug("User is already disabled. Nothing to do more...") - } else { - logger.Debug("Error in disabling client by api:", err1) - needRestart = true - } + logger.Debug("Error in disabling client by api:", err1) + needRestart = true } } } @@ -2507,3 +2521,78 @@ func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (b return needRestart, db.Save(oldInbound).Error } + +// blockClientIPs inserts iptables DROP rules for all known IPs of the given client. +// Failures are logged as warnings so a missing iptables binary does not break the +// normal disable flow. +func (s *InboundService) blockClientIPs(email string) { + ipsJSON, err := s.GetInboundClientIps(email) + if err != nil || ipsJSON == "" { + return + } + _, inbound, err := s.GetClientInboundByEmail(email) + if err != nil || inbound == nil { + return + } + port := inbound.Port + + type IPWithTimestamp struct { + IP string `json:"ip"` + Timestamp int64 `json:"timestamp"` + } + var ipsWithTime []IPWithTimestamp + if err := json.Unmarshal([]byte(ipsJSON), &ipsWithTime); err != nil { + // Try simple string-array format + var simpleIPs []string + if err2 := json.Unmarshal([]byte(ipsJSON), &simpleIPs); err2 != nil { + return + } + for _, ip := range simpleIPs { + if berr := iptables.BlockIP(ip, port); berr != nil { + logger.Warning("blockClientIPs: failed to block", ip, berr) + } + } + return + } + for _, entry := range ipsWithTime { + if berr := iptables.BlockIP(entry.IP, port); berr != nil { + logger.Warning("blockClientIPs: failed to block", entry.IP, berr) + } + } +} + +// unblockClientIPs removes iptables DROP rules for all known IPs of the given client. +func (s *InboundService) unblockClientIPs(email string) { + ipsJSON, err := s.GetInboundClientIps(email) + if err != nil || ipsJSON == "" { + return + } + _, inbound, err := s.GetClientInboundByEmail(email) + if err != nil || inbound == nil { + return + } + port := inbound.Port + + type IPWithTimestamp struct { + IP string `json:"ip"` + Timestamp int64 `json:"timestamp"` + } + var ipsWithTime []IPWithTimestamp + if err := json.Unmarshal([]byte(ipsJSON), &ipsWithTime); err != nil { + var simpleIPs []string + if err2 := json.Unmarshal([]byte(ipsJSON), &simpleIPs); err2 != nil { + return + } + for _, ip := range simpleIPs { + if uerr := iptables.UnblockIP(ip, port); uerr != nil { + logger.Debug("unblockClientIPs: failed to unblock", ip, uerr) + } + } + return + } + for _, entry := range ipsWithTime { + if uerr := iptables.UnblockIP(entry.IP, port); uerr != nil { + logger.Debug("unblockClientIPs: failed to unblock", entry.IP, uerr) + } + } +} diff --git a/web/service/xray.go b/web/service/xray.go index 511ffdda..77735e17 100644 --- a/web/service/xray.go +++ b/web/service/xray.go @@ -194,6 +194,19 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) { return xrayConfig, nil } +// FlushTrafficToDB fetches current Xray traffic stats and persists them to the database. +// Call this before restarting Xray to avoid losing in-memory counter data. +func (s *XrayService) FlushTrafficToDB() { + traffics, clientTraffics, err := s.GetXrayTraffic() + if err != nil { + logger.Debug("FlushTrafficToDB: failed to get traffic:", err) + return + } + if err, _ := s.inboundService.AddTraffic(traffics, clientTraffics); err != nil { + logger.Warning("FlushTrafficToDB: failed to persist traffic:", err) + } +} + // GetXrayTraffic fetches the current traffic statistics from the running Xray process. func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, []*xray.ClientTraffic, error) { if !s.IsXrayRunning() { diff --git a/web/web.go b/web/web.go index 60934048..fb225717 100644 --- a/web/web.go +++ b/web/web.go @@ -19,6 +19,7 @@ import ( "github.com/mhsanaei/3x-ui/v2/config" "github.com/mhsanaei/3x-ui/v2/logger" "github.com/mhsanaei/3x-ui/v2/util/common" + "github.com/mhsanaei/3x-ui/v2/util/iptables" "github.com/mhsanaei/3x-ui/v2/web/controller" "github.com/mhsanaei/3x-ui/v2/web/job" "github.com/mhsanaei/3x-ui/v2/web/locale" @@ -295,6 +296,15 @@ func (s *Server) initRouter() (*gin.Engine, error) { // startTask schedules background jobs (Xray checks, traffic jobs, cron // jobs) which the panel relies on for periodic maintenance and monitoring. func (s *Server) startTask() { + // Ensure the 3X-UI-BLOCK iptables chain exists and flush any stale rules from a prior run + if err := iptables.EnsureChain(); err != nil { + logger.Warning("iptables EnsureChain failed (continuing without IP blocking):", err) + } else { + if err := iptables.FlushChain(); err != nil { + logger.Warning("iptables FlushChain failed:", err) + } + } + err := s.xrayService.RestartXray(true) if err != nil { logger.Warning("start xray failed:", err) @@ -305,6 +315,8 @@ func (s *Server) startTask() { // Check if xray needs to be restarted every 30 seconds s.cron.AddFunc("@every 30s", func() { if s.xrayService.IsNeedRestartAndSetFalse() { + // Flush pending traffic before restart so in-memory counters are not lost + s.xrayService.FlushTrafficToDB() err := s.xrayService.RestartXray(false) if err != nil { logger.Error("restart xray failed:", err) @@ -321,6 +333,9 @@ func (s *Server) startTask() { // check client ips from log file every 10 sec s.cron.AddJob("@every 10s", job.NewCheckClientIpJob()) + // Clean up expired iptables block rules every 5 minutes + s.cron.AddJob("@every 5m", job.NewUnblockIPsJob()) + // check client ips from log file every day s.cron.AddJob("@daily", job.NewClearLogsJob()) diff --git a/xray/api.go b/xray/api.go index 2312d2e4..53233456 100644 --- a/xray/api.go +++ b/xray/api.go @@ -204,6 +204,28 @@ func (x *XrayAPI) RemoveUser(inboundTag, email string) error { return nil } +// DrainUserTraffic reads and resets the uplink and downlink counters for a specific user +// from the Xray stats service. Returns 0,0,nil if no counters exist for the user. +func (x *XrayAPI) DrainUserTraffic(email string) (up int64, down int64, err error) { + if x.StatsServiceClient == nil { + return 0, 0, common.NewError("xray StatsServiceClient is not initialized") + } + client := *x.StatsServiceClient + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + uplinkName := fmt.Sprintf("user>>>%s>>>traffic>>>uplink", email) + downlinkName := fmt.Sprintf("user>>>%s>>>traffic>>>downlink", email) + + if resp, e := client.GetStats(ctx, &statsService.GetStatsRequest{Name: uplinkName, Reset_: true}); e == nil { + up = resp.GetStat().GetValue() + } + if resp, e := client.GetStats(ctx, &statsService.GetStatsRequest{Name: downlinkName, Reset_: true}); e == nil { + down = resp.GetStat().GetValue() + } + return up, down, nil +} + // GetTraffic queries traffic statistics from the Xray core, optionally resetting counters. func (x *XrayAPI) GetTraffic(reset bool) ([]*Traffic, []*ClientTraffic, error) { if x.grpcClient == nil {