3x-ui/web/service/metric_history.go
MHSanaei d4c020f365
feat(dashboard): more System History metrics, persistence & localized labels
- Sample swap %, TCP/UDP connection counts and disk-usage % on the host ticker
- System History: Swap overlaid on the RAM tab, plus new Connections and Disk Usage tabs
- Persist the host time-series across restarts: gob snapshot beside the DB, written on a timer and at shutdown, restored on boot
- Live-refresh the open chart (2s for short ranges, 10s for longer)
- Localize CPU/RAM/Swap and the new tab/chart titles across all 13 languages and route legend series names through i18n
2026-06-03 12:16:31 +02:00

237 lines
7 KiB
Go

package service
import (
"encoding/gob"
"os"
"path/filepath"
"sync"
"time"
"github.com/mhsanaei/3x-ui/v3/config"
"github.com/mhsanaei/3x-ui/v3/logger"
)
// MetricSample is one point of any time-series we keep in memory.
// The frontend deserializes both keys, so they must stay short.
type MetricSample struct {
T int64 `json:"t"`
V float64 `json:"v"`
}
// metricCapacityDefault caps each ring buffer at ~5h worth of @2s samples
// or ~25h worth of @10s samples. Plenty for the bucketed aggregation
// view and small enough that the working set per metric stays under
// ~150 KiB.
const metricCapacityDefault = 9000
// metricHistory is a thread-safe, in-memory ring buffer keyed by
// arbitrary strings. Two singletons live below: one for system-wide
// host metrics, one for per-node metrics. Keeping them in this file
// (rather than scattered across services) makes the storage model
// easy to reason about and avoids double-locking.
type metricHistory struct {
mu sync.Mutex
metrics map[string][]MetricSample
}
func newMetricHistory() *metricHistory {
return &metricHistory{metrics: map[string][]MetricSample{}}
}
// append stores a single sample for the given metric, deduping when
// two appends happen within the same wall-clock second (which can
// happen if the cron tick is faster than the metric's natural rate).
func (h *metricHistory) append(metric string, t time.Time, v float64) {
h.mu.Lock()
defer h.mu.Unlock()
buf := h.metrics[metric]
p := MetricSample{T: t.Unix(), V: v}
if n := len(buf); n > 0 && buf[n-1].T == p.T {
buf[n-1] = p
} else {
buf = append(buf, p)
}
if len(buf) > metricCapacityDefault {
buf = buf[len(buf)-metricCapacityDefault:]
}
h.metrics[metric] = buf
}
// drop removes the entire history for one metric. Used when a node is
// deleted so its old samples don't linger forever in the singleton.
func (h *metricHistory) drop(metric string) {
h.mu.Lock()
delete(h.metrics, metric)
h.mu.Unlock()
}
// snapshot returns a deep copy of every series, safe to serialize without
// holding the lock during disk I/O.
func (h *metricHistory) snapshot() map[string][]MetricSample {
h.mu.Lock()
defer h.mu.Unlock()
out := make(map[string][]MetricSample, len(h.metrics))
for k, v := range h.metrics {
cp := make([]MetricSample, len(v))
copy(cp, v)
out[k] = cp
}
return out
}
// restore replaces the in-memory series with a previously persisted set,
// re-applying the per-series capacity cap so a tampered or oversized file
// can't grow the working set unbounded.
func (h *metricHistory) restore(data map[string][]MetricSample) {
h.mu.Lock()
defer h.mu.Unlock()
for k, v := range data {
if len(v) > metricCapacityDefault {
v = v[len(v)-metricCapacityDefault:]
}
h.metrics[k] = v
}
}
// aggregate returns up to maxPoints buckets of size bucketSeconds,
// each bucket carrying the arithmetic mean of the underlying samples.
// Bucket alignment is to absolute Unix-second boundaries so two
// concurrent calls (e.g. two browser tabs) see identical x-axes.
func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
if bucketSeconds <= 0 || maxPoints <= 0 {
return []map[string]any{}
}
cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix()
h.mu.Lock()
hist := h.metrics[metric]
startIdx := 0
for i := len(hist) - 1; i >= 0; i-- {
if hist[i].T < cutoff {
startIdx = i + 1
break
}
}
if startIdx >= len(hist) {
h.mu.Unlock()
return []map[string]any{}
}
tmp := make([]MetricSample, len(hist)-startIdx)
copy(tmp, hist[startIdx:])
h.mu.Unlock()
if len(tmp) == 0 {
return []map[string]any{}
}
bSize := int64(bucketSeconds)
curBucket := (tmp[0].T / bSize) * bSize
var out []map[string]any
var acc []float64
flush := func(ts int64) {
if len(acc) == 0 {
return
}
sum := 0.0
for _, v := range acc {
sum += v
}
out = append(out, map[string]any{"t": ts, "v": sum / float64(len(acc))})
acc = acc[:0]
}
for _, p := range tmp {
b := (p.T / bSize) * bSize
if b != curBucket {
flush(curBucket)
curBucket = b
}
acc = append(acc, p.V)
}
flush(curBucket)
if len(out) > maxPoints {
out = out[len(out)-maxPoints:]
}
if out == nil {
return []map[string]any{}
}
return out
}
// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.)
// fed by ServerService.RefreshStatus every 2s. nodeMetrics holds
// per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are
// process-local — survival across panel restart is not required.
var (
systemMetrics = newMetricHistory()
nodeMetrics = newMetricHistory()
xrayMetrics = newMetricHistory()
)
// SystemMetricKeys lists the metric names ServerService writes on every
// status sample. Exposed for documentation/test purposes; the
// controller validates incoming names against an allow-list.
var SystemMetricKeys = []string{
"cpu", "mem", "swap", "netUp", "netDown", "pktUp", "pktDown", "diskRead", "diskWrite", "diskUsage", "tcpCount", "udpCount", "online", "load1", "load5", "load15",
}
// NodeMetricKeys lists the per-node metric names NodeHeartbeatJob writes.
var NodeMetricKeys = []string{"cpu", "mem"}
// XrayMetricKeys lists series sourced from xray's /debug/vars expvar
// endpoint. Populated by XrayMetricsService.Sample on the same 2s cadence
// as the system metrics, but only when the xray config has a `metrics`
// block configured.
var XrayMetricKeys = []string{
"xrAlloc", "xrSys", "xrHeapObjects", "xrNumGC", "xrPauseNs",
}
// systemMetricsStorePath is where the host time-series is persisted between
// restarts. It lives next to the database so a single volume mount carries
// both. Only systemMetrics is persisted — node and xray series are cheap to
// rebuild and tied to live connections.
func systemMetricsStorePath() string {
return filepath.Join(config.GetDBFolderPath(), "system_metrics.gob")
}
// PersistSystemMetrics writes the host time-series to disk via a temp file +
// rename so a crash mid-write can't corrupt the previous snapshot. Called on a
// timer and at shutdown.
func PersistSystemMetrics() error {
path := systemMetricsStorePath()
tmp := path + ".tmp"
f, err := os.Create(tmp)
if err != nil {
return err
}
if err := gob.NewEncoder(f).Encode(systemMetrics.snapshot()); err != nil {
f.Close()
os.Remove(tmp)
return err
}
if err := f.Close(); err != nil {
os.Remove(tmp)
return err
}
return os.Rename(tmp, path)
}
// RestoreSystemMetrics loads a previously persisted host time-series on startup.
// A missing file is not an error (first boot). Aggregation already windows by
// time, so any gap from downtime is handled by the readers.
func RestoreSystemMetrics() {
path := systemMetricsStorePath()
f, err := os.Open(path)
if err != nil {
if !os.IsNotExist(err) {
logger.Warning("restore system metrics failed:", err)
}
return
}
defer f.Close()
var data map[string][]MetricSample
if err := gob.NewDecoder(f).Decode(&data); err != nil {
logger.Warning("decode system metrics failed:", err)
return
}
systemMetrics.restore(data)
}