diff --git a/go.mod b/go.mod
index 2f81efb8..ab54cbaf 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
github.com/go-ldap/ldap/v3 v3.4.12
github.com/goccy/go-json v0.10.5
github.com/google/uuid v1.6.0
+ github.com/gorilla/websocket v1.5.3
github.com/joho/godotenv v1.5.1
github.com/mymmrac/telego v1.3.1
github.com/nicksnyder/go-i18n/v2 v2.6.0
@@ -51,7 +52,6 @@ require (
github.com/gorilla/context v1.1.2 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
github.com/gorilla/sessions v1.4.0 // indirect
- github.com/gorilla/websocket v1.5.3 // indirect
github.com/grbit/go-json v0.11.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
diff --git a/web.rar b/web.rar
new file mode 100644
index 00000000..8e3093b7
Binary files /dev/null and b/web.rar differ
diff --git a/web/assets/js/websocket.js b/web/assets/js/websocket.js
new file mode 100644
index 00000000..2a551a8a
--- /dev/null
+++ b/web/assets/js/websocket.js
@@ -0,0 +1,138 @@
+/**
+ * WebSocket client for real-time updates
+ */
+class WebSocketClient {
+ constructor(basePath = '') {
+ this.basePath = basePath;
+ this.ws = null;
+ this.reconnectAttempts = 0;
+ this.maxReconnectAttempts = 10;
+ this.reconnectDelay = 1000;
+ this.listeners = new Map();
+ this.isConnected = false;
+ this.shouldReconnect = true;
+ }
+
+ connect() {
+ if (this.ws && this.ws.readyState === WebSocket.OPEN) {
+ return;
+ }
+
+ const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
+ const wsUrl = `${protocol}//${window.location.host}${this.basePath}ws`;
+
+ try {
+ this.ws = new WebSocket(wsUrl);
+
+ this.ws.onopen = () => {
+ console.log('WebSocket connected');
+ this.isConnected = true;
+ this.reconnectAttempts = 0;
+ this.emit('connected');
+ };
+
+ this.ws.onmessage = (event) => {
+ try {
+ // Validate message size (prevent memory issues)
+ const maxMessageSize = 10 * 1024 * 1024; // 10MB
+ if (event.data && event.data.length > maxMessageSize) {
+ console.error('WebSocket message too large:', event.data.length, 'bytes');
+ this.ws.close();
+ return;
+ }
+
+ const message = JSON.parse(event.data);
+ if (!message || typeof message !== 'object') {
+ console.error('Invalid WebSocket message format');
+ return;
+ }
+
+ this.handleMessage(message);
+ } catch (e) {
+ console.error('Failed to parse WebSocket message:', e);
+ }
+ };
+
+ this.ws.onerror = (error) => {
+ console.error('WebSocket error:', error);
+ this.emit('error', error);
+ };
+
+ this.ws.onclose = () => {
+ console.log('WebSocket disconnected');
+ this.isConnected = false;
+ this.emit('disconnected');
+
+ if (this.shouldReconnect && this.reconnectAttempts < this.maxReconnectAttempts) {
+ this.reconnectAttempts++;
+ const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
+ console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
+ setTimeout(() => this.connect(), delay);
+ }
+ };
+ } catch (e) {
+ console.error('Failed to create WebSocket connection:', e);
+ this.emit('error', e);
+ }
+ }
+
+ handleMessage(message) {
+ const { type, payload, time } = message;
+
+ // Emit to specific type listeners
+ this.emit(type, payload, time);
+
+ // Emit to all listeners
+ this.emit('message', { type, payload, time });
+ }
+
+ on(event, callback) {
+ if (!this.listeners.has(event)) {
+ this.listeners.set(event, []);
+ }
+ this.listeners.get(event).push(callback);
+ }
+
+ off(event, callback) {
+ if (!this.listeners.has(event)) {
+ return;
+ }
+ const callbacks = this.listeners.get(event);
+ const index = callbacks.indexOf(callback);
+ if (index > -1) {
+ callbacks.splice(index, 1);
+ }
+ }
+
+ emit(event, ...args) {
+ if (this.listeners.has(event)) {
+ this.listeners.get(event).forEach(callback => {
+ try {
+ callback(...args);
+ } catch (e) {
+ console.error('Error in WebSocket event handler:', e);
+ }
+ });
+ }
+ }
+
+ disconnect() {
+ this.shouldReconnect = false;
+ if (this.ws) {
+ this.ws.close();
+ this.ws = null;
+ }
+ }
+
+ send(data) {
+ if (this.ws && this.ws.readyState === WebSocket.OPEN) {
+ this.ws.send(JSON.stringify(data));
+ } else {
+ console.warn('WebSocket is not connected');
+ }
+ }
+}
+
+// Create global WebSocket client instance
+// Safely get basePath from global scope (defined in page.html)
+window.wsClient = new WebSocketClient(typeof basePath !== 'undefined' ? basePath : '');
diff --git a/web/controller/inbound.go b/web/controller/inbound.go
index eeb160d6..8317de31 100644
--- a/web/controller/inbound.go
+++ b/web/controller/inbound.go
@@ -8,6 +8,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/web/session"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/gin-gonic/gin"
)
@@ -125,6 +126,9 @@ func (a *InboundController) addInbound(c *gin.Context) {
if needRestart {
a.xrayService.SetToNeedRestart()
}
+ // Broadcast inbounds update via WebSocket
+ inbounds, _ := a.inboundService.GetInbounds(user.Id)
+ websocket.BroadcastInbounds(inbounds)
}
// delInbound deletes an inbound configuration by its ID.
@@ -143,6 +147,10 @@ func (a *InboundController) delInbound(c *gin.Context) {
if needRestart {
a.xrayService.SetToNeedRestart()
}
+ // Broadcast inbounds update via WebSocket
+ user := session.GetLoginUser(c)
+ inbounds, _ := a.inboundService.GetInbounds(user.Id)
+ websocket.BroadcastInbounds(inbounds)
}
// updateInbound updates an existing inbound configuration.
@@ -169,6 +177,10 @@ func (a *InboundController) updateInbound(c *gin.Context) {
if needRestart {
a.xrayService.SetToNeedRestart()
}
+ // Broadcast inbounds update via WebSocket
+ user := session.GetLoginUser(c)
+ inbounds, _ := a.inboundService.GetInbounds(user.Id)
+ websocket.BroadcastInbounds(inbounds)
}
// getClientIps retrieves the IP addresses associated with a client by email.
diff --git a/web/controller/server.go b/web/controller/server.go
index 292ef338..5b39700e 100644
--- a/web/controller/server.go
+++ b/web/controller/server.go
@@ -9,6 +9,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/web/global"
"github.com/mhsanaei/3x-ui/v2/web/service"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/gin-gonic/gin"
)
@@ -67,6 +68,8 @@ func (a *ServerController) refreshStatus() {
// collect cpu history when status is fresh
if a.lastStatus != nil {
a.serverService.AppendCpuSample(time.Now(), a.lastStatus.Cpu)
+ // Broadcast status update via WebSocket
+ websocket.BroadcastStatus(a.lastStatus)
}
}
@@ -155,9 +158,16 @@ func (a *ServerController) stopXrayService(c *gin.Context) {
err := a.serverService.StopXrayService()
if err != nil {
jsonMsg(c, I18nWeb(c, "pages.xray.stopError"), err)
+ websocket.BroadcastXrayState("error", err.Error())
return
}
jsonMsg(c, I18nWeb(c, "pages.xray.stopSuccess"), err)
+ websocket.BroadcastXrayState("stop", "")
+ websocket.BroadcastNotification(
+ I18nWeb(c, "pages.xray.stopSuccess"),
+ "Xray service has been stopped",
+ "warning",
+ )
}
// restartXrayService restarts the Xray service.
@@ -165,9 +175,16 @@ func (a *ServerController) restartXrayService(c *gin.Context) {
err := a.serverService.RestartXrayService()
if err != nil {
jsonMsg(c, I18nWeb(c, "pages.xray.restartError"), err)
+ websocket.BroadcastXrayState("error", err.Error())
return
}
jsonMsg(c, I18nWeb(c, "pages.xray.restartSuccess"), err)
+ websocket.BroadcastXrayState("running", "")
+ websocket.BroadcastNotification(
+ I18nWeb(c, "pages.xray.restartSuccess"),
+ "Xray service has been restarted successfully",
+ "success",
+ )
}
// getLogs retrieves the application logs based on count, level, and syslog filters.
diff --git a/web/controller/websocket.go b/web/controller/websocket.go
new file mode 100644
index 00000000..b3f9622c
--- /dev/null
+++ b/web/controller/websocket.go
@@ -0,0 +1,232 @@
+package controller
+
+import (
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/mhsanaei/3x-ui/v2/logger"
+ "github.com/mhsanaei/3x-ui/v2/util/common"
+ "github.com/mhsanaei/3x-ui/v2/web/session"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
+
+ "github.com/gin-gonic/gin"
+ ws "github.com/gorilla/websocket"
+)
+
+const (
+ // Time allowed to write a message to the peer
+ writeWait = 10 * time.Second
+
+ // Time allowed to read the next pong message from the peer
+ pongWait = 60 * time.Second
+
+ // Send pings to peer with this period (must be less than pongWait)
+ pingPeriod = (pongWait * 9) / 10
+
+ // Maximum message size allowed from peer
+ maxMessageSize = 512
+)
+
+var upgrader = ws.Upgrader{
+ ReadBufferSize: 4096, // Увеличено с 1024 для лучшей производительности
+ WriteBufferSize: 4096, // Увеличено с 1024 для лучшей производительности
+ CheckOrigin: func(r *http.Request) bool {
+ // Check origin for security
+ origin := r.Header.Get("Origin")
+ if origin == "" {
+ // Allow connections without Origin header (same-origin requests)
+ return true
+ }
+ // Get the host from the request
+ host := r.Host
+ // Extract scheme and host from origin
+ originURL := origin
+ // Simple check: origin should match the request host
+ // This prevents cross-origin WebSocket hijacking
+ if strings.HasPrefix(originURL, "http://") || strings.HasPrefix(originURL, "https://") {
+ // Extract host from origin
+ originHost := strings.TrimPrefix(strings.TrimPrefix(originURL, "http://"), "https://")
+ if idx := strings.Index(originHost, "/"); idx != -1 {
+ originHost = originHost[:idx]
+ }
+ if idx := strings.Index(originHost, ":"); idx != -1 {
+ originHost = originHost[:idx]
+ }
+ // Compare hosts (without port)
+ requestHost := host
+ if idx := strings.Index(requestHost, ":"); idx != -1 {
+ requestHost = requestHost[:idx]
+ }
+ return originHost == requestHost || originHost == "" || requestHost == ""
+ }
+ return false
+ },
+}
+
+// WebSocketController handles WebSocket connections for real-time updates
+type WebSocketController struct {
+ BaseController
+ hub *websocket.Hub
+}
+
+// NewWebSocketController creates a new WebSocket controller
+func NewWebSocketController(hub *websocket.Hub) *WebSocketController {
+ return &WebSocketController{
+ hub: hub,
+ }
+}
+
+// HandleWebSocket handles WebSocket connections
+func (w *WebSocketController) HandleWebSocket(c *gin.Context) {
+ // Check authentication
+ if !session.IsLogin(c) {
+ logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c))
+ c.AbortWithStatus(http.StatusUnauthorized)
+ return
+ }
+
+ // Upgrade connection to WebSocket
+ conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
+ if err != nil {
+ logger.Error("Failed to upgrade WebSocket connection:", err)
+ return
+ }
+
+ // Create client
+ clientID := uuid.New().String()
+ client := &websocket.Client{
+ ID: clientID,
+ Hub: w.hub,
+ Send: make(chan []byte, 512), // Увеличено с 256 до 512 для предотвращения переполнения
+ Topics: make(map[websocket.MessageType]bool),
+ }
+
+ // Register client
+ w.hub.Register(client)
+ logger.Infof("WebSocket client %s registered from %s", clientID, getRemoteIp(c))
+
+ // Start goroutines for reading and writing
+ go w.writePump(client, conn)
+ go w.readPump(client, conn)
+}
+
+// readPump pumps messages from the WebSocket connection to the hub
+func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) {
+ defer func() {
+ if r := common.Recover("WebSocket readPump panic"); r != nil {
+ logger.Error("WebSocket readPump panic recovered:", r)
+ }
+ w.hub.Unregister(client)
+ conn.Close()
+ }()
+
+ conn.SetReadDeadline(time.Now().Add(pongWait))
+ conn.SetPongHandler(func(string) error {
+ conn.SetReadDeadline(time.Now().Add(pongWait))
+ return nil
+ })
+ conn.SetReadLimit(maxMessageSize)
+
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
+ logger.Debugf("WebSocket read error for client %s: %v", client.ID, err)
+ }
+ break
+ }
+
+ // Validate message size
+ if len(message) > maxMessageSize {
+ logger.Warningf("WebSocket message from client %s exceeds max size: %d bytes", client.ID, len(message))
+ continue
+ }
+
+ // Handle incoming messages (e.g., subscription requests)
+ // For now, we'll just log them
+ logger.Debugf("Received WebSocket message from client %s: %s", client.ID, string(message))
+ }
+}
+
+// writePump pumps messages from the hub to the WebSocket connection
+func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) {
+ ticker := time.NewTicker(pingPeriod)
+ defer func() {
+ if r := common.Recover("WebSocket writePump panic"); r != nil {
+ logger.Error("WebSocket writePump panic recovered:", r)
+ }
+ ticker.Stop()
+ conn.Close()
+ }()
+
+ for {
+ select {
+ case message, ok := <-client.Send:
+ conn.SetWriteDeadline(time.Now().Add(writeWait))
+ if !ok {
+ // Hub closed the channel
+ conn.WriteMessage(ws.CloseMessage, []byte{})
+ return
+ }
+
+ writer, err := conn.NextWriter(ws.TextMessage)
+ if err != nil {
+ logger.Debugf("WebSocket write error for client %s: %v", client.ID, err)
+ return
+ }
+ writer.Write(message)
+
+ // Оптимизация: батчинг сообщений с умным ограничением
+ // Обрабатываем накопленные сообщения, но ограничиваем для предотвращения задержек
+ n := len(client.Send)
+ maxQueued := 20 // Увеличено с 10 до 20 для лучшей пропускной способности
+ if n > maxQueued {
+ // Пропускаем старые сообщения, оставляем только последние для актуальности
+ skipped := n - maxQueued
+ for i := 0; i < skipped; i++ {
+ select {
+ case <-client.Send:
+ // Пропускаем старое сообщение
+ default:
+ // Канал закрыт или пуст, прекращаем пропуск
+ goto skipDone
+ }
+ }
+ skipDone:
+ n = len(client.Send) // Обновляем количество после пропуска
+ }
+
+ // Батчинг: отправляем несколько сообщений в одном фрейме
+ // Безопасное чтение с проверкой закрытия канала
+ for i := 0; i < n; i++ {
+ select {
+ case msg, ok := <-client.Send:
+ if !ok {
+ // Канал закрыт, выходим
+ return
+ }
+ writer.Write([]byte{'\n'})
+ writer.Write(msg)
+ default:
+ // Больше нет сообщений в очереди, прекращаем батчинг
+ goto batchDone
+ }
+ }
+ batchDone:
+
+ if err := writer.Close(); err != nil {
+ logger.Debugf("WebSocket writer close error for client %s: %v", client.ID, err)
+ return
+ }
+
+ case <-ticker.C:
+ conn.SetWriteDeadline(time.Now().Add(writeWait))
+ if err := conn.WriteMessage(ws.PingMessage, nil); err != nil {
+ logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err)
+ return
+ }
+ }
+ }
+}
diff --git a/web/global/global.go b/web/global/global.go
index 025fa081..f72c7bfe 100644
--- a/web/global/global.go
+++ b/web/global/global.go
@@ -17,6 +17,7 @@ var (
type WebServer interface {
GetCron() *cron.Cron // Get the cron scheduler
GetCtx() context.Context // Get the server context
+ GetWSHub() interface{} // Get the WebSocket hub (using interface{} to avoid circular dependency)
}
// SubServer interface defines methods for accessing the subscription server instance.
diff --git a/web/html/common/page.html b/web/html/common/page.html
index c0a7ca63..0af63afb 100644
--- a/web/html/common/page.html
+++ b/web/html/common/page.html
@@ -49,6 +49,7 @@
const basePath = '{{ .base_path }}';
axios.defaults.baseURL = basePath;
+
{{ end }}
{{ define "page/body_end" }}
diff --git a/web/html/form/protocol/vless.html b/web/html/form/protocol/vless.html
index 83950063..65e12a68 100644
--- a/web/html/form/protocol/vless.html
+++ b/web/html/form/protocol/vless.html
@@ -39,6 +39,7 @@
+
diff --git a/web/html/inbounds.html b/web/html/inbounds.html
index 86bde2c8..fcaaad03 100644
--- a/web/html/inbounds.html
+++ b/web/html/inbounds.html
@@ -1567,13 +1567,97 @@
}
this.loading();
this.getDefaultSettings();
- if (this.isRefreshEnabled) {
- this.startDataRefreshLoop();
+
+ // Initial data fetch
+ this.getDBInbounds().then(() => {
+ this.loading(false);
+ });
+
+ // Setup WebSocket for real-time updates
+ if (window.wsClient) {
+ window.wsClient.connect();
+
+ // Listen for inbounds updates
+ window.wsClient.on('inbounds', (payload) => {
+ if (payload && Array.isArray(payload)) {
+ // Use setInbounds to properly convert to DBInbound objects with methods
+ this.setInbounds(payload);
+ this.searchInbounds(this.searchKey);
+ }
+ });
+
+ // Listen for traffic updates
+ window.wsClient.on('traffic', (payload) => {
+ if (payload && payload.clientTraffics) {
+ // Update client traffic statistics
+ payload.clientTraffics.forEach(clientTraffic => {
+ const dbInbound = this.dbInbounds.find(ib => {
+ const clients = this.getInboundClients(ib);
+ return clients && clients.some(c => c.email === clientTraffic.email);
+ });
+ if (dbInbound && dbInbound.clientStats) {
+ const stats = dbInbound.clientStats.find(s => s.email === clientTraffic.email);
+ if (stats) {
+ stats.up = clientTraffic.up || stats.up;
+ stats.down = clientTraffic.down || stats.down;
+ stats.total = clientTraffic.total || stats.total;
+ }
+ }
+ });
+ }
+
+ // Update online clients list in real-time
+ if (payload && Array.isArray(payload.onlineClients)) {
+ this.onlineClients = payload.onlineClients;
+ // Recalculate client counts to update online status
+ this.dbInbounds.forEach(dbInbound => {
+ const inbound = this.inbounds.find(ib => ib.id === dbInbound.id);
+ if (inbound && this.clientCount[dbInbound.id]) {
+ this.clientCount[dbInbound.id] = this.getClientCounts(dbInbound, inbound);
+ }
+ });
+ }
+
+ // Update last online map in real-time
+ if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') {
+ this.lastOnlineMap = { ...this.lastOnlineMap, ...payload.lastOnlineMap };
+ }
+ });
+
+ // Listen for notifications
+ window.wsClient.on('notification', (payload) => {
+ if (payload && payload.title) {
+ const type = payload.level || 'info';
+ this.$notification[type]({
+ message: payload.title,
+ description: payload.message || '',
+ duration: 4.5,
+ });
+ }
+ });
+
+ // Fallback to polling if WebSocket fails
+ window.wsClient.on('error', () => {
+ console.warn('WebSocket connection failed, falling back to polling');
+ if (this.isRefreshEnabled) {
+ this.startDataRefreshLoop();
+ }
+ });
+
+ window.wsClient.on('disconnected', () => {
+ if (window.wsClient.reconnectAttempts >= window.wsClient.maxReconnectAttempts) {
+ console.warn('WebSocket reconnection failed, falling back to polling');
+ if (this.isRefreshEnabled) {
+ this.startDataRefreshLoop();
+ }
+ }
+ });
+ } else {
+ // Fallback to polling if WebSocket is not available
+ if (this.isRefreshEnabled) {
+ this.startDataRefreshLoop();
+ }
}
- else {
- this.getDBInbounds();
- }
- this.loading(false);
},
computed: {
total() {
diff --git a/web/html/index.html b/web/html/index.html
index 9cbb019d..c82e9503 100644
--- a/web/html/index.html
+++ b/web/html/index.html
@@ -1102,6 +1102,20 @@
});
fileInput.click();
},
+ startPolling() {
+ // Fallback polling mechanism
+ const pollInterval = setInterval(async () => {
+ if (window.wsClient && window.wsClient.isConnected) {
+ clearInterval(pollInterval);
+ return;
+ }
+ try {
+ await this.getStatus();
+ } catch (e) {
+ console.error(e);
+ }
+ }, 2000);
+ },
},
async mounted() {
if (window.location.protocol !== "https:") {
@@ -1113,13 +1127,67 @@
this.ipLimitEnable = msg.obj.ipLimitEnable;
}
- while (true) {
- try {
- await this.getStatus();
- } catch (e) {
- console.error(e);
- }
- await PromiseUtil.sleep(2000);
+ // Initial status fetch
+ await this.getStatus();
+
+ // Setup WebSocket for real-time updates
+ if (window.wsClient) {
+ window.wsClient.connect();
+
+ // Listen for status updates
+ window.wsClient.on('status', (payload) => {
+ this.setStatus(payload);
+ });
+
+ // Listen for Xray state changes
+ window.wsClient.on('xray_state', (payload) => {
+ if (this.status && this.status.xray) {
+ this.status.xray.state = payload.state;
+ this.status.xray.errorMsg = payload.errorMsg || '';
+ switch (payload.state) {
+ case 'running':
+ this.status.xray.color = "green";
+ this.status.xray.stateMsg = '{{ i18n "pages.index.xrayStatusRunning" }}';
+ break;
+ case 'stop':
+ this.status.xray.color = "orange";
+ this.status.xray.stateMsg = '{{ i18n "pages.index.xrayStatusStop" }}';
+ break;
+ case 'error':
+ this.status.xray.color = "red";
+ this.status.xray.stateMsg = '{{ i18n "pages.index.xrayStatusError" }}';
+ break;
+ }
+ }
+ });
+
+ // Listen for notifications
+ window.wsClient.on('notification', (payload) => {
+ if (payload && payload.title) {
+ const type = payload.level || 'info';
+ this.$notification[type]({
+ message: payload.title,
+ description: payload.message || '',
+ duration: 4.5,
+ });
+ }
+ });
+
+ // Fallback to polling if WebSocket fails
+ window.wsClient.on('error', () => {
+ console.warn('WebSocket connection failed, falling back to polling');
+ this.startPolling();
+ });
+
+ window.wsClient.on('disconnected', () => {
+ if (window.wsClient.reconnectAttempts >= window.wsClient.maxReconnectAttempts) {
+ console.warn('WebSocket reconnection failed, falling back to polling');
+ this.startPolling();
+ }
+ });
+ } else {
+ // Fallback to polling if WebSocket is not available
+ this.startPolling();
}
},
});
diff --git a/web/job/ldap_sync_job.go b/web/job/ldap_sync_job.go
index 6642bbcf..a947eb73 100644
--- a/web/job/ldap_sync_job.go
+++ b/web/job/ldap_sync_job.go
@@ -322,66 +322,6 @@ func (j *LdapSyncJob) clientsToJSON(clients []model.Client) string {
return b.String()
}
-// ensureClientExists adds client with defaults to inbound tag if not present
-func (j *LdapSyncJob) ensureClientExists(inboundTag string, email string, defGB int, defExpiryDays int, defLimitIP int) {
- inbounds, err := j.inboundService.GetAllInbounds()
- if err != nil {
- logger.Warning("ensureClientExists: get inbounds failed:", err)
- return
- }
- var target *model.Inbound
- for _, ib := range inbounds {
- if ib.Tag == inboundTag {
- target = ib
- break
- }
- }
- if target == nil {
- logger.Debugf("ensureClientExists: inbound tag %s not found", inboundTag)
- return
- }
- // check if email already exists in this inbound
- clients, err := j.inboundService.GetClients(target)
- if err == nil {
- for _, c := range clients {
- if c.Email == email {
- return
- }
- }
- }
-
- // build new client according to protocol
- newClient := model.Client{
- Email: email,
- Enable: true,
- LimitIP: defLimitIP,
- TotalGB: int64(defGB),
- }
- if defExpiryDays > 0 {
- newClient.ExpiryTime = time.Now().Add(time.Duration(defExpiryDays) * 24 * time.Hour).UnixMilli()
- }
-
- switch target.Protocol {
- case model.Trojan:
- newClient.Password = uuid.NewString()
- case model.Shadowsocks:
- newClient.Password = uuid.NewString()
- default: // VMESS/VLESS and others using ID
- newClient.ID = uuid.NewString()
- }
-
- // prepare inbound payload with only the new client
- payload := &model.Inbound{Id: target.Id}
- payload.Settings = `{"clients":[` + j.clientToJSON(newClient) + `]}`
-
- if _, err := j.inboundService.AddInboundClient(payload); err != nil {
- logger.Warning("ensureClientExists: add client failed:", err)
- } else {
- j.xrayService.SetToNeedRestart()
- logger.Infof("LDAP auto-create: %s in %s", email, inboundTag)
- }
-}
-
// clientToJSON serializes minimal client fields to JSON object string without extra deps
func (j *LdapSyncJob) clientToJSON(c model.Client) string {
// construct minimal JSON manually to avoid importing json for simple case
diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go
index a9affb4b..2f331cd6 100644
--- a/web/job/xray_traffic_job.go
+++ b/web/job/xray_traffic_job.go
@@ -5,6 +5,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/mhsanaei/3x-ui/v2/web/service"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/mhsanaei/3x-ui/v2/xray"
"github.com/valyala/fasthttp"
@@ -48,6 +49,23 @@ func (j *XrayTrafficJob) Run() {
if needRestart0 || needRestart1 {
j.xrayService.SetToNeedRestart()
}
+
+ // Get online clients and last online map for real-time status updates
+ onlineClients := j.inboundService.GetOnlineClients()
+ lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
+ if err != nil {
+ logger.Warning("get clients last online failed:", err)
+ lastOnlineMap = make(map[string]int64)
+ }
+
+ // Broadcast traffic update via WebSocket
+ trafficUpdate := map[string]interface{}{
+ "traffics": traffics,
+ "clientTraffics": clientTraffics,
+ "onlineClients": onlineClients,
+ "lastOnlineMap": lastOnlineMap,
+ }
+ websocket.BroadcastTraffic(trafficUpdate)
}
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
diff --git a/web/web.go b/web/web.go
index c7a2ce1f..7dae08e3 100644
--- a/web/web.go
+++ b/web/web.go
@@ -25,6 +25,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/web/middleware"
"github.com/mhsanaei/3x-ui/v2/web/network"
"github.com/mhsanaei/3x-ui/v2/web/service"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/gin-contrib/gzip"
"github.com/gin-contrib/sessions"
@@ -98,11 +99,14 @@ type Server struct {
index *controller.IndexController
panel *controller.XUIController
api *controller.APIController
+ ws *controller.WebSocketController
xrayService service.XrayService
settingService service.SettingService
tgbotService service.Tgbot
+ wsHub *websocket.Hub
+
cron *cron.Cron
ctx context.Context
@@ -266,6 +270,15 @@ func (s *Server) initRouter() (*gin.Engine, error) {
s.panel = controller.NewXUIController(g)
s.api = controller.NewAPIController(g)
+ // Initialize WebSocket hub
+ s.wsHub = websocket.NewHub()
+ go s.wsHub.Run()
+
+ // Initialize WebSocket controller
+ s.ws = controller.NewWebSocketController(s.wsHub)
+ // Register WebSocket route with basePath (g already has basePath prefix)
+ g.GET("/ws", s.ws.HandleWebSocket)
+
// Chrome DevTools endpoint for debugging web apps
engine.GET("/.well-known/appspecific/com.chrome.devtools.json", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{})
@@ -448,6 +461,10 @@ func (s *Server) Stop() error {
if s.tgbotService.IsRunning() {
s.tgbotService.Stop()
}
+ // Gracefully stop WebSocket hub
+ if s.wsHub != nil {
+ s.wsHub.Stop()
+ }
var err1 error
var err2 error
if s.httpServer != nil {
@@ -468,3 +485,8 @@ func (s *Server) GetCtx() context.Context {
func (s *Server) GetCron() *cron.Cron {
return s.cron
}
+
+// GetWSHub returns the WebSocket hub instance.
+func (s *Server) GetWSHub() interface{} {
+ return s.wsHub
+}
diff --git a/web/websocket/hub.go b/web/websocket/hub.go
new file mode 100644
index 00000000..17c1ab69
--- /dev/null
+++ b/web/websocket/hub.go
@@ -0,0 +1,379 @@
+// Package websocket provides WebSocket hub for real-time updates and notifications.
+package websocket
+
+import (
+ "context"
+ "encoding/json"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/mhsanaei/3x-ui/v2/logger"
+)
+
+// MessageType represents the type of WebSocket message
+type MessageType string
+
+const (
+ MessageTypeStatus MessageType = "status" // Server status update
+ MessageTypeTraffic MessageType = "traffic" // Traffic statistics update
+ MessageTypeInbounds MessageType = "inbounds" // Inbounds list update
+ MessageTypeNotification MessageType = "notification" // System notification
+ MessageTypeXrayState MessageType = "xray_state" // Xray state change
+)
+
+// Message represents a WebSocket message
+type Message struct {
+ Type MessageType `json:"type"`
+ Payload interface{} `json:"payload"`
+ Time int64 `json:"time"`
+}
+
+// Client represents a WebSocket client connection
+type Client struct {
+ ID string
+ Send chan []byte
+ Hub *Hub
+ Topics map[MessageType]bool // Subscribed topics
+}
+
+// Hub maintains the set of active clients and broadcasts messages to them
+type Hub struct {
+ // Registered clients
+ clients map[*Client]bool
+
+ // Inbound messages from clients
+ broadcast chan []byte
+
+ // Register requests from clients
+ register chan *Client
+
+ // Unregister requests from clients
+ unregister chan *Client
+
+ // Mutex for thread-safe operations
+ mu sync.RWMutex
+
+ // Context for graceful shutdown
+ ctx context.Context
+ cancel context.CancelFunc
+
+ // Worker pool for parallel broadcasting
+ workerPoolSize int
+ broadcastWg sync.WaitGroup
+}
+
+// NewHub creates a new WebSocket hub
+func NewHub() *Hub {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Calculate optimal worker pool size (CPU cores * 2, but max 100)
+ workerPoolSize := runtime.NumCPU() * 2
+ if workerPoolSize > 100 {
+ workerPoolSize = 100
+ }
+ if workerPoolSize < 10 {
+ workerPoolSize = 10
+ }
+
+ return &Hub{
+ clients: make(map[*Client]bool),
+ broadcast: make(chan []byte, 2048), // Увеличено с 256 до 2048 для высокой нагрузки
+ register: make(chan *Client, 100), // Буферизованный канал для быстрой регистрации
+ unregister: make(chan *Client, 100), // Буферизованный канал для быстрой отмены регистрации
+ ctx: ctx,
+ cancel: cancel,
+ workerPoolSize: workerPoolSize,
+ }
+}
+
+// Run starts the hub's main loop
+func (h *Hub) Run() {
+ defer func() {
+ if r := recover(); r != nil {
+ logger.Error("WebSocket hub panic recovered:", r)
+ // Restart the hub loop
+ go h.Run()
+ }
+ }()
+
+ for {
+ select {
+ case <-h.ctx.Done():
+ // Graceful shutdown: close all clients
+ h.mu.Lock()
+ for client := range h.clients {
+ // Safely close channel (avoid double close panic)
+ select {
+ case _, stillOpen := <-client.Send:
+ if stillOpen {
+ close(client.Send)
+ }
+ default:
+ close(client.Send)
+ }
+ }
+ h.clients = make(map[*Client]bool)
+ h.mu.Unlock()
+ // Wait for all broadcast workers to finish
+ h.broadcastWg.Wait()
+ logger.Info("WebSocket hub stopped gracefully")
+ return
+
+ case client := <-h.register:
+ if client == nil {
+ continue
+ }
+ h.mu.Lock()
+ h.clients[client] = true
+ count := len(h.clients)
+ h.mu.Unlock()
+ logger.Infof("WebSocket client connected: %s (total: %d)", client.ID, count)
+
+ case client := <-h.unregister:
+ if client == nil {
+ continue
+ }
+ h.mu.Lock()
+ if _, ok := h.clients[client]; ok {
+ delete(h.clients, client)
+ // Safely close channel (avoid double close panic)
+ // Check if channel is already closed by trying to read from it
+ select {
+ case _, stillOpen := <-client.Send:
+ if stillOpen {
+ // Channel was open and had data, now it's empty, safe to close
+ close(client.Send)
+ }
+ // If stillOpen is false, channel was already closed, do nothing
+ default:
+ // Channel is empty and open, safe to close
+ close(client.Send)
+ }
+ }
+ count := len(h.clients)
+ h.mu.Unlock()
+ logger.Infof("WebSocket client disconnected: %s (total: %d)", client.ID, count)
+
+ case message := <-h.broadcast:
+ if message == nil {
+ continue
+ }
+ // Оптимизация: быстро копируем список клиентов и освобождаем блокировку
+ h.mu.RLock()
+ clientCount := len(h.clients)
+ if clientCount == 0 {
+ h.mu.RUnlock()
+ continue
+ }
+
+ // Предварительно выделяем память для списка клиентов
+ clients := make([]*Client, 0, clientCount)
+ for client := range h.clients {
+ clients = append(clients, client)
+ }
+ h.mu.RUnlock()
+
+ // Параллельная рассылка с использованием worker pool
+ h.broadcastParallel(clients, message)
+ }
+ }
+}
+
+// broadcastParallel отправляет сообщение всем клиентам параллельно для максимальной производительности
+func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
+ if len(clients) == 0 {
+ return
+ }
+
+ // Для небольшого количества клиентов используем простую параллельную отправку
+ if len(clients) < h.workerPoolSize {
+ var wg sync.WaitGroup
+ for _, client := range clients {
+ wg.Add(1)
+ go func(c *Client) {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ // Канал может быть закрыт, безопасно игнорируем
+ logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", c.ID, r)
+ }
+ }()
+ select {
+ case c.Send <- message:
+ default:
+ // Client's send buffer is full, disconnect
+ logger.Debugf("WebSocket client %s send buffer full, disconnecting", c.ID)
+ h.Unregister(c)
+ }
+ }(client)
+ }
+ wg.Wait()
+ return
+ }
+
+ // Для большого количества клиентов используем worker pool для оптимальной производительности
+ clientChan := make(chan *Client, len(clients))
+ for _, client := range clients {
+ clientChan <- client
+ }
+ close(clientChan)
+
+ // Запускаем воркеров для параллельной обработки
+ h.broadcastWg.Add(h.workerPoolSize)
+ for i := 0; i < h.workerPoolSize; i++ {
+ go func() {
+ defer h.broadcastWg.Done()
+ for client := range clientChan {
+ func() {
+ defer func() {
+ if r := recover(); r != nil {
+ // Канал может быть закрыт, безопасно игнорируем
+ logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", client.ID, r)
+ }
+ }()
+ select {
+ case client.Send <- message:
+ default:
+ // Client's send buffer is full, disconnect
+ logger.Debugf("WebSocket client %s send buffer full, disconnecting", client.ID)
+ h.Unregister(client)
+ }
+ }()
+ }
+ }()
+ }
+
+ // Ждем завершения всех воркеров
+ h.broadcastWg.Wait()
+}
+
+// Broadcast sends a message to all connected clients
+func (h *Hub) Broadcast(messageType MessageType, payload interface{}) {
+ if h == nil {
+ return
+ }
+ if payload == nil {
+ logger.Warning("Attempted to broadcast nil payload")
+ return
+ }
+
+ msg := Message{
+ Type: messageType,
+ Payload: payload,
+ Time: getCurrentTimestamp(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ logger.Error("Failed to marshal WebSocket message:", err)
+ return
+ }
+
+ // Limit message size to prevent memory issues
+ const maxMessageSize = 1024 * 1024 // 1MB
+ if len(data) > maxMessageSize {
+ logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ return
+ }
+
+ // Неблокирующая отправка с таймаутом для предотвращения задержек
+ select {
+ case h.broadcast <- data:
+ case <-time.After(100 * time.Millisecond):
+ logger.Warning("WebSocket broadcast channel is full, dropping message")
+ case <-h.ctx.Done():
+ // Hub is shutting down
+ }
+}
+
+// BroadcastToTopic sends a message only to clients subscribed to the specific topic
+func (h *Hub) BroadcastToTopic(messageType MessageType, payload interface{}) {
+ if h == nil {
+ return
+ }
+ if payload == nil {
+ logger.Warning("Attempted to broadcast nil payload to topic")
+ return
+ }
+
+ msg := Message{
+ Type: messageType,
+ Payload: payload,
+ Time: getCurrentTimestamp(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ logger.Error("Failed to marshal WebSocket message:", err)
+ return
+ }
+
+ // Limit message size to prevent memory issues
+ const maxMessageSize = 1024 * 1024 // 1MB
+ if len(data) > maxMessageSize {
+ logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ return
+ }
+
+ h.mu.RLock()
+ // Фильтруем клиентов по топикам и быстро освобождаем блокировку
+ subscribedClients := make([]*Client, 0)
+ for client := range h.clients {
+ if len(client.Topics) == 0 || client.Topics[messageType] {
+ subscribedClients = append(subscribedClients, client)
+ }
+ }
+ h.mu.RUnlock()
+
+ // Параллельная отправка подписанным клиентам
+ if len(subscribedClients) > 0 {
+ h.broadcastParallel(subscribedClients, data)
+ }
+}
+
+// GetClientCount returns the number of connected clients
+func (h *Hub) GetClientCount() int {
+ h.mu.RLock()
+ defer h.mu.RUnlock()
+ return len(h.clients)
+}
+
+// Register registers a new client with the hub
+func (h *Hub) Register(client *Client) {
+ if h == nil || client == nil {
+ return
+ }
+ select {
+ case h.register <- client:
+ case <-h.ctx.Done():
+ // Hub is shutting down
+ }
+}
+
+// Unregister unregisters a client from the hub
+func (h *Hub) Unregister(client *Client) {
+ if h == nil || client == nil {
+ return
+ }
+ select {
+ case h.unregister <- client:
+ case <-h.ctx.Done():
+ // Hub is shutting down
+ }
+}
+
+// Stop gracefully stops the hub and closes all connections
+func (h *Hub) Stop() {
+ if h == nil {
+ return
+ }
+ if h.cancel != nil {
+ h.cancel()
+ }
+}
+
+// getCurrentTimestamp returns current Unix timestamp in milliseconds
+func getCurrentTimestamp() int64 {
+ return time.Now().UnixMilli()
+}
diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go
new file mode 100644
index 00000000..cedf56f2
--- /dev/null
+++ b/web/websocket/notifier.go
@@ -0,0 +1,74 @@
+// Package websocket provides WebSocket hub for real-time updates and notifications.
+package websocket
+
+import (
+ "github.com/mhsanaei/3x-ui/v2/logger"
+ "github.com/mhsanaei/3x-ui/v2/web/global"
+)
+
+// GetHub returns the global WebSocket hub instance
+func GetHub() *Hub {
+ webServer := global.GetWebServer()
+ if webServer == nil {
+ return nil
+ }
+ hub := webServer.GetWSHub()
+ if hub == nil {
+ return nil
+ }
+ wsHub, ok := hub.(*Hub)
+ if !ok {
+ logger.Warning("WebSocket hub type assertion failed")
+ return nil
+ }
+ return wsHub
+}
+
+// BroadcastStatus broadcasts server status update to all connected clients
+func BroadcastStatus(status interface{}) {
+ hub := GetHub()
+ if hub != nil {
+ hub.Broadcast(MessageTypeStatus, status)
+ }
+}
+
+// BroadcastTraffic broadcasts traffic statistics update to all connected clients
+func BroadcastTraffic(traffic interface{}) {
+ hub := GetHub()
+ if hub != nil {
+ hub.Broadcast(MessageTypeTraffic, traffic)
+ }
+}
+
+// BroadcastInbounds broadcasts inbounds list update to all connected clients
+func BroadcastInbounds(inbounds interface{}) {
+ hub := GetHub()
+ if hub != nil {
+ hub.Broadcast(MessageTypeInbounds, inbounds)
+ }
+}
+
+// BroadcastNotification broadcasts a system notification to all connected clients
+func BroadcastNotification(title, message, level string) {
+ hub := GetHub()
+ if hub != nil {
+ notification := map[string]string{
+ "title": title,
+ "message": message,
+ "level": level, // info, warning, error, success
+ }
+ hub.Broadcast(MessageTypeNotification, notification)
+ }
+}
+
+// BroadcastXrayState broadcasts Xray state change to all connected clients
+func BroadcastXrayState(state string, errorMsg string) {
+ hub := GetHub()
+ if hub != nil {
+ stateUpdate := map[string]string{
+ "state": state,
+ "errorMsg": errorMsg,
+ }
+ hub.Broadcast(MessageTypeXrayState, stateUpdate)
+ }
+}