From 020cd63e227c0015ef1b984ad909000e6fdd52e5 Mon Sep 17 00:00:00 2001 From: OleksandrParshyn <43094723+OleksandrParshyn@users.noreply.github.com> Date: Sat, 1 Nov 2025 12:56:55 +0100 Subject: [PATCH] Fix: Graceful Telegram bot shutdown to prevent 409 Conflict (#3580) * Fix: Graceful Telegram bot shutdown to prevent 409 Conflict Introduces a `botCancel` context and a global `StopBot()` function to ensure the Telegram bot's Long Polling operation is safely terminated (via context cancellation) before the service restarts. This prevents the "Conflict: another update consumer is running" (409) error upon panel restart. Changes: - Added `botCancel context.CancelFunc` to manage context cancellation. - Implemented global `StopBot()` function. - Updated `Tgbot.Stop()` to call `StopBot()`. - Modified `Tgbot.OnReceive()` to use the new cancellable context for `UpdatesViaLongPolling`. * Fix: Prevent race condition and goroutine leak in TgBot Addresses a critical race condition on the global `botCancel` variable, which could occur if `Tgbot.OnReceive()` was called concurrently (e.g., during rapid panel restarts or unexpected behavior). Changes in tgbot.go: - Added `tgBotMutex sync.Mutex` to ensure thread safety. - Protected `botCancel` creation and assignment in `OnReceive()` using the mutex, and added a check to prevent overwriting an active context, which avoids goroutine leaks. - Protected the cancellation and cleanup logic in `StopBot()` with the mutex. * Refactor: Replace time.Sleep with sync.WaitGroup for reliable TgBot shutdown Replaced the unreliable `time.Sleep(1 * time.Second)` in `service.StopBot()` with `sync.WaitGroup`. This ensures the Long Polling goroutine is explicitly waited for and reliably exits before the panel continues, preventing potential resource leaks and incomplete shutdowns during restarts. Changes: - Added `botWG sync.WaitGroup` variable. - Updated `service.StopBot()` to call `botWG.Wait()` instead of `time.Sleep()`. 
- Modified `Tgbot.OnReceive()` to correctly use `botWG.Add(1)` and `defer botWG.Done()` within the Long Polling goroutine. - Corrected the goroutine structure in `OnReceive()` to properly encapsulate all message handling logic. --- web/service/tgbot.go | 393 ++++++++++++++++++++++++------------------- 1 file changed, 223 insertions(+), 170 deletions(-) diff --git a/web/service/tgbot.go b/web/service/tgbot.go index 0c9d820c..1573b2bf 100644 --- a/web/service/tgbot.go +++ b/web/service/tgbot.go @@ -38,7 +38,15 @@ import ( ) var ( - bot *telego.Bot + bot *telego.Bot + + // botCancel stores the function to cancel the context, stopping Long Polling gracefully. + botCancel context.CancelFunc + // tgBotMutex protects concurrent access to botCancel variable + tgBotMutex sync.Mutex + // botWG waits for the OnReceive Long Polling goroutine to finish. + botWG sync.WaitGroup + botHandler *th.BotHandler adminIds []int64 isRunning bool @@ -306,8 +314,13 @@ func (t *Tgbot) SetHostname() { hostname = host } -// Stop stops the Telegram bot and cleans up resources. +// Stop safely stops the Telegram bot's Long Polling operation. +// This method now calls the global StopBot function and cleans up other resources. func (t *Tgbot) Stop() { + // Call the global StopBot function to gracefully shut down Long Polling + StopBot() + + // Stop the bot handler (in case the goroutine hasn't exited yet) if botHandler != nil { botHandler.Stop() } @@ -316,6 +329,27 @@ func (t *Tgbot) Stop() { adminIds = nil } +// StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context. +// This is the global function called from main.go's signal handler and t.Stop(). 
+func StopBot() { + tgBotMutex.Lock() + defer tgBotMutex.Unlock() + + if botCancel != nil { + logger.Info("Sending cancellation signal to Telegram bot...") + + // Calling botCancel() cancels the context passed to UpdatesViaLongPolling, + // which stops the Long Polling operation and closes the updates channel, + // allowing the th.Start() goroutine to exit cleanly. + botCancel() + + botCancel = nil + // Block until the Long Polling goroutine has exited. + botWG.Wait() + logger.Info("Telegram bot successfully stopped.") + } +} + // encodeQuery encodes the query string if it's longer than 64 characters. func (t *Tgbot) encodeQuery(query string) string { // NOTE: we only need to hash for more than 64 chars @@ -345,188 +379,207 @@ func (t *Tgbot) OnReceive() { params := telego.GetUpdatesParams{ Timeout: 30, // Increased timeout to reduce API calls } + // --- GRACEFUL SHUTDOWN FIX: Context creation --- + tgBotMutex.Lock() - updates, _ := bot.UpdatesViaLongPolling(context.Background(), &params) + // Create a context with cancellation and store the cancel function. + var ctx context.Context - botHandler, _ = th.NewBotHandler(bot, updates) + // Check if botCancel is already set (to prevent race condition overwrite and goroutine leak) + if botCancel == nil { + ctx, botCancel = context.WithCancel(context.Background()) + } else { + // If botCancel is already set, use a non-cancellable context for this redundant call. + // This prevents overwriting the active botCancel and causing a goroutine leak from the previous call. + logger.Warning("TgBot OnReceive called concurrently.
Using background context for redundant call.") + ctx = context.Background() // <<< CHANGE + } - botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { - delete(userStates, message.Chat.ID) - t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove()) - return nil - }, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard"))) + tgBotMutex.Unlock() - botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { - // Use goroutine with worker pool for concurrent command processing - go func() { - messageWorkerPool <- struct{}{} // Acquire worker - defer func() { <-messageWorkerPool }() // Release worker + // Get updates channel using the context. + updates, _ := bot.UpdatesViaLongPolling(ctx, &params) + botWG.Go(func() { + botHandler, _ = th.NewBotHandler(bot, updates) + botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { delete(userStates, message.Chat.ID) - t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID)) - }() - return nil - }, th.AnyCommand()) + t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove()) + return nil + }, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard"))) - botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error { - // Use goroutine with worker pool for concurrent callback processing - go func() { - messageWorkerPool <- struct{}{} // Acquire worker - defer func() { <-messageWorkerPool }() // Release worker + botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { + // Use goroutine with worker pool for concurrent command processing + go func() { + messageWorkerPool <- struct{}{} // Acquire worker + defer func() { <-messageWorkerPool }() // Release worker - delete(userStates, query.Message.GetChat().ID) - t.answerCallback(&query, checkAdmin(query.From.ID)) - }() - return nil - }, th.AnyCallbackQueryWithMessage()) - -
botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { - if userState, exists := userStates[message.Chat.ID]; exists { - switch userState { - case "awaiting_id": - if client_Id == strings.TrimSpace(message.Text) { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) - message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) - t.addClient(message.Chat.ID, message_text) - return nil - } - - client_Id = strings.TrimSpace(message.Text) - if t.isSingleWord(client_Id) { - userStates[message.Chat.ID] = "awaiting_id" - - cancel_btn_markup := tu.InlineKeyboard( - tu.InlineKeyboardRow( - tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), - ), - ) - - t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) - } else { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_id"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) - message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) - t.addClient(message.Chat.ID, message_text) - } - case "awaiting_password_tr": - if client_TrPassword == strings.TrimSpace(message.Text) { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - return nil - } - - client_TrPassword = strings.TrimSpace(message.Text) - if t.isSingleWord(client_TrPassword) { - userStates[message.Chat.ID] = "awaiting_password_tr" - - cancel_btn_markup := tu.InlineKeyboard( - tu.InlineKeyboardRow( - 
tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), - ), - ) - - t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) - } else { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) - message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) - t.addClient(message.Chat.ID, message_text) - } - case "awaiting_password_sh": - if client_ShPassword == strings.TrimSpace(message.Text) { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - return nil - } - - client_ShPassword = strings.TrimSpace(message.Text) - if t.isSingleWord(client_ShPassword) { - userStates[message.Chat.ID] = "awaiting_password_sh" - - cancel_btn_markup := tu.InlineKeyboard( - tu.InlineKeyboardRow( - tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), - ), - ) - - t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) - } else { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) - message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) - t.addClient(message.Chat.ID, message_text) - } - case "awaiting_email": - if client_Email == strings.TrimSpace(message.Text) { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - return nil - } - - client_Email = strings.TrimSpace(message.Text) - if 
t.isSingleWord(client_Email) { - userStates[message.Chat.ID] = "awaiting_email" - - cancel_btn_markup := tu.InlineKeyboard( - tu.InlineKeyboardRow( - tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), - ), - ) - - t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) - } else { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_email"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) - message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) - t.addClient(message.Chat.ID, message_text) - } - case "awaiting_comment": - if client_Comment == strings.TrimSpace(message.Text) { - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) - return nil - } - - client_Comment = strings.TrimSpace(message.Text) - t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_comment"), 3, tu.ReplyKeyboardRemove()) delete(userStates, message.Chat.ID) - inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) - message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) - t.addClient(message.Chat.ID, message_text) - } + t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID)) + }() + return nil + }, th.AnyCommand()) - } else { - if message.UsersShared != nil { - if checkAdmin(message.From.ID) { - for _, sharedUser := range message.UsersShared.Users { - userID := sharedUser.UserID - needRestart, err := t.inboundService.SetClientTelegramUserID(message.UsersShared.RequestID, userID) - if needRestart { - t.xrayService.SetToNeedRestart() - } - output := "" - if err != nil { - output += t.I18nBot("tgbot.messages.selectUserFailed") - } else { - output += 
t.I18nBot("tgbot.messages.userSaved") - } - t.SendMsgToTgbot(message.Chat.ID, output, tu.ReplyKeyboardRemove()) + botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error { + // Use goroutine with worker pool for concurrent callback processing + go func() { + messageWorkerPool <- struct{}{} // Acquire worker + defer func() { <-messageWorkerPool }() // Release worker + + delete(userStates, query.Message.GetChat().ID) + t.answerCallback(&query, checkAdmin(query.From.ID)) + }() + return nil + }, th.AnyCallbackQueryWithMessage()) + + botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { + if userState, exists := userStates[message.Chat.ID]; exists { + switch userState { + case "awaiting_id": + if client_Id == strings.TrimSpace(message.Text) { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) + message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) + t.addClient(message.Chat.ID, message_text) + return nil + } + + client_Id = strings.TrimSpace(message.Text) + if t.isSingleWord(client_Id) { + userStates[message.Chat.ID] = "awaiting_id" + + cancel_btn_markup := tu.InlineKeyboard( + tu.InlineKeyboardRow( + tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), + ), + ) + + t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) + } else { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_id"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) + message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) + t.addClient(message.Chat.ID, message_text) + } + case "awaiting_password_tr": + 
if client_TrPassword == strings.TrimSpace(message.Text) { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + return nil + } + + client_TrPassword = strings.TrimSpace(message.Text) + if t.isSingleWord(client_TrPassword) { + userStates[message.Chat.ID] = "awaiting_password_tr" + + cancel_btn_markup := tu.InlineKeyboard( + tu.InlineKeyboardRow( + tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), + ), + ) + + t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) + } else { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) + message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) + t.addClient(message.Chat.ID, message_text) + } + case "awaiting_password_sh": + if client_ShPassword == strings.TrimSpace(message.Text) { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + return nil + } + + client_ShPassword = strings.TrimSpace(message.Text) + if t.isSingleWord(client_ShPassword) { + userStates[message.Chat.ID] = "awaiting_password_sh" + + cancel_btn_markup := tu.InlineKeyboard( + tu.InlineKeyboardRow( + tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), + ), + ) + + t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) + } else { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + inbound, _ := 
t.inboundService.GetInbound(receiver_inbound_ID) + message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) + t.addClient(message.Chat.ID, message_text) + } + case "awaiting_email": + if client_Email == strings.TrimSpace(message.Text) { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + return nil + } + + client_Email = strings.TrimSpace(message.Text) + if t.isSingleWord(client_Email) { + userStates[message.Chat.ID] = "awaiting_email" + + cancel_btn_markup := tu.InlineKeyboard( + tu.InlineKeyboardRow( + tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), + ), + ) + + t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) + } else { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_email"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) + message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) + t.addClient(message.Chat.ID, message_text) + } + case "awaiting_comment": + if client_Comment == strings.TrimSpace(message.Text) { + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + return nil + } + + client_Comment = strings.TrimSpace(message.Text) + t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_comment"), 3, tu.ReplyKeyboardRemove()) + delete(userStates, message.Chat.ID) + inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID) + message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol) + t.addClient(message.Chat.ID, message_text) + } + + } else { + if message.UsersShared != nil { + if checkAdmin(message.From.ID) 
{ + for _, sharedUser := range message.UsersShared.Users { + userID := sharedUser.UserID + needRestart, err := t.inboundService.SetClientTelegramUserID(message.UsersShared.RequestID, userID) + if needRestart { + t.xrayService.SetToNeedRestart() + } + output := "" + if err != nil { + output += t.I18nBot("tgbot.messages.selectUserFailed") + } else { + output += t.I18nBot("tgbot.messages.userSaved") + } + t.SendMsgToTgbot(message.Chat.ID, output, tu.ReplyKeyboardRemove()) + } + } else { + t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.noResult"), tu.ReplyKeyboardRemove()) } - } else { - t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.noResult"), tu.ReplyKeyboardRemove()) } } - } - return nil - }, th.AnyMessage()) + return nil + }, th.AnyMessage()) - botHandler.Start() + botHandler.Start() + }) } // answerCommand processes incoming command messages from Telegram users.