mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-04-14 19:45:47 +00:00
Merge 93436bccba into 169b216d7e
This commit is contained in:
commit
0414ef17b0
17 changed files with 254 additions and 132 deletions
|
|
@ -13,4 +13,4 @@ services:
|
|||
XUI_ENABLE_FAIL2BAN: "true"
|
||||
tty: true
|
||||
network_mode: host
|
||||
restart: unless-stopped
|
||||
restart: unless-stopped
|
||||
|
|
@ -581,7 +581,7 @@ prompt_and_setup_ssl() {
|
|||
|
||||
# 3.1 Request Domain to compose Panel URL later
|
||||
read -rp "Please enter domain name certificate issued for: " custom_domain
|
||||
custom_domain="${custom_domain// /}" # Убираем пробелы
|
||||
custom_domain="${custom_domain// /}" # Remove spaces
|
||||
|
||||
# 3.2 Loop for Certificate Path
|
||||
while true; do
|
||||
|
|
|
|||
|
|
@ -609,7 +609,7 @@ prompt_and_setup_ssl() {
|
|||
|
||||
# 3.1 Request Domain to compose Panel URL later
|
||||
read -rp "Please enter domain name certificate issued for: " custom_domain
|
||||
custom_domain="${custom_domain// /}" # Убираем пробелы
|
||||
custom_domain="${custom_domain// /}" # Remove spaces
|
||||
|
||||
# 3.2 Loop for Certificate Path
|
||||
while true; do
|
||||
|
|
|
|||
|
|
@ -91,6 +91,10 @@ class DBInbound {
|
|||
}
|
||||
|
||||
toInbound() {
|
||||
if (this._cachedInbound) {
|
||||
return this._cachedInbound;
|
||||
}
|
||||
|
||||
let settings = {};
|
||||
if (!ObjectUtil.isEmpty(this.settings)) {
|
||||
settings = JSON.parse(this.settings);
|
||||
|
|
@ -116,7 +120,21 @@ class DBInbound {
|
|||
sniffing: sniffing,
|
||||
clientStats: this.clientStats,
|
||||
};
|
||||
return Inbound.fromJson(config);
|
||||
|
||||
this._cachedInbound = Inbound.fromJson(config);
|
||||
return this._cachedInbound;
|
||||
}
|
||||
|
||||
getClientStats(email) {
|
||||
if (!this._clientStatsMap) {
|
||||
this._clientStatsMap = new Map();
|
||||
if (this.clientStats && Array.isArray(this.clientStats)) {
|
||||
for (const stats of this.clientStats) {
|
||||
this._clientStatsMap.set(stats.email, stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
return this._clientStatsMap.get(email);
|
||||
}
|
||||
|
||||
isMultiUser() {
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ class WebSocketClient {
|
|||
this.ws.onmessage = (event) => {
|
||||
try {
|
||||
// Validate message size (prevent memory issues)
|
||||
const maxMessageSize = 10 * 1024 * 1024; // 10MB
|
||||
const maxMessageSize = 100 * 1024 * 1024; // 100MB
|
||||
if (event.data && event.data.length > maxMessageSize) {
|
||||
console.error('WebSocket message too large:', event.data.length, 'bytes');
|
||||
this.ws.close();
|
||||
|
|
|
|||
|
|
@ -30,8 +30,10 @@ const (
|
|||
)
|
||||
|
||||
var upgrader = ws.Upgrader{
|
||||
ReadBufferSize: 4096, // Increased from 1024 for better performance
|
||||
WriteBufferSize: 4096, // Increased from 1024 for better performance
|
||||
ReadBufferSize: 32768, // Huge buffers for huge databases
|
||||
WriteBufferSize: 32768, // Huge buffers to reduce TCP fragmentation
|
||||
EnableCompression: true, // Automatically GZIP large messages unconditionally
|
||||
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
// Check origin for security
|
||||
origin := r.Header.Get("Origin")
|
||||
|
|
|
|||
|
|
@ -70,6 +70,8 @@
|
|||
<a-select-option
|
||||
value="queryInHeader">queryInHeader</a-select-option>
|
||||
<a-select-option value="header">header</a-select-option>
|
||||
<a-select-option value="cookie">cookie</a-select-option>
|
||||
<a-select-option value="query">query</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
<a-form-item label="Padding Method">
|
||||
|
|
@ -127,7 +129,7 @@
|
|||
<a-select-option value>Default (body)</a-select-option>
|
||||
<a-select-option value="body">body</a-select-option>
|
||||
<a-select-option value="header">header</a-select-option>
|
||||
<a-select-option value="query">query</a-select-option>
|
||||
<a-select-option value="cookie">cookie</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
<a-form-item label="Uplink Data Key"
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
<a-sidebar></a-sidebar>
|
||||
<a-layout id="content-layout">
|
||||
<a-layout-content>
|
||||
<a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}'>
|
||||
<a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}' size="large">
|
||||
<transition name="list" appear>
|
||||
<a-alert type="error" v-if="showAlert && loadingStates.fetched" :style="{ marginBottom: '10px' }"
|
||||
message='{{ i18n "secAlertTitle" }}' color="red" description='{{ i18n "secAlertSsl" }}' show-icon closable>
|
||||
|
|
@ -14,10 +14,7 @@
|
|||
</transition>
|
||||
<transition name="list" appear>
|
||||
<a-row v-if="!loadingStates.fetched">
|
||||
<a-card
|
||||
:style="{ textAlign: 'center', padding: '30px 0', marginTop: '10px', background: 'transparent', border: 'none' }">
|
||||
<a-spin tip='{{ i18n "loading" }}'></a-spin>
|
||||
</a-card>
|
||||
<div :style="{ minHeight: 'calc(100vh - 120px)' }"></div>
|
||||
</a-row>
|
||||
<a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else>
|
||||
<a-col>
|
||||
|
|
@ -1304,7 +1301,6 @@
|
|||
if (!clients || !Array.isArray(clients)) return;
|
||||
index = this.findIndexOfClient(dbInbound.protocol, clients, client);
|
||||
if (index < 0 || !clients[index]) return;
|
||||
clients[index].enable = !clients[index].enable;
|
||||
clientId = this.getClientId(dbInbound.protocol, clients[index]);
|
||||
await this.updateClient(clients[index], dbInboundId, clientId);
|
||||
this.loading(false);
|
||||
|
|
@ -1317,7 +1313,7 @@
|
|||
},
|
||||
getInboundClients(dbInbound) {
|
||||
if (!dbInbound) return null;
|
||||
const inbound = dbInbound.toInbound();
|
||||
const inbound = this.inbounds.find(ib => ib.id === dbInbound.id) || dbInbound.toInbound();
|
||||
return inbound && inbound.clients ? inbound.clients : null;
|
||||
},
|
||||
resetClientTraffic(client, dbInboundId, confirmation = true) {
|
||||
|
|
@ -1367,42 +1363,54 @@
|
|||
isExpiry(dbInbound, index) {
|
||||
return dbInbound.toInbound().isExpiry(index);
|
||||
},
|
||||
getClientStats(dbInbound, email) {
|
||||
if (!dbInbound) return null;
|
||||
if (!dbInbound._clientStatsMap) {
|
||||
dbInbound._clientStatsMap = new Map();
|
||||
if (dbInbound.clientStats && Array.isArray(dbInbound.clientStats)) {
|
||||
for (const stats of dbInbound.clientStats) {
|
||||
dbInbound._clientStatsMap.set(stats.email, stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
return dbInbound._clientStatsMap.get(email);
|
||||
},
|
||||
getUpStats(dbInbound, email) {
|
||||
if (email.length == 0) return 0;
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return 0;
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
return clientStats ? clientStats.up : 0;
|
||||
},
|
||||
getDownStats(dbInbound, email) {
|
||||
if (email.length == 0) return 0;
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return 0;
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
return clientStats ? clientStats.down : 0;
|
||||
},
|
||||
getSumStats(dbInbound, email) {
|
||||
if (email.length == 0) return 0;
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return 0;
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
return clientStats ? clientStats.up + clientStats.down : 0;
|
||||
},
|
||||
getAllTimeClient(dbInbound, email) {
|
||||
if (email.length == 0) return 0;
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return 0;
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
if (!clientStats) return 0;
|
||||
return clientStats.allTime || (clientStats.up + clientStats.down);
|
||||
},
|
||||
getRemStats(dbInbound, email) {
|
||||
if (email.length == 0) return 0;
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return 0;
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
if (!clientStats) return 0;
|
||||
remained = clientStats.total - (clientStats.up + clientStats.down);
|
||||
let remained = clientStats.total - (clientStats.up + clientStats.down);
|
||||
return remained > 0 ? remained : 0;
|
||||
},
|
||||
clientStatsColor(dbInbound, email) {
|
||||
if (email.length == 0) return ColorUtils.clientUsageColor();
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return ColorUtils.clientUsageColor();
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
return ColorUtils.clientUsageColor(clientStats, app.trafficDiff)
|
||||
},
|
||||
statsProgress(dbInbound, email) {
|
||||
if (email.length == 0) return 100;
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return 100;
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
if (!clientStats) return 0;
|
||||
if (clientStats.total == 0) return 100;
|
||||
return 100 * (clientStats.down + clientStats.up) / clientStats.total;
|
||||
|
|
@ -1415,11 +1423,11 @@
|
|||
return 100 * (1 - (remainedSeconds / resetSeconds));
|
||||
},
|
||||
statsExpColor(dbInbound, email) {
|
||||
if (email.length == 0) return '#7a316f';
|
||||
clientStats = dbInbound.clientStats.find(stats => stats.email === email);
|
||||
if (!email || email.length == 0) return '#7a316f';
|
||||
let clientStats = this.getClientStats(dbInbound, email);
|
||||
if (!clientStats) return '#7a316f';
|
||||
statsColor = ColorUtils.usageColor(clientStats.down + clientStats.up, this.trafficDiff, clientStats.total);
|
||||
expColor = ColorUtils.usageColor(new Date().getTime(), this.expireDiff, clientStats.expiryTime);
|
||||
let statsColor = ColorUtils.usageColor(clientStats.down + clientStats.up, this.trafficDiff, clientStats.total);
|
||||
let expColor = ColorUtils.usageColor(new Date().getTime(), this.expireDiff, clientStats.expiryTime);
|
||||
switch (true) {
|
||||
case statsColor == "red" || expColor == "red":
|
||||
return "#cf3c3c"; // Red
|
||||
|
|
@ -1432,12 +1440,12 @@
|
|||
}
|
||||
},
|
||||
isClientEnabled(dbInbound, email) {
|
||||
clientStats = dbInbound.clientStats ? dbInbound.clientStats.find(stats => stats.email === email) : null;
|
||||
let clientStats = dbInbound ? this.getClientStats(dbInbound, email) : null;
|
||||
return clientStats ? clientStats['enable'] : true;
|
||||
},
|
||||
isClientDepleted(dbInbound, email) {
|
||||
if (!email || !dbInbound || !dbInbound.clientStats) return false;
|
||||
const stats = dbInbound.clientStats.find(s => s.email === email);
|
||||
if (!email || !dbInbound) return false;
|
||||
const stats = this.getClientStats(dbInbound, email);
|
||||
if (!stats) return false;
|
||||
const total = stats.total ?? 0;
|
||||
const used = (stats.up ?? 0) + (stats.down ?? 0);
|
||||
|
|
@ -1557,12 +1565,18 @@
|
|||
pagination(obj) {
|
||||
if (this.pageSize > 0 && obj.length > this.pageSize) {
|
||||
// Set page options based on object size
|
||||
sizeOptions = [];
|
||||
for (i = this.pageSize; i <= obj.length; i = i + this.pageSize) {
|
||||
sizeOptions.push(i.toString());
|
||||
let sizeOptions = [this.pageSize.toString()];
|
||||
const increments = [2, 5, 10, 20];
|
||||
for (const m of increments) {
|
||||
const val = this.pageSize * m;
|
||||
if (val < obj.length && val <= 1000) {
|
||||
sizeOptions.push(val.toString());
|
||||
}
|
||||
}
|
||||
// Add option to see all in one page
|
||||
sizeOptions.push(i.toString());
|
||||
if (!sizeOptions.includes(obj.length.toString())) {
|
||||
sizeOptions.push(obj.length.toString());
|
||||
}
|
||||
|
||||
p = {
|
||||
showSizeChanger: true,
|
||||
|
|
@ -1605,11 +1619,25 @@
|
|||
}
|
||||
});
|
||||
|
||||
// Listen for invalidate signals (sent when payload is too large for WebSocket)
|
||||
// The server sends a lightweight notification and we re-fetch via REST API
|
||||
let invalidateTimer = null;
|
||||
window.wsClient.on('invalidate', (payload) => {
|
||||
if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) {
|
||||
// Debounce to avoid flooding the REST API with multiple invalidate signals
|
||||
if (invalidateTimer) clearTimeout(invalidateTimer);
|
||||
invalidateTimer = setTimeout(() => {
|
||||
invalidateTimer = null;
|
||||
this.getDBInbounds();
|
||||
}, 1000);
|
||||
}
|
||||
});
|
||||
|
||||
// Listen for traffic updates
|
||||
window.wsClient.on('traffic', (payload) => {
|
||||
// Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event
|
||||
// because clientTraffics contains delta/incremental values, not total accumulated values.
|
||||
// Total traffic is updated via the 'inbounds' event which contains accumulated values from database.
|
||||
// Total traffic is updated via the 'inbounds' WebSocket event (or 'invalidate' fallback for large panels).
|
||||
|
||||
// Update online clients list in real-time
|
||||
if (payload && Array.isArray(payload.onlineClients)) {
|
||||
|
|
@ -1627,22 +1655,27 @@
|
|||
this.onlineClients = nextOnlineClients;
|
||||
if (onlineChanged) {
|
||||
// Recalculate client counts to update online status
|
||||
// Use $set for Vue 2 reactivity — direct array index assignment is not reactive
|
||||
this.dbInbounds.forEach(dbInbound => {
|
||||
const inbound = this.inbounds.find(ib => ib.id === dbInbound.id);
|
||||
if (inbound && this.clientCount[dbInbound.id]) {
|
||||
this.clientCount[dbInbound.id] = this.getClientCounts(dbInbound, inbound);
|
||||
this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound));
|
||||
}
|
||||
});
|
||||
|
||||
// Always trigger UI refresh — not just when filter is enabled
|
||||
if (this.enableFilter) {
|
||||
this.filterInbounds();
|
||||
} else {
|
||||
this.searchInbounds(this.searchKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update last online map in real-time
|
||||
// Replace entirely (server sends the full map) to avoid unbounded growth from deleted clients
|
||||
if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') {
|
||||
this.lastOnlineMap = { ...this.lastOnlineMap, ...payload.lastOnlineMap };
|
||||
this.lastOnlineMap = payload.lastOnlineMap;
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -1697,4 +1730,18 @@
|
|||
},
|
||||
});
|
||||
</script>
|
||||
<style>
|
||||
#content-layout > .ant-layout-content > .ant-spin-nested-loading > div > .ant-spin {
|
||||
position: fixed !important;
|
||||
top: 50vh !important;
|
||||
left: calc(50vw + 100px) !important;
|
||||
transform: translate(-50%, -50%);
|
||||
z-index: 99999 !important;
|
||||
}
|
||||
@media (max-width: 768px) {
|
||||
#content-layout > .ant-layout-content > .ant-spin-nested-loading > div > .ant-spin {
|
||||
left: 50vw !important;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
{{ template "page/body_end" .}}
|
||||
|
|
@ -6,7 +6,7 @@
|
|||
<a-sidebar></a-sidebar>
|
||||
<a-layout id="content-layout">
|
||||
<a-layout-content>
|
||||
<a-spin :spinning="loadingStates.spinning" :delay="200" :tip="loadingTip">
|
||||
<a-spin :spinning="loadingStates.spinning" :delay="200" :tip="loadingTip" size="large">
|
||||
<transition name="list" appear>
|
||||
<a-alert type="error" v-if="showAlert && loadingStates.fetched" class="mb-10"
|
||||
message='{{ i18n "secAlertTitle" }}' color="red" description='{{ i18n "secAlertSsl" }}' show-icon closable>
|
||||
|
|
@ -15,9 +15,7 @@
|
|||
<transition name="list" appear>
|
||||
<template>
|
||||
<a-row v-if="!loadingStates.fetched">
|
||||
<a-card class="card-placeholder text-center">
|
||||
<a-spin tip='{{ i18n "loading" }}'></a-spin>
|
||||
</a-card>
|
||||
<div :style="{ minHeight: 'calc(100vh - 120px)' }"></div>
|
||||
</a-row>
|
||||
<a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else>
|
||||
<a-col>
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@
|
|||
<a-input v-model.trim="clientsBulkModal.emailPostfix"></a-input>
|
||||
</a-form-item>
|
||||
<a-form-item label='{{ i18n "pages.client.clientCount" }}' v-if="clientsBulkModal.emailMethod < 2">
|
||||
<a-input-number v-model.number="clientsBulkModal.quantity" :min="1" :max="100"></a-input-number>
|
||||
<a-input-number v-model.number="clientsBulkModal.quantity" :min="1" :max="10000"></a-input-number>
|
||||
</a-form-item>
|
||||
<a-form-item label='{{ i18n "security" }}' v-if="inbound.protocol === Protocols.VMESS">
|
||||
<a-select v-model="clientsBulkModal.security" :dropdown-class-name="themeSwitcher.currentTheme">
|
||||
|
|
@ -204,7 +204,7 @@
|
|||
this.security = "auto";
|
||||
this.flow = "";
|
||||
this.dbInbound = new DBInbound(dbInbound);
|
||||
this.inbound = dbInbound.toInbound();
|
||||
this.inbound = Inbound.fromJson(dbInbound.toInbound().toJson());
|
||||
this.delayedStart = false;
|
||||
this.reset = 0;
|
||||
},
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@
|
|||
this.okText = okText;
|
||||
this.isEdit = isEdit;
|
||||
this.dbInbound = new DBInbound(dbInbound);
|
||||
this.inbound = dbInbound.toInbound();
|
||||
this.inbound = Inbound.fromJson(dbInbound.toInbound().toJson());
|
||||
this.clients = this.inbound.clients;
|
||||
this.index = index === null ? this.clients.length : index;
|
||||
this.delayedStart = false;
|
||||
|
|
@ -98,9 +98,9 @@
|
|||
return app.datepicker;
|
||||
},
|
||||
get isTrafficExhausted() {
|
||||
if (!clientStats) return false
|
||||
if (clientStats.total <= 0) return false
|
||||
if (clientStats.up + clientStats.down < clientStats.total) return false
|
||||
if (!this.clientStats) return false
|
||||
if (this.clientStats.total <= 0) return false
|
||||
if (this.clientStats.up + this.clientStats.down < this.clientStats.total) return false
|
||||
return true
|
||||
},
|
||||
get isExpiry() {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
<a-sidebar></a-sidebar>
|
||||
<a-layout id="content-layout">
|
||||
<a-layout-content>
|
||||
<a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}'>
|
||||
<a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}' size="large">
|
||||
<transition name="list" appear>
|
||||
<a-alert type="error" v-if="confAlerts.length>0 && loadingStates.fetched" :style="{ marginBottom: '10px' }"
|
||||
message='{{ i18n "secAlertTitle" }}' color="red" show-icon closable>
|
||||
|
|
@ -21,10 +21,7 @@
|
|||
<transition name="list" appear>
|
||||
<template>
|
||||
<a-row v-if="!loadingStates.fetched">
|
||||
<a-card
|
||||
:style="{ textAlign: 'center', padding: '30px 0', marginTop: '10px', background: 'transparent', border: 'none' }">
|
||||
<a-spin tip='{{ i18n "loading" }}'></a-spin>
|
||||
</a-card>
|
||||
<div :style="{ minHeight: 'calc(100vh - 120px)' }"></div>
|
||||
</a-row>
|
||||
<a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else>
|
||||
<a-col>
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
<a-layout id="content-layout">
|
||||
<a-layout-content>
|
||||
<a-spin :spinning="loadingStates.spinning" :delay="500"
|
||||
tip='{{ i18n "loading"}}'>
|
||||
tip='{{ i18n "loading"}}' size="large">
|
||||
<transition name="list" appear>
|
||||
<a-alert type="error" v-if="showAlert && loadingStates.fetched"
|
||||
:style="{ marginBottom: '10px' }"
|
||||
|
|
@ -24,10 +24,7 @@
|
|||
</transition>
|
||||
<transition name="list" appear>
|
||||
<a-row v-if="!loadingStates.fetched">
|
||||
<a-card
|
||||
:style="{ textAlign: 'center', padding: '30px 0', marginTop: '10px', background: 'transparent', border: 'none' }">
|
||||
<a-spin tip='{{ i18n "loading" }}'></a-spin>
|
||||
</a-card>
|
||||
<div :style="{ minHeight: 'calc(100vh - 120px)' }"></div>
|
||||
</a-row>
|
||||
<a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else>
|
||||
<a-col>
|
||||
|
|
@ -1075,6 +1072,14 @@
|
|||
this.$forceUpdate();
|
||||
}
|
||||
});
|
||||
|
||||
// Handle invalidate signals (sent when payload is too large for WebSocket,
|
||||
// or when traffic job notifies about data changes)
|
||||
window.wsClient.on('invalidate', (payload) => {
|
||||
if (payload && payload.type === 'outbounds') {
|
||||
this.refreshOutboundTraffic();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
while (true) {
|
||||
|
|
|
|||
|
|
@ -50,7 +50,13 @@ func (j *XrayTrafficJob) Run() {
|
|||
j.xrayService.SetToNeedRestart()
|
||||
}
|
||||
|
||||
// Get online clients and last online map for real-time status updates
|
||||
// If no frontend client is connected, skip all WebSocket broadcasting routines,
|
||||
// including expensive DB queries for online clients and JSON marshaling.
|
||||
if !websocket.HasClients() {
|
||||
return
|
||||
}
|
||||
|
||||
// Update online clients list and map
|
||||
onlineClients := j.inboundService.GetOnlineClients()
|
||||
lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
|
||||
if err != nil {
|
||||
|
|
@ -58,8 +64,17 @@ func (j *XrayTrafficJob) Run() {
|
|||
lastOnlineMap = make(map[string]int64)
|
||||
}
|
||||
|
||||
// Broadcast traffic update (deltas and online stats) via WebSocket
|
||||
trafficUpdate := map[string]any{
|
||||
"traffics": traffics,
|
||||
"clientTraffics": clientTraffics,
|
||||
"onlineClients": onlineClients,
|
||||
"lastOnlineMap": lastOnlineMap,
|
||||
}
|
||||
websocket.BroadcastTraffic(trafficUpdate)
|
||||
|
||||
// Fetch updated inbounds from database with accumulated traffic values
|
||||
// This ensures frontend receives the actual total traffic, not just delta values
|
||||
// This ensures frontend receives the actual total traffic for real-time UI refresh.
|
||||
updatedInbounds, err := j.inboundService.GetAllInbounds()
|
||||
if err != nil {
|
||||
logger.Warning("get all inbounds for websocket failed:", err)
|
||||
|
|
@ -70,16 +85,8 @@ func (j *XrayTrafficJob) Run() {
|
|||
logger.Warning("get all outbounds for websocket failed:", err)
|
||||
}
|
||||
|
||||
// Broadcast traffic update via WebSocket with accumulated values from database
|
||||
trafficUpdate := map[string]any{
|
||||
"traffics": traffics,
|
||||
"clientTraffics": clientTraffics,
|
||||
"onlineClients": onlineClients,
|
||||
"lastOnlineMap": lastOnlineMap,
|
||||
}
|
||||
websocket.BroadcastTraffic(trafficUpdate)
|
||||
|
||||
// Broadcast full inbounds update for real-time UI refresh
|
||||
// The WebSocket hub will automatically check the payload size.
|
||||
// If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
|
||||
if updatedInbounds != nil {
|
||||
websocket.BroadcastInbounds(updatedInbounds)
|
||||
}
|
||||
|
|
@ -87,7 +94,6 @@ func (j *XrayTrafficJob) Run() {
|
|||
if updatedOutbounds != nil {
|
||||
websocket.BroadcastOutbounds(updatedOutbounds)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
|
||||
|
|
|
|||
|
|
@ -118,31 +118,35 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) {
|
|||
json.Unmarshal([]byte(inbound.Settings), &settings)
|
||||
clients, ok := settings["clients"].([]any)
|
||||
if ok {
|
||||
// check users active or not
|
||||
// Fast O(N) lookup map for client traffic enablement
|
||||
clientStats := inbound.ClientStats
|
||||
enableMap := make(map[string]bool, len(clientStats))
|
||||
for _, clientTraffic := range clientStats {
|
||||
indexDecrease := 0
|
||||
for index, client := range clients {
|
||||
c := client.(map[string]any)
|
||||
if c["email"] == clientTraffic.Email {
|
||||
if !clientTraffic.Enable {
|
||||
clients = RemoveIndex(clients, index-indexDecrease)
|
||||
indexDecrease++
|
||||
logger.Infof("Remove Inbound User %s due to expiration or traffic limit", c["email"])
|
||||
}
|
||||
}
|
||||
}
|
||||
enableMap[clientTraffic.Email] = clientTraffic.Enable
|
||||
}
|
||||
|
||||
// clear client config for additional parameters
|
||||
// filter and clean clients
|
||||
var final_clients []any
|
||||
for _, client := range clients {
|
||||
c := client.(map[string]any)
|
||||
if c["enable"] != nil {
|
||||
if enable, ok := c["enable"].(bool); ok && !enable {
|
||||
continue
|
||||
}
|
||||
c, ok := client.(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
email, _ := c["email"].(string)
|
||||
|
||||
// check users active or not via stats
|
||||
if enable, exists := enableMap[email]; exists && !enable {
|
||||
logger.Infof("Remove Inbound User %s due to expiration or traffic limit", email)
|
||||
continue
|
||||
}
|
||||
|
||||
// check manual disabled flag
|
||||
if manualEnable, ok := c["enable"].(bool); ok && !manualEnable {
|
||||
continue
|
||||
}
|
||||
|
||||
// clear client config for additional parameters
|
||||
for key := range c {
|
||||
if key != "email" && key != "id" && key != "password" && key != "flow" && key != "method" {
|
||||
delete(c, key)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ const (
|
|||
MessageTypeNotification MessageType = "notification" // System notification
|
||||
MessageTypeXrayState MessageType = "xray_state" // Xray state change
|
||||
MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update
|
||||
MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST
|
||||
)
|
||||
|
||||
// Message represents a WebSocket message
|
||||
|
|
@ -32,10 +33,11 @@ type Message struct {
|
|||
|
||||
// Client represents a WebSocket client connection
|
||||
type Client struct {
|
||||
ID string
|
||||
Send chan []byte
|
||||
Hub *Hub
|
||||
Topics map[MessageType]bool // Subscribed topics
|
||||
ID string
|
||||
Send chan []byte
|
||||
Hub *Hub
|
||||
Topics map[MessageType]bool // Subscribed topics
|
||||
closeOnce sync.Once // Ensures Send channel is closed exactly once
|
||||
}
|
||||
|
||||
// Hub maintains the set of active clients and broadcasts messages to them
|
||||
|
|
@ -61,7 +63,6 @@ type Hub struct {
|
|||
|
||||
// Worker pool for parallel broadcasting
|
||||
workerPoolSize int
|
||||
broadcastWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewHub creates a new WebSocket hub
|
||||
|
|
@ -104,20 +105,12 @@ func (h *Hub) Run() {
|
|||
// Graceful shutdown: close all clients
|
||||
h.mu.Lock()
|
||||
for client := range h.clients {
|
||||
// Safely close channel (avoid double close panic)
|
||||
select {
|
||||
case _, stillOpen := <-client.Send:
|
||||
if stillOpen {
|
||||
close(client.Send)
|
||||
}
|
||||
default:
|
||||
client.closeOnce.Do(func() {
|
||||
close(client.Send)
|
||||
}
|
||||
})
|
||||
}
|
||||
h.clients = make(map[*Client]bool)
|
||||
h.mu.Unlock()
|
||||
// Wait for all broadcast workers to finish
|
||||
h.broadcastWg.Wait()
|
||||
logger.Info("WebSocket hub stopped gracefully")
|
||||
return
|
||||
|
||||
|
|
@ -138,19 +131,9 @@ func (h *Hub) Run() {
|
|||
h.mu.Lock()
|
||||
if _, ok := h.clients[client]; ok {
|
||||
delete(h.clients, client)
|
||||
// Safely close channel (avoid double close panic)
|
||||
// Check if channel is already closed by trying to read from it
|
||||
select {
|
||||
case _, stillOpen := <-client.Send:
|
||||
if stillOpen {
|
||||
// Channel was open and had data, now it's empty, safe to close
|
||||
close(client.Send)
|
||||
}
|
||||
// If stillOpen is false, channel was already closed, do nothing
|
||||
default:
|
||||
// Channel is empty and open, safe to close
|
||||
client.closeOnce.Do(func() {
|
||||
close(client.Send)
|
||||
}
|
||||
})
|
||||
}
|
||||
count := len(h.clients)
|
||||
h.mu.Unlock()
|
||||
|
|
@ -220,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
|
|||
}
|
||||
close(clientChan)
|
||||
|
||||
// Start workers for parallel processing
|
||||
h.broadcastWg.Add(h.workerPoolSize)
|
||||
// Use a local WaitGroup to avoid blocking hub shutdown
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(h.workerPoolSize)
|
||||
for i := 0; i < h.workerPoolSize; i++ {
|
||||
go func() {
|
||||
defer h.broadcastWg.Done()
|
||||
defer wg.Done()
|
||||
for client := range clientChan {
|
||||
func() {
|
||||
defer func() {
|
||||
|
|
@ -246,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
|
|||
}
|
||||
|
||||
// Wait for all workers to finish
|
||||
h.broadcastWg.Wait()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected clients
|
||||
|
|
@ -259,6 +243,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
|
|||
return
|
||||
}
|
||||
|
||||
// Skip all work if no clients are connected
|
||||
if h.GetClientCount() == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
msg := Message{
|
||||
Type: messageType,
|
||||
Payload: payload,
|
||||
|
|
@ -271,10 +260,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
|
|||
return
|
||||
}
|
||||
|
||||
// Limit message size to prevent memory issues
|
||||
const maxMessageSize = 1024 * 1024 // 1MB
|
||||
// If message exceeds size limit, send a lightweight invalidate notification
|
||||
// instead of dropping it entirely — the frontend will re-fetch via REST API
|
||||
const maxMessageSize = 100 * 1024 * 1024 // 100MB (User override)
|
||||
if len(data) > maxMessageSize {
|
||||
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
|
||||
logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
|
||||
h.broadcastInvalidate(messageType)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -298,6 +289,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
|
|||
return
|
||||
}
|
||||
|
||||
// Skip all work if no clients are connected
|
||||
if h.GetClientCount() == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
msg := Message{
|
||||
Type: messageType,
|
||||
Payload: payload,
|
||||
|
|
@ -310,10 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
|
|||
return
|
||||
}
|
||||
|
||||
// Limit message size to prevent memory issues
|
||||
const maxMessageSize = 1024 * 1024 // 1MB
|
||||
// If message exceeds size limit, send a lightweight invalidate notification
|
||||
const maxMessageSize = 100 * 1024 * 1024 // 100MB (User override)
|
||||
if len(data) > maxMessageSize {
|
||||
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
|
||||
logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
|
||||
h.broadcastInvalidate(messageType)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -374,6 +371,31 @@ func (h *Hub) Stop() {
|
|||
}
|
||||
}
|
||||
|
||||
// broadcastInvalidate sends a lightweight invalidate message to all clients,
|
||||
// telling them to re-fetch the specified data type via REST API.
|
||||
// This is used when the full payload exceeds the WebSocket message size limit.
|
||||
func (h *Hub) broadcastInvalidate(originalType MessageType) {
|
||||
msg := Message{
|
||||
Type: MessageTypeInvalidate,
|
||||
Payload: map[string]string{"type": string(originalType)},
|
||||
Time: getCurrentTimestamp(),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
logger.Error("Failed to marshal invalidate message:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Non-blocking send with timeout
|
||||
select {
|
||||
case h.broadcast <- data:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
logger.Warning("WebSocket broadcast channel is full, dropping invalidate message")
|
||||
case <-h.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
// getCurrentTimestamp returns current Unix timestamp in milliseconds
|
||||
func getCurrentTimestamp() int64 {
|
||||
return time.Now().UnixMilli()
|
||||
|
|
|
|||
|
|
@ -24,6 +24,16 @@ func GetHub() *Hub {
|
|||
return wsHub
|
||||
}
|
||||
|
||||
// HasClients returns true if there are any WebSocket clients connected.
|
||||
// Use this to skip expensive work (DB queries, serialization) when no browser is open.
|
||||
func HasClients() bool {
|
||||
hub := GetHub()
|
||||
if hub == nil {
|
||||
return false
|
||||
}
|
||||
return hub.GetClientCount() > 0
|
||||
}
|
||||
|
||||
// BroadcastStatus broadcasts server status update to all connected clients
|
||||
func BroadcastStatus(status any) {
|
||||
hub := GetHub()
|
||||
|
|
@ -80,3 +90,14 @@ func BroadcastXrayState(state string, errorMsg string) {
|
|||
hub.Broadcast(MessageTypeXrayState, stateUpdate)
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastInvalidate sends a lightweight invalidate signal for the given data type,
|
||||
// telling connected frontends to re-fetch data via REST API.
|
||||
// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload
|
||||
// will be too large, to avoid wasting resources on serialization.
|
||||
func BroadcastInvalidate(dataType MessageType) {
|
||||
hub := GetHub()
|
||||
if hub != nil {
|
||||
hub.broadcastInvalidate(dataType)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue