mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-02-27 20:53:01 +00:00
Improve telego client robustness and retries
Some checks failed
Release 3X-UI / build (386) (push) Has been cancelled
Release 3X-UI / build (amd64) (push) Has been cancelled
Release 3X-UI / build (arm64) (push) Has been cancelled
Release 3X-UI / build (armv5) (push) Has been cancelled
Release 3X-UI / build (armv6) (push) Has been cancelled
Release 3X-UI / build (armv7) (push) Has been cancelled
Release 3X-UI / build (s390x) (push) Has been cancelled
Release 3X-UI / Build for Windows (push) Has been cancelled
Some checks failed
Release 3X-UI / build (386) (push) Has been cancelled
Release 3X-UI / build (amd64) (push) Has been cancelled
Release 3X-UI / build (arm64) (push) Has been cancelled
Release 3X-UI / build (armv5) (push) Has been cancelled
Release 3X-UI / build (armv6) (push) Has been cancelled
Release 3X-UI / build (armv7) (push) Has been cancelled
Release 3X-UI / build (s390x) (push) Has been cancelled
Release 3X-UI / Build for Windows (push) Has been cancelled
Add a createRobustFastHTTPClient helper to configure fasthttp.Client with better timeouts, connection limits, retries and optional SOCKS5 proxy dialing. Validate and sanitize proxy and API server URLs instead of returning early on invalid values, and build telego.Bot options dynamically. Reduce long-polling timeout to detect connection issues faster and adjust update retrieval comments. Implement exponential-backoff retry logic for SendMessage calls to handle transient connection/timeouts and improve delivery reliability; also reduce inter-message delay for better throughput.
This commit is contained in:
parent
3fa0da38c9
commit
5b796672e9
4 changed files with 117 additions and 54 deletions
|
|
@ -143,11 +143,11 @@ func (a *SUBController) subs(c *gin.Context) {
|
|||
|
||||
// Add headers
|
||||
header := fmt.Sprintf("upload=%d; download=%d; total=%d; expire=%d", traffic.Up, traffic.Down, traffic.Total, traffic.ExpiryTime/1000)
|
||||
profileUrl := a.subProfileUrl
|
||||
profileUrl := a.subProfileUrl
|
||||
if profileUrl == "" {
|
||||
profileUrl = fmt.Sprintf("%s://%s%s", scheme, hostWithPort, c.Request.RequestURI)
|
||||
}
|
||||
a.ApplyCommonHeaders(c, header, a.updateInterval, a.subTitle, a.subSupportUrl, profileUrl, a.subAnnounce, a.subEnableRouting, a.subRoutingRules)
|
||||
a.ApplyCommonHeaders(c, header, a.updateInterval, a.subTitle, a.subSupportUrl, profileUrl, a.subAnnounce, a.subEnableRouting, a.subRoutingRules)
|
||||
|
||||
if a.subEncrypt {
|
||||
c.String(200, base64.StdEncoding.EncodeToString([]byte(result)))
|
||||
|
|
|
|||
|
|
@ -2141,25 +2141,25 @@ func (s *InboundService) GetInboundClientIps(clientEmail string) (string, error)
|
|||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
||||
if InboundClientIps.Ips == "" {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
|
||||
// Try to parse as new format (with timestamps)
|
||||
type IPWithTimestamp struct {
|
||||
IP string `json:"ip"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
|
||||
var ipsWithTime []IPWithTimestamp
|
||||
err = json.Unmarshal([]byte(InboundClientIps.Ips), &ipsWithTime)
|
||||
|
||||
|
||||
// If successfully parsed as new format, return with timestamps
|
||||
if err == nil && len(ipsWithTime) > 0 {
|
||||
return InboundClientIps.Ips, nil
|
||||
}
|
||||
|
||||
|
||||
// Otherwise, assume it's old format (simple string array)
|
||||
// Try to parse as simple array and convert to new format
|
||||
var oldIps []string
|
||||
|
|
@ -2176,7 +2176,7 @@ func (s *InboundService) GetInboundClientIps(clientEmail string) (string, error)
|
|||
result, _ := json.Marshal(newIpsWithTime)
|
||||
return string(result), nil
|
||||
}
|
||||
|
||||
|
||||
// Return as-is if parsing fails
|
||||
return InboundClientIps.Ips, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -719,25 +719,25 @@ func (s *SettingService) GetDefaultXrayConfig() (any, error) {
|
|||
}
|
||||
|
||||
func extractHostname(host string) string {
|
||||
h, _, err := net.SplitHostPort(host)
|
||||
// Err is not nil means host does not contain port
|
||||
if err != nil {
|
||||
h = host
|
||||
}
|
||||
h, _, err := net.SplitHostPort(host)
|
||||
// Err is not nil means host does not contain port
|
||||
if err != nil {
|
||||
h = host
|
||||
}
|
||||
|
||||
ip := net.ParseIP(h)
|
||||
// If it's not an IP, return as is
|
||||
if ip == nil {
|
||||
return h
|
||||
}
|
||||
ip := net.ParseIP(h)
|
||||
// If it's not an IP, return as is
|
||||
if ip == nil {
|
||||
return h
|
||||
}
|
||||
|
||||
// If it's an IPv4, return as is
|
||||
if ip.To4() != nil {
|
||||
return h
|
||||
}
|
||||
// If it's an IPv4, return as is
|
||||
if ip.To4() != nil {
|
||||
return h
|
||||
}
|
||||
|
||||
// IPv6 needs bracketing
|
||||
return "[" + h + "]"
|
||||
// IPv6 needs bracketing
|
||||
return "[" + h + "]"
|
||||
}
|
||||
|
||||
func (s *SettingService) GetDefaultSettings(host string) (any, error) {
|
||||
|
|
|
|||
|
|
@ -272,41 +272,78 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// createRobustFastHTTPClient creates a fasthttp.Client with proper connection handling
|
||||
func (t *Tgbot) createRobustFastHTTPClient(proxyUrl string) *fasthttp.Client {
|
||||
client := &fasthttp.Client{
|
||||
// Connection timeouts
|
||||
ReadTimeout: 30 * time.Second,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
MaxIdleConnDuration: 60 * time.Second,
|
||||
MaxConnDuration: 0, // unlimited, but controlled by MaxIdleConnDuration
|
||||
MaxIdemponentCallAttempts: 3,
|
||||
ReadBufferSize: 4096,
|
||||
WriteBufferSize: 4096,
|
||||
MaxConnsPerHost: 100,
|
||||
MaxConnWaitTimeout: 10 * time.Second,
|
||||
DisableHeaderNamesNormalizing: false,
|
||||
DisablePathNormalizing: false,
|
||||
// Retry on connection errors
|
||||
RetryIf: func(request *fasthttp.Request) bool {
|
||||
// Retry on connection errors for GET requests
|
||||
return string(request.Header.Method()) == "GET" || string(request.Header.Method()) == "POST"
|
||||
},
|
||||
}
|
||||
|
||||
// Set proxy if provided
|
||||
if proxyUrl != "" {
|
||||
client.Dial = fasthttpproxy.FasthttpSocksDialer(proxyUrl)
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
// NewBot creates a new Telegram bot instance with optional proxy and API server settings.
|
||||
func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*telego.Bot, error) {
|
||||
if proxyUrl == "" && apiServerUrl == "" {
|
||||
return telego.NewBot(token)
|
||||
}
|
||||
|
||||
// Validate proxy URL if provided
|
||||
if proxyUrl != "" {
|
||||
if !strings.HasPrefix(proxyUrl, "socks5://") {
|
||||
logger.Warning("Invalid socks5 URL, using default")
|
||||
return telego.NewBot(token)
|
||||
logger.Warning("Invalid socks5 URL, ignoring proxy")
|
||||
proxyUrl = "" // Clear invalid proxy
|
||||
} else {
|
||||
_, err := url.Parse(proxyUrl)
|
||||
if err != nil {
|
||||
logger.Warningf("Can't parse proxy URL, ignoring proxy: %v", err)
|
||||
proxyUrl = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err := url.Parse(proxyUrl)
|
||||
if err != nil {
|
||||
logger.Warningf("Can't parse proxy URL, using default instance for tgbot: %v", err)
|
||||
return telego.NewBot(token)
|
||||
// Validate API server URL if provided
|
||||
if apiServerUrl != "" {
|
||||
if !strings.HasPrefix(apiServerUrl, "http") {
|
||||
logger.Warning("Invalid http(s) URL for API server, using default")
|
||||
apiServerUrl = ""
|
||||
} else {
|
||||
_, err := url.Parse(apiServerUrl)
|
||||
if err != nil {
|
||||
logger.Warningf("Can't parse API server URL, using default: %v", err)
|
||||
apiServerUrl = ""
|
||||
}
|
||||
}
|
||||
|
||||
return telego.NewBot(token, telego.WithFastHTTPClient(&fasthttp.Client{
|
||||
Dial: fasthttpproxy.FasthttpSocksDialer(proxyUrl),
|
||||
}))
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(apiServerUrl, "http") {
|
||||
logger.Warning("Invalid http(s) URL, using default")
|
||||
return telego.NewBot(token)
|
||||
// Create robust fasthttp client
|
||||
client := t.createRobustFastHTTPClient(proxyUrl)
|
||||
|
||||
// Build bot options
|
||||
var options []telego.BotOption
|
||||
options = append(options, telego.WithFastHTTPClient(client))
|
||||
|
||||
if apiServerUrl != "" {
|
||||
options = append(options, telego.WithAPIServer(apiServerUrl))
|
||||
}
|
||||
|
||||
_, err := url.Parse(apiServerUrl)
|
||||
if err != nil {
|
||||
logger.Warningf("Can't parse API server URL, using default instance for tgbot: %v", err)
|
||||
return telego.NewBot(token)
|
||||
}
|
||||
|
||||
return telego.NewBot(token, telego.WithAPIServer(apiServerUrl))
|
||||
return telego.NewBot(token, options...)
|
||||
}
|
||||
|
||||
// IsRunning checks if the Telegram bot is currently running.
|
||||
|
|
@ -390,7 +427,7 @@ func (t *Tgbot) decodeQuery(query string) (string, error) {
|
|||
// OnReceive starts the message receiving loop for the Telegram bot.
|
||||
func (t *Tgbot) OnReceive() {
|
||||
params := telego.GetUpdatesParams{
|
||||
Timeout: 30, // Increased timeout to reduce API calls
|
||||
Timeout: 20, // Reduced timeout to detect connection issues faster
|
||||
}
|
||||
// Strict singleton: never start a second long-polling loop.
|
||||
tgBotMutex.Lock()
|
||||
|
|
@ -408,7 +445,7 @@ func (t *Tgbot) OnReceive() {
|
|||
botWG.Add(1)
|
||||
tgBotMutex.Unlock()
|
||||
|
||||
// Get updates channel using the context.
|
||||
// Get updates channel using the context with shorter timeout for better error recovery
|
||||
updates, _ := bot.UpdatesViaLongPolling(ctx, ¶ms)
|
||||
go func() {
|
||||
defer botWG.Done()
|
||||
|
|
@ -2247,10 +2284,36 @@ func (t *Tgbot) SendMsgToTgbot(chatId int64, msg string, replyMarkup ...telego.R
|
|||
if len(replyMarkup) > 0 && n == (len(allMessages)-1) {
|
||||
params.ReplyMarkup = replyMarkup[0]
|
||||
}
|
||||
_, err := bot.SendMessage(context.Background(), ¶ms)
|
||||
if err != nil {
|
||||
logger.Warning("Error sending telegram message :", err)
|
||||
|
||||
// Retry logic with exponential backoff for connection errors
|
||||
maxRetries := 3
|
||||
for attempt := range maxRetries {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
_, err := bot.SendMessage(ctx, ¶ms)
|
||||
cancel()
|
||||
|
||||
if err == nil {
|
||||
break // Success
|
||||
}
|
||||
|
||||
// Check if error is a connection error
|
||||
errStr := err.Error()
|
||||
isConnectionError := strings.Contains(errStr, "connection") ||
|
||||
strings.Contains(errStr, "timeout") ||
|
||||
strings.Contains(errStr, "closed")
|
||||
|
||||
if isConnectionError && attempt < maxRetries-1 {
|
||||
// Exponential backoff: 1s, 2s, 4s
|
||||
backoff := time.Duration(1<<uint(attempt)) * time.Second
|
||||
logger.Warningf("Connection error sending telegram message (attempt %d/%d), retrying in %v: %v",
|
||||
attempt+1, maxRetries, backoff, err)
|
||||
time.Sleep(backoff)
|
||||
} else {
|
||||
logger.Warning("Error sending telegram message:", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Reduced delay to improve performance (only needed for rate limiting)
|
||||
if n < len(allMessages)-1 { // Only delay between messages, not after the last one
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
|
|
|||
Loading…
Reference in a new issue