diff --git a/web/job/check_client_ip_job.go b/web/job/check_client_ip_job.go index 30318ec0..9ea79936 100644 --- a/web/job/check_client_ip_job.go +++ b/web/job/check_client_ip_job.go @@ -7,16 +7,14 @@ import ( "io" "log" "os" - "os/exec" "regexp" - "runtime" "sort" - "strconv" "time" "github.com/mhsanaei/3x-ui/v2/database" "github.com/mhsanaei/3x-ui/v2/database/model" "github.com/mhsanaei/3x-ui/v2/logger" + "github.com/mhsanaei/3x-ui/v2/web/service" "github.com/mhsanaei/3x-ui/v2/xray" ) @@ -28,20 +26,24 @@ type IPWithTimestamp struct { // CheckClientIpJob monitors client IP addresses from access logs and manages IP blocking based on configured limits. type CheckClientIpJob struct { - lastClear int64 - disAllowedIps []string - fail2BanWarned bool - fail2BanInstalled bool - lastDisconnectOK bool - lastDisconnectByID map[string]int64 + lastClear int64 + tempBansByEmail map[string]int64 + inboundService service.InboundService + xrayService *service.XrayService } var job *CheckClientIpJob +const ( + ipLimitWindowDuration = 3 * time.Minute + ipLimitBanDuration = 3 * time.Minute +) + // NewCheckClientIpJob creates a new client IP monitoring job instance. -func NewCheckClientIpJob() *CheckClientIpJob { +func NewCheckClientIpJob(xrayService *service.XrayService) *CheckClientIpJob { job = &CheckClientIpJob{ - lastDisconnectByID: map[string]int64{}, + tempBansByEmail: map[string]int64{}, + xrayService: xrayService, } return job } @@ -55,25 +57,11 @@ func (j *CheckClientIpJob) Run() { iplimitActive := j.hasLimitIp() isAccessLogAvailable := j.checkAccessLogAvailable(iplimitActive) + j.restoreExpiredClientAccess() + if isAccessLogAvailable { - if runtime.GOOS == "windows" { - if iplimitActive { - shouldClearAccessLog = j.processLogFile() - } - } else { - if iplimitActive { - // Always process and persist client IP records. Fail2Ban is optional. - f2bInstalled := j.checkFail2BanInstalled() - j.fail2BanInstalled = f2bInstalled - shouldClearAccessLog = j.processLogFile() - if !j.fail2BanInstalled && !j.fail2BanWarned { - logger.Warning("[LimitIP] Fail2Ban is not installed, IP records will continue to work but automatic banning is disabled.") - j.fail2BanWarned = true - } - if j.fail2BanInstalled { - j.fail2BanWarned = false - } - } + if iplimitActive { + shouldClearAccessLog = j.processLogFile() } } @@ -210,13 +198,6 @@ func (j *CheckClientIpJob) processLogFile() bool { return shouldCleanLog } -func (j *CheckClientIpJob) checkFail2BanInstalled() bool { - cmd := "fail2ban-client" - args := []string{"-h"} - err := exec.Command(cmd, args...).Run() - return err == nil -} - func (j *CheckClientIpJob) checkAccessLogAvailable(iplimitActive bool) bool { accessLogPath, err := xray.GetAccessLogPath() if err != nil { @@ -340,8 +321,6 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun }) shouldCleanLog := false - j.disAllowedIps = []string{} - j.lastDisconnectOK = false // Open log file logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) @@ -353,33 +332,23 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun log.SetOutput(logIpFile) log.SetFlags(log.LstdFlags) - // Check if we exceed the limit - if len(allIps) > limitIp { + recentIps := j.filterRecentIPs(allIps, time.Now().Add(-ipLimitWindowDuration).Unix()) + + jsonIps, _ := json.Marshal(recentIps) + inboundClientIps.Ips = string(jsonIps) + + // Check if the recent 3-minute window exceeds the limit. + if len(recentIps) > limitIp { shouldCleanLog = true - - // Keep the oldest IPs (currently active connections) and ban the new excess ones. - keptIps := allIps[:limitIp] - bannedIps := allIps[limitIp:] - - // Log banned IPs in the format fail2ban filters expect: [LIMIT_IP] Email = X || Disconnecting OLD IP = Y || Timestamp = Z - for _, ipTime := range bannedIps { - j.disAllowedIps = append(j.disAllowedIps, ipTime.IP) - log.Printf("[LIMIT_IP] Email = %s || Disconnecting OLD IP = %s || Timestamp = %d", clientEmail, ipTime.IP, ipTime.Timestamp) + for _, ipTime := range recentIps { + log.Printf("[LIMIT_IP] Email = %s || Recent IP = %s || Timestamp = %d", clientEmail, ipTime.IP, ipTime.Timestamp) } - // Fallback enforcement path when Fail2Ban is unavailable: - // temporarily remove and re-add user to drop existing sessions. - if len(bannedIps) > 0 && !j.fail2BanInstalled { - j.lastDisconnectOK = j.disconnectClientTemporarily(inbound, clientEmail, clients) + if err := j.disableClientTemporarily(clientEmail, ipLimitBanDuration); err != nil { + logger.Warningf("[LIMIT_IP] Failed to temporarily disable client %s: %v", clientEmail, err) + } else { + logger.Infof("[LIMIT_IP] Client %s: observed %d IPs in the last 3 minutes (limit=%d), temporarily disabled for 3 minutes", clientEmail, len(recentIps), limitIp) } - - // Update database with only the currently active (kept) IPs - jsonIps, _ := json.Marshal(keptIps) - inboundClientIps.Ips = string(jsonIps) - } else { - // Under limit, save all IPs - jsonIps, _ := json.Marshal(allIps) - inboundClientIps.Ips = string(jsonIps) } db := database.GetDB() @@ -389,16 +358,6 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun return false } - if len(j.disAllowedIps) > 0 { - if j.fail2BanInstalled { - logger.Infof("[LIMIT_IP] Client %s: Kept %d current IPs, queued %d new IPs for fail2ban", clientEmail, limitIp, len(j.disAllowedIps)) - } else if j.lastDisconnectOK { - logger.Infof("[LIMIT_IP] Client %s: Kept %d current IPs, disconnected session to enforce limit without fail2ban", clientEmail, limitIp) - } else { - logger.Warningf("[LIMIT_IP] Client %s: Kept %d current IPs, but failed to disconnect excess sessions without fail2ban", clientEmail, limitIp) - } - } - return shouldCleanLog } @@ -414,71 +373,56 @@ func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound return inbound, nil } -// disconnectClientTemporarily removes and re-adds a client to force stale/excess sessions to drop. -func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, clientEmail string, clients []model.Client) bool { - if inbound == nil || inbound.Tag == "" { - return false - } - - // Avoid thrashing the same account on every 10s cron tick. +func (j *CheckClientIpJob) disableClientTemporarily(clientEmail string, duration time.Duration) error { now := time.Now().Unix() - if last, ok := j.lastDisconnectByID[clientEmail]; ok && now-last < 30 { - return true + if until, ok := j.tempBansByEmail[clientEmail]; ok && until > now { + return nil } - var xrayAPI xray.XrayAPI - apiPort, err := j.resolveXrayAPIPort() + changed, needRestart, err := j.inboundService.SetClientEnableByEmail(clientEmail, false) if err != nil { - logger.Warningf("[LIMIT_IP] Failed to resolve Xray API port for fallback disconnect: %v", err) - return false + return err + } + if needRestart && j.xrayService != nil { + j.xrayService.SetToNeedRestart() + } + if !changed { + return fmt.Errorf("client %s was not disabled", clientEmail) } - if err := xrayAPI.Init(apiPort); err != nil { - logger.Warningf("[LIMIT_IP] Failed to init Xray API for fallback disconnect: %v", err) - return false - } - defer xrayAPI.Close() + j.tempBansByEmail[clientEmail] = time.Now().Add(duration).Unix() + return nil +} - var clientConfig map[string]any - for _, client := range clients { - if client.Email != clientEmail { +func (j *CheckClientIpJob) restoreExpiredClientAccess() { + now := time.Now().Unix() + for clientEmail, until := range j.tempBansByEmail { + if until > now { continue } - clientBytes, _ := json.Marshal(client) - _ = json.Unmarshal(clientBytes, &clientConfig) - break - } - if clientConfig == nil { - return false - } - if err := xrayAPI.RemoveUser(inbound.Tag, clientEmail); err != nil { - logger.Warningf("[LIMIT_IP] Failed to remove user %s: %v", clientEmail, err) - return false - } + changed, needRestart, err := j.inboundService.SetClientEnableByEmail(clientEmail, true) + if err != nil { + logger.Warningf("[LIMIT_IP] Failed to restore client %s after temporary disable: %v", clientEmail, err) + continue + } + if needRestart && j.xrayService != nil { + j.xrayService.SetToNeedRestart() + } - time.Sleep(150 * time.Millisecond) - if err := xrayAPI.AddUser(string(inbound.Protocol), inbound.Tag, clientConfig); err != nil { - logger.Warningf("[LIMIT_IP] Failed to re-add user %s: %v", clientEmail, err) - return false - } - - j.lastDisconnectByID[clientEmail] = now - return true -} - -func (j *CheckClientIpJob) resolveXrayAPIPort() (int, error) { - if apiPort, err := xray.GetAPIPortFromConfig(); err == nil && apiPort > 0 { - return apiPort, nil - } - - db := database.GetDB() - var apiPortSetting model.Setting - if err := db.Where("key = ?", "xrayApiPort").First(&apiPortSetting).Error; err == nil { - if parsed, convErr := strconv.Atoi(apiPortSetting.Value); convErr == nil && parsed > 0 { - return parsed, nil + delete(j.tempBansByEmail, clientEmail) + if changed { + logger.Infof("[LIMIT_IP] Client %s: temporary 3-minute disable expired, access restored", clientEmail) } } - - return 0, fmt.Errorf("no usable Xray API port found in config or settings") +} + +func (j *CheckClientIpJob) filterRecentIPs(ips []IPWithTimestamp, minTimestamp int64) []IPWithTimestamp { + recent := make([]IPWithTimestamp, 0, len(ips)) + for _, ipTime := range ips { + if ipTime.Timestamp >= minTimestamp { + recent = append(recent, ipTime) + } + } + return recent } diff --git a/web/web.go b/web/web.go index 44c5601c..9e8eef91 100644 --- a/web/web.go +++ b/web/web.go @@ -319,7 +319,7 @@ func (s *Server) startTask() { }() // check client ips from log file every 10 sec - s.cron.AddJob("@every 10s", job.NewCheckClientIpJob()) + s.cron.AddJob("@every 10s", job.NewCheckClientIpJob(&s.xrayService)) // check active device limits every 10 sec s.cron.AddJob("@every 10s", job.NewCheckDeviceLimitJob(&s.xrayService))