diff --git a/web/job/collect_node_logs_job.go b/web/job/collect_node_logs_job.go deleted file mode 100644 index 907551c8..00000000 --- a/web/job/collect_node_logs_job.go +++ /dev/null @@ -1,283 +0,0 @@ -// Package job provides scheduled background jobs for the 3x-ui panel. -package job - -import ( - "crypto/md5" - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/mhsanaei/3x-ui/v2/database/model" - "github.com/mhsanaei/3x-ui/v2/logger" - "github.com/mhsanaei/3x-ui/v2/web/service" -) - -// logEntry represents a log entry with timestamp for sorting -type logEntry struct { - timestamp time.Time - nodeName string - level string - message string -} - -// CollectNodeLogsJob periodically collects XRAY logs from nodes and adds them to the panel log buffer. -type CollectNodeLogsJob struct { - nodeService service.NodeService - // Track last collected log hash for each node and log type to avoid duplicates - lastLogHashes map[string]string // key: "nodeId:logType" (e.g., "1:service", "1:access") - mu sync.RWMutex -} - -// NewCollectNodeLogsJob creates a new job for collecting node logs. -func NewCollectNodeLogsJob() *CollectNodeLogsJob { - return &CollectNodeLogsJob{ - nodeService: service.NodeService{}, - lastLogHashes: make(map[string]string), - } -} - -// parseLogLine parses a log line from node and extracts timestamp, level, and message. -// Format: "timestamp level - message" or "timestamp level - message" for access logs. -// Returns timestamp, level, message, and success flag. -func (j *CollectNodeLogsJob) parseLogLine(logLine string) (time.Time, string, string, bool) { - // Try to parse format: "2006/01/02 15:04:05 level - message" or "2006/01/02 15:04:05.999999 level - message" - if idx := strings.Index(logLine, " - "); idx != -1 { - parts := strings.SplitN(logLine, " - ", 2) - if len(parts) == 2 { - // parts[0] = "timestamp level", parts[1] = "message" - levelPart := strings.TrimSpace(parts[0]) - levelFields := strings.Fields(levelPart) - - if len(levelFields) >= 3 { - // Format: "2006/01/02 15:04:05 level" or "2006/01/02 15:04:05.999999 level" - timestampStr := levelFields[0] + " " + levelFields[1] - level := strings.ToUpper(levelFields[2]) - message := parts[1] - - // Try parsing with microseconds first - timestamp, err := time.ParseInLocation("2006/01/02 15:04:05.999999", timestampStr, time.Local) - if err != nil { - // Fallback to format without microseconds - timestamp, err = time.ParseInLocation("2006/01/02 15:04:05", timestampStr, time.Local) - } - - if err == nil { - return timestamp, level, message, true - } - } else if len(levelFields) >= 2 { - // Try to parse as "timestamp level" where timestamp might be in different format - level := strings.ToUpper(levelFields[len(levelFields)-1]) - message := parts[1] - // Try to extract timestamp from first fields - if len(levelFields) >= 2 { - timestampStr := strings.Join(levelFields[:len(levelFields)-1], " ") - timestamp, err := time.ParseInLocation("2006/01/02 15:04:05.999999", timestampStr, time.Local) - if err != nil { - timestamp, err = time.ParseInLocation("2006/01/02 15:04:05", timestampStr, time.Local) - } - if err == nil { - return timestamp, level, message, true - } - } - // If timestamp parsing fails, use current time - return time.Now(), level, message, true - } - } - } - - // If parsing fails, return current time and treat as INFO level - return time.Now(), "INFO", logLine, false -} - -// processLogs processes logs from a node and returns new log entries with timestamps. -// logType can be "service" or "access" to track them separately. -func (j *CollectNodeLogsJob) processLogs(node *model.Node, rawLogs []string, logType string) []logEntry { - if len(rawLogs) == 0 { - return nil - } - - // Get last collected log hash for this node and log type - hashKey := fmt.Sprintf("%d:%s", node.Id, logType) - j.mu.RLock() - lastHash := j.lastLogHashes[hashKey] - j.mu.RUnlock() - - // Process logs from newest to oldest to find where we left off - var newLogLines []string - var mostRecentHash string - foundLastHash := lastHash == "" // If no last hash, all logs are new - - // Iterate from newest (end) to oldest (start) - for i := len(rawLogs) - 1; i >= 0; i-- { - logLine := rawLogs[i] - if logLine == "" { - continue - } - - // Skip API calls for access logs - if logType == "access" && strings.Contains(logLine, "api -> api") { - continue - } - - // Calculate hash for this log line - logHash := fmt.Sprintf("%x", md5.Sum([]byte(logLine))) - - // Store the most recent hash (first valid log we encounter) - if mostRecentHash == "" { - mostRecentHash = logHash - } - - // If we haven't found the last collected log yet, check if this is it - if !foundLastHash { - if logHash == lastHash { - foundLastHash = true - } - continue - } - - // This is a new log (after the last collected one) - newLogLines = append(newLogLines, logLine) - } - - // If we didn't find the last hash, all logs in this batch are new - if !foundLastHash { - // Add all valid logs as new - for i := len(rawLogs) - 1; i >= 0; i-- { - logLine := rawLogs[i] - if logLine != "" { - if logType == "access" && strings.Contains(logLine, "api -> api") { - continue - } - newLogLines = append(newLogLines, logLine) - } - } - } - - // Parse logs and create entries with timestamps - var entries []logEntry - for _, logLine := range newLogLines { - timestamp, level, message, _ := j.parseLogLine(logLine) - entries = append(entries, logEntry{ - timestamp: timestamp, - nodeName: node.Name, - level: level, - message: message, - }) - } - - // Update last hash to the most recent log we processed (newest log from batch) - if mostRecentHash != "" { - j.mu.Lock() - j.lastLogHashes[hashKey] = mostRecentHash - j.mu.Unlock() - } - - return entries -} - -// Run executes the job to collect logs from all nodes and add them to the panel log buffer. -func (j *CollectNodeLogsJob) Run() { - // Check if multi-node mode is enabled - settingService := service.SettingService{} - multiMode, err := settingService.GetMultiNodeMode() - if err != nil || !multiMode { - return // Skip if multi-node mode is not enabled - } - - nodes, err := j.nodeService.GetAllNodes() - if err != nil { - logger.Debugf("Failed to get nodes for log collection: %v", err) - return - } - - if len(nodes) == 0 { - return // No nodes to collect logs from - } - - // Collect all logs from all nodes first, then sort and add to buffer - var allEntries []logEntry - var wg sync.WaitGroup - var entriesMu sync.Mutex - - // Collect logs from each node concurrently - // Only collect from nodes that have assigned inbounds (active nodes) - for _, node := range nodes { - n := node // Capture loop variable - wg.Add(1) - go func() { - defer wg.Done() - - // Check if node has assigned inbounds (only collect from active nodes) - inbounds, err := j.nodeService.GetInboundsForNode(n.Id) - if err != nil || len(inbounds) == 0 { - return // Skip nodes without assigned inbounds - } - - var nodeEntries []logEntry - - // Collect service logs (node service and XRAY core logs) - serviceLogs, err := j.nodeService.GetNodeServiceLogs(n, 100, "debug") - if err != nil { - // Don't log errors for offline nodes - if !strings.Contains(err.Error(), "status code") { - logger.Debugf("[Node: %s] Failed to collect service logs: %v", n.Name, err) - } - } else { - // Process service logs - nodeEntries = append(nodeEntries, j.processLogs(n, serviceLogs, "service")...) - } - - // Get XRAY access logs (traffic logs) - rawLogs, err := j.nodeService.GetNodeLogs(n, 100, "") - if err != nil { - // Don't log errors for offline nodes or nodes without logs configured - if !strings.Contains(err.Error(), "XRAY is not running") && - !strings.Contains(err.Error(), "status code") && - !strings.Contains(err.Error(), "access log path") { - logger.Debugf("[Node: %s] Failed to collect access logs: %v", n.Name, err) - } - } else { - // Process access logs - nodeEntries = append(nodeEntries, j.processLogs(n, rawLogs, "access")...) - } - - // Add node entries to global list - if len(nodeEntries) > 0 { - entriesMu.Lock() - allEntries = append(allEntries, nodeEntries...) - entriesMu.Unlock() - } - }() - } - - // Wait for all goroutines to finish - wg.Wait() - - // Sort all entries by timestamp (oldest first) - sort.Slice(allEntries, func(i, j int) bool { - return allEntries[i].timestamp.Before(allEntries[j].timestamp) - }) - - // Add sorted logs to panel buffer - for _, entry := range allEntries { - formattedMessage := fmt.Sprintf("[Node: %s] %s", entry.nodeName, entry.message) - switch entry.level { - case "DEBUG": - logger.Debugf("%s", formattedMessage) - case "WARNING": - logger.Warningf("%s", formattedMessage) - case "ERROR": - logger.Errorf("%s", formattedMessage) - case "NOTICE": - logger.Noticef("%s", formattedMessage) - default: - logger.Infof("%s", formattedMessage) - } - } - - if len(allEntries) > 0 { - logger.Debugf("Collected and sorted %d new log entries from %d nodes", len(allEntries), len(nodes)) - } -} diff --git a/web/service/node.go b/web/service/node.go index 0baa5dd7..4fc24704 100644 --- a/web/service/node.go +++ b/web/service/node.go @@ -875,43 +875,5 @@ func (s *NodeService) GetNodeLogs(node *model.Node, count int, filter string) ([ return nil, fmt.Errorf("failed to decode response: %w", err) } - return response.Logs, nil -} - -// GetNodeServiceLogs retrieves service application logs from a node (node service logs and XRAY core logs). -// Returns log lines as strings. -func (s *NodeService) GetNodeServiceLogs(node *model.Node, count int, level string) ([]string, error) { - client, err := s.createHTTPClient(node, 10*time.Second) - if err != nil { - return nil, fmt.Errorf("failed to create HTTP client: %w", err) - } - - url := fmt.Sprintf("%s/api/v1/service-logs?count=%d&level=%s", node.Address, count, level) - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - req.Header.Set("Authorization", "Bearer "+node.ApiKey) - - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to request node service logs: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("node returned status code %d: %s", resp.StatusCode, string(body)) - } - - var response struct { - Logs []string `json:"logs"` - } - if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { - return nil, fmt.Errorf("failed to decode response: %w", err) - } - return response.Logs, nil } \ No newline at end of file diff --git a/web/web.go b/web/web.go index ff456301..9283e3df 100644 --- a/web/web.go +++ b/web/web.go @@ -368,9 +368,6 @@ func (s *Server) startTask() { s.cron.AddJob("@every 10s", job.NewCheckNodeHealthJob()) // Collect node statistics (traffic and online clients) every 30 seconds s.cron.AddJob("@every 30s", job.NewCollectNodeStatsJob()) - // Collect node logs and add to panel log buffer every 30 seconds - // Disabled: logs are now pushed from nodes in real-time via push-logs endpoint - // s.cron.AddJob("@every 30s", job.NewCollectNodeLogsJob()) // Make a traffic condition every day, 8:30 var entry cron.EntryID