diff --git a/web/service/tgbot.go b/web/service/tgbot.go index e575bb28..762ffd25 100644 --- a/web/service/tgbot.go +++ b/web/service/tgbot.go @@ -16,6 +16,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/mhsanaei/3x-ui/v2/config" @@ -44,6 +45,23 @@ var ( hostname string hashStorage *global.HashStorage + // Performance improvements + messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing + optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts + + // Simple cache for frequently accessed data + statusCache struct { + data *Status + timestamp time.Time + mutex sync.RWMutex + } + + serverStatsCache struct { + data string + timestamp time.Time + mutex sync.RWMutex + } + // clients data to adding new client receiver_inbound_ID int client_Id string @@ -100,6 +118,46 @@ func (t *Tgbot) GetHashStorage() *global.HashStorage { return hashStorage } +// getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old) +func (t *Tgbot) getCachedStatus() (*Status, bool) { + statusCache.mutex.RLock() + defer statusCache.mutex.RUnlock() + + if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second { + return statusCache.data, true + } + return nil, false +} + +// setCachedStatus updates the status cache +func (t *Tgbot) setCachedStatus(status *Status) { + statusCache.mutex.Lock() + defer statusCache.mutex.Unlock() + + statusCache.data = status + statusCache.timestamp = time.Now() +} + +// getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old) +func (t *Tgbot) getCachedServerStats() (string, bool) { + serverStatsCache.mutex.RLock() + defer serverStatsCache.mutex.RUnlock() + + if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second { + return serverStatsCache.data, true + } + return "", false +} + +// setCachedServerStats updates the server stats cache +func (t *Tgbot) setCachedServerStats(stats string) { + serverStatsCache.mutex.Lock() + defer serverStatsCache.mutex.Unlock() + + serverStatsCache.data = stats + serverStatsCache.timestamp = time.Now() +} + // Start initializes and starts the Telegram bot with the provided translation files. func (t *Tgbot) Start(i18nFS embed.FS) error { // Initialize localizer @@ -111,6 +169,20 @@ func (t *Tgbot) Start(i18nFS embed.FS) error { // Initialize hash storage to store callback queries hashStorage = global.NewHashStorage(20 * time.Minute) + // Initialize worker pool for concurrent message processing (max 10 concurrent handlers) + messageWorkerPool = make(chan struct{}, 10) + + // Initialize optimized HTTP client with connection pooling + optimizedHTTPClient = &http.Client{ + Timeout: 15 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 30 * time.Second, + DisableKeepAlives: false, + }, + } + t.SetHostname() // Get Telegram bot token @@ -271,7 +343,7 @@ func (t *Tgbot) decodeQuery(query string) (string, error) { // OnReceive starts the message receiving loop for the Telegram bot. func (t *Tgbot) OnReceive() { params := telego.GetUpdatesParams{ - Timeout: 10, + Timeout: 30, // Increased timeout to reduce API calls } updates, _ := bot.UpdatesViaLongPolling(context.Background(), ¶ms) @@ -285,14 +357,26 @@ func (t *Tgbot) OnReceive() { }, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard"))) botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { - delete(userStates, message.Chat.ID) - t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID)) + // Use goroutine with worker pool for concurrent command processing + go func() { + messageWorkerPool <- struct{}{} // Acquire worker + defer func() { <-messageWorkerPool }() // Release worker + + delete(userStates, message.Chat.ID) + t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID)) + }() return nil }, th.AnyCommand()) botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error { - delete(userStates, query.Message.GetChat().ID) - t.answerCallback(&query, checkAdmin(query.From.ID)) + // Use goroutine with worker pool for concurrent callback processing + go func() { + messageWorkerPool <- struct{}{} // Acquire worker + defer func() { <-messageWorkerPool }() // Release worker + + delete(userStates, query.Message.GetChat().ID) + t.answerCallback(&query, checkAdmin(query.From.ID)) + }() return nil }, th.AnyCallbackQueryWithMessage()) @@ -2099,7 +2183,10 @@ func (t *Tgbot) SendMsgToTgbot(chatId int64, msg string, replyMarkup ...telego.R if err != nil { logger.Warning("Error sending telegram message :", err) } - time.Sleep(500 * time.Millisecond) + // Reduced delay to improve performance (only needed for rate limiting) + if n < len(allMessages)-1 { // Only delay between messages, not after the last one + time.Sleep(100 * time.Millisecond) + } } } @@ -2208,12 +2295,12 @@ func (t *Tgbot) sendClientIndividualLinks(chatId int64, email string) { // Force plain text to avoid HTML page; controller respects Accept header req.Header.Set("Accept", "text/plain, */*;q=0.1") - // Use default client with reasonable timeout via context + // Use optimized client with connection pooling ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() req = req.WithContext(ctx) - resp, err := http.DefaultClient.Do(req) + resp, err := optimizedHTTPClient.Do(req) if err != nil { t.SendMsgToTgbot(chatId, t.I18nBot("tgbot.answers.errorOperation")+"\r\n"+err.Error()) return @@ -2323,7 +2410,7 @@ func (t *Tgbot) sendClientQRLinks(chatId int64, email string) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() req = req.WithContext(ctx) - if resp, err := http.DefaultClient.Do(req); err == nil { + if resp, err := optimizedHTTPClient.Do(req); err == nil { body, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() encoded, _ := t.settingService.GetSubEncrypt() @@ -2356,7 +2443,10 @@ func (t *Tgbot) sendClientQRLinks(chatId int64, email string) { tu.FileFromBytes(png, filename), ) _, _ = bot.SendDocument(context.Background(), document) - time.Sleep(200 * time.Millisecond) + // Reduced delay for better performance + if i < max-1 { // Only delay between documents, not after the last one + time.Sleep(50 * time.Millisecond) + } } } } @@ -2443,10 +2533,20 @@ func (t *Tgbot) sendServerUsage() string { // prepareServerUsageInfo prepares the server usage information string. func (t *Tgbot) prepareServerUsageInfo() string { + // Check if we have cached data first + if cachedStats, found := t.getCachedServerStats(); found { + return cachedStats + } + info, ipv4, ipv6 := "", "", "" - // get latest status of server - t.lastStatus = t.serverService.GetStatus(t.lastStatus) + // get latest status of server with caching + if cachedStatus, found := t.getCachedStatus(); found { + t.lastStatus = cachedStatus + } else { + t.lastStatus = t.serverService.GetStatus(t.lastStatus) + t.setCachedStatus(t.lastStatus) + } onlines := p.GetOnlineClients() info += t.I18nBot("tgbot.messages.hostname", "Hostname=="+hostname) @@ -2488,6 +2588,10 @@ func (t *Tgbot) prepareServerUsageInfo() string { info += t.I18nBot("tgbot.messages.udpCount", "Count=="+strconv.Itoa(t.lastStatus.UdpCount)) info += t.I18nBot("tgbot.messages.traffic", "Total=="+common.FormatTraffic(int64(t.lastStatus.NetTraffic.Sent+t.lastStatus.NetTraffic.Recv)), "Upload=="+common.FormatTraffic(int64(t.lastStatus.NetTraffic.Sent)), "Download=="+common.FormatTraffic(int64(t.lastStatus.NetTraffic.Recv))) info += t.I18nBot("tgbot.messages.xrayStatus", "State=="+fmt.Sprint(t.lastStatus.Xray.State)) + + // Cache the complete server stats + t.setCachedServerStats(info) + return info }