добавил синхронизацию и изменил функцию синхронизации

This commit is contained in:
Дмитрий Саенко 2025-10-22 13:41:20 +03:00
parent 773e770cee
commit 20efe2258c

View file

@ -3,11 +3,11 @@
package service package service
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/url"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -319,12 +319,7 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
s.xrayApi.Close() s.xrayApi.Close()
} }
body, err := json.Marshal(inbound) s.syncWithSlaves("POST", "/panel/api/inbounds/add", "application/x-www-form-urlencoded", inbound, nil)
if err != nil {
return inbound, false, err
}
s.syncWithSlaves("POST", "/panel/api/inbounds/add", bytes.NewReader(body))
return inbound, needRestart, err return inbound, needRestart, err
} }
@ -372,6 +367,12 @@ func (s *InboundService) DelInbound(id int) (bool, error) {
} }
} }
// syncWithSlaves is used to notify slaves about the deletion of an inbound configuration.
// The reason why we pass an empty string as the third parameter is that the third parameter
// is used to specify the HTTP body of the request, but in this case, we don't need to send
// any body in the request.
s.syncWithSlaves("POST", "/panel/api/inbounds/del/:id", "", nil, nil, strconv.Itoa(id)) //
return needRestart, db.Delete(model.Inbound{}, id).Error return needRestart, db.Delete(model.Inbound{}, id).Error
} }
@ -520,6 +521,8 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
} }
s.xrayApi.Close() s.xrayApi.Close()
s.syncWithSlaves("POST", "/panel/api/inbounds/update", "application/x-www-form-urlencoded", inbound, nil, strconv.Itoa(inbound.Id))
return inbound, needRestart, tx.Save(oldInbound).Error return inbound, needRestart, tx.Save(oldInbound).Error
} }
@ -683,10 +686,7 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
} }
s.xrayApi.Close() s.xrayApi.Close()
if err == nil { s.syncWithSlaves("POST", "/panel/inbounds/api/addClient", "application/www-form-urlencoded", data, nil)
body, _ := json.Marshal(data)
s.syncWithSlaves("POST", "/panel/inbound/api/addClient", bytes.NewReader(body))
}
return needRestart, tx.Save(oldInbound).Error return needRestart, tx.Save(oldInbound).Error
} }
@ -777,9 +777,7 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool,
} }
} }
if err == nil { s.syncWithSlaves(http.MethodPost, "/panel/inbounds/api", "application/www-form-urlencoded", nil, nil, strconv.Itoa(inboundId), "delClient", clientId)
s.syncWithSlaves("POST", fmt.Sprintf("/panel/inbound/api/%d/delClient/%s", inboundId, clientId), nil)
}
return needRestart, db.Save(oldInbound).Error return needRestart, db.Save(oldInbound).Error
} }
@ -957,10 +955,7 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
needRestart = true needRestart = true
} }
if err == nil { s.syncWithSlaves("POST", "/panel/inbounds/api/updateClient", "application/www-form-urlencoded", data, nil, clientId)
body, _ := json.Marshal(data)
s.syncWithSlaves("POST", fmt.Sprintf("/panel/inbound/api/updateClient/%s", clientId), bytes.NewReader(body))
}
return needRestart, tx.Save(oldInbound).Error return needRestart, tx.Save(oldInbound).Error
} }
@ -2390,7 +2385,20 @@ func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, [
return validEmails, extraEmails, nil return validEmails, extraEmails, nil
} }
func (s *InboundService) syncWithSlaves(method string, path string, body io.Reader) { // syncWithSlaves sends a request to all enabled servers to sync their data.
// It takes the HTTP method, path, content type, body data, query parameters, and any additional path parameters.
// It marshals the body data according to the content type, and sets the X-API-KEY header to the server's API key.
// If the request fails or the response status code is not OK, it logs a warning.
// It is intended to be used for syncing data between the main server and its slaves.
//
// Parameters:
// - method: the HTTP method to use (e.g. GET, POST, PUT, DELETE)
// - path: the path to send the request to (e.g. /clients, /servers)
// - contentType: the content type of the request body (e.g. application/json, application/x-www-form-urlencoded)
// - bodyData: the data to send in the request body (e.g. a JSON object, a URL-encoded string)
// - queryParams: a map of query parameters to send with the request (e.g. {"sort": "name", "order": "asc"})
// - params: any additional path parameters to send with the request (e.g. "123", "abc")
func (s *InboundService) syncWithSlaves(method string, path string, contentType string, bodyData any, queryParams map[string]string, params ...string) {
serverService := MultiServerService{} serverService := MultiServerService{}
servers, err := serverService.GetServers() servers, err := serverService.GetServers()
if err != nil { if err != nil {
@ -2403,15 +2411,63 @@ func (s *InboundService) syncWithSlaves(method string, path string, body io.Read
continue continue
} }
url := fmt.Sprintf("http://%s:%d%s", server.Address, server.Port, path) urlStr := fmt.Sprintf("http://%s:%d%s", server.Address, server.Port, path)
req, err := http.NewRequest(method, url, body)
if len(params) > 0 {
urlStr = fmt.Sprintf("%s/%s", urlStr, strings.Join(params, "/"))
}
if len(queryParams) > 0 {
values := urlValuesFromMap(queryParams)
urlStr = fmt.Sprintf("%s?%s", urlStr, values.Encode())
}
var bodyReader io.Reader
if bodyData != nil {
switch contentType {
case "application/json":
bodyBytes, err := json.Marshal(bodyData)
if err != nil {
logger.Warningf("Failed to marshal body for server %s: %v", server.Name, err)
continue
}
bodyReader = strings.NewReader(string(bodyBytes))
case "application/x-www-form-urlencoded":
form := url.Values{}
switch v := bodyData.(type) {
case map[string]string:
for key, value := range v {
form.Set(key, value)
}
case map[string]interface{}:
for key, value := range v {
form.Set(key, fmt.Sprintf("%v", value))
}
default:
logger.Warningf("Unsupported body type for form encoding on server %s", server.Name)
continue
}
bodyReader = strings.NewReader(form.Encode())
default:
logger.Warningf("Unsupported content type %s for server %s", contentType, server.Name)
continue
}
}
req, err := http.NewRequest(method, urlStr, bodyReader)
if err != nil { if err != nil {
logger.Warningf("Failed to create request for server %s: %v", server.Name, err) logger.Warningf("Failed to create request for server %s: %v", server.Name, err)
continue continue
} }
req.Header.Set("Content-Type", "application/json") if contentType != "" {
req.Header.Set("Api-Key", server.APIKey) req.Header.Set("Content-Type", contentType)
}
req.Header.Set("X-API-KEY", server.APIKey)
client := &http.Client{} client := &http.Client{}
resp, err := client.Do(req) resp, err := client.Do(req)
@ -2428,6 +2484,15 @@ func (s *InboundService) syncWithSlaves(method string, path string, body io.Read
} }
} }
// Вспомогательная функция для конвертации map → url.Values
func urlValuesFromMap(m map[string]string) url.Values {
values := url.Values{}
for k, v := range m {
values.Set(k, v)
}
return values
}
func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (bool, error) { func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (bool, error) {
oldInbound, err := s.GetInbound(inboundId) oldInbound, err := s.GetInbound(inboundId)
if err != nil { if err != nil {