From a5f962db8423f62228d70f36faee65ef7cedaa07 Mon Sep 17 00:00:00 2001 From: JetSprow Date: Wed, 29 Apr 2026 18:30:49 +1000 Subject: [PATCH] feat: release v3.0.0 risk telemetry --- agent/jboard-agent/Makefile | 2 +- agent/jboard-agent/README.md | 11 +- agent/jboard-agent/cmd/agent/main.go | 3 +- agent/jboard-agent/internal/config/config.go | 34 +- .../internal/probe/route_classify.go | 124 +++++++ .../internal/probe/route_classify_test.go | 35 ++ agent/jboard-agent/internal/probe/trace.go | 40 +- agent/jboard-agent/internal/probe/xraylog.go | 345 ++++++++++++++++++ .../internal/probe/xraylog_test.go | 63 ++++ package-lock.json | 4 +- package.json | 2 +- prisma/schema.prisma | 9 + scripts/install-jboard-agent.sh | 125 ++++++- scripts/upgrade-jboard-agent.sh | 150 +++++++- src/actions/admin/settings.ts | 23 ++ src/app/(admin)/admin/nodes/node-actions.tsx | 4 +- src/app/(admin)/admin/settings/page.tsx | 5 + .../(admin)/admin/settings/settings-form.tsx | 35 +- .../subscription-risk-geo-details.tsx | 4 +- .../_components/subscription-risk-table.tsx | 10 +- .../(admin)/admin/subscription-risk/page.tsx | 4 +- .../subscription-access-risk-section.tsx | 10 +- src/app/(user)/support/page.tsx | 2 +- src/app/api/agent/node-access/route.ts | 148 ++++++++ src/app/api/agent/trace/route.ts | 4 +- src/app/api/traces/route.ts | 4 +- src/lib/request-context.ts | 4 +- src/lib/route-classify.ts | 77 ++++ src/lib/trace-normalize.ts | 6 + src/services/subscription-risk-review.ts | 12 +- src/services/subscription-risk.ts | 148 +++++++- 31 files changed, 1367 insertions(+), 80 deletions(-) create mode 100644 agent/jboard-agent/internal/probe/route_classify.go create mode 100644 agent/jboard-agent/internal/probe/route_classify_test.go create mode 100644 agent/jboard-agent/internal/probe/xraylog.go create mode 100644 agent/jboard-agent/internal/probe/xraylog_test.go create mode 100644 src/app/api/agent/node-access/route.ts create mode 100644 src/lib/route-classify.ts diff --git a/agent/jboard-agent/Makefile b/agent/jboard-agent/Makefile index bd7a010..2f0de09 100644 --- a/agent/jboard-agent/Makefile +++ b/agent/jboard-agent/Makefile @@ -1,5 +1,5 @@ BINARY := jboard-agent -VERSION := 2.3.0 +VERSION := 3.0.0 LDFLAGS := -s -w .PHONY: build build-linux clean diff --git a/agent/jboard-agent/README.md b/agent/jboard-agent/README.md index 338a1fa..6f83b7d 100644 --- a/agent/jboard-agent/README.md +++ b/agent/jboard-agent/README.md @@ -1,11 +1,12 @@ # jboard-agent -`jboard-agent` 只负责节点探测上报: +`jboard-agent` 以旁路方式负责节点探测和可选的 Xray access log 风控上报: - 三网 TCP 延迟:`POST /api/agent/latency` - 三网路由跟踪:`POST /api/agent/trace` +- Xray access log 聚合:`POST /api/agent/node-access`,安装/升级脚本会自动探测并写入 `XRAY_ACCESS_LOG_PATH` -节点入站、客户端开通、暂停、删除、流量限制等配置均由 3x-ui 面板维护。J-Board 后端不向节点下发 Xray/Hy2 配置。 +节点入站、客户端开通、暂停、删除、流量限制等配置均由 3x-ui 面板维护。Agent 只读日志文件,不修改 3x-ui 配置、不重启 Xray。 ## 构建 @@ -29,12 +30,16 @@ AUTH_TOKEN=后台生成的探测Token \ | --- | --- | --- | | `LATENCY_INTERVAL` | `5m` | 延迟探测间隔,支持 `30s`、`5m` 或秒数 | | `TRACE_INTERVAL` | `30m` | 路由探测间隔,支持 `30m` 或秒数 | +| `XRAY_ACCESS_LOG_PATH` | 自动探测 | Xray access log 路径;安装/升级脚本会优先查找 `/usr/local/x-ui/access.log` 等常见路径,仍为空时禁用节点真实连接风控 | +| `XRAY_LOG_INTERVAL` | `1m` | 日志读取和聚合上报间隔 | +| `XRAY_LOG_STATE_FILE` | `/var/lib/jboard-agent/xray-log-state.json` | 日志 offset 状态文件 | +| `XRAY_LOG_START_AT_END` | `1` | 首次启动从文件末尾开始,避免上传历史巨量日志;设为 `0` 可从头读取 | 路由探测依赖 `nexttrace` 命令;延迟探测无需额外依赖。 ## systemd -推荐从 J-Board 后台节点页复制一键安装命令。该命令会下载 release 二进制、安装 `nexttrace`、写入 systemd 服务并启动。 +推荐从 J-Board 后台节点页复制一键安装命令。该命令会下载 release 二进制、安装 `nexttrace`、自动探测 3x-ui/Xray access log、写入 systemd 服务并启动。 ## 延迟算法 diff --git a/agent/jboard-agent/cmd/agent/main.go b/agent/jboard-agent/cmd/agent/main.go index ae925dd..ae2d524 100644 --- a/agent/jboard-agent/cmd/agent/main.go +++ b/agent/jboard-agent/cmd/agent/main.go @@ -12,7 +12,7 @@ import ( "github.com/jboard/jboard-agent/internal/probe" ) -const version = "2.3.0" +const version = "3.0.0" func main() { debug.SetGCPercent(50) @@ -25,6 +25,7 @@ func main() { go probe.LatencyLoop(ctx, cfg) go probe.TraceLoop(ctx, cfg) + go probe.XrayAccessLogLoop(ctx, cfg) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) diff --git a/agent/jboard-agent/internal/config/config.go b/agent/jboard-agent/internal/config/config.go index 92d9563..3fb3038 100644 --- a/agent/jboard-agent/internal/config/config.go +++ b/agent/jboard-agent/internal/config/config.go @@ -4,6 +4,7 @@ import ( "log" "os" "strconv" + "strings" "time" ) @@ -13,14 +14,23 @@ type Config struct { LatencyInterval time.Duration TraceInterval time.Duration + + XrayAccessLogPath string + XrayLogStateFile string + XrayLogInterval time.Duration + XrayLogStartAtEnd bool } func Load() *Config { cfg := &Config{ - ServerURL: envOrDefault("SERVER_URL", ""), - AuthToken: envOrDefault("AUTH_TOKEN", ""), - LatencyInterval: envDuration("LATENCY_INTERVAL", 5*time.Minute), - TraceInterval: envDuration("TRACE_INTERVAL", 30*time.Minute), + ServerURL: envOrDefault("SERVER_URL", ""), + AuthToken: envOrDefault("AUTH_TOKEN", ""), + LatencyInterval: envDuration("LATENCY_INTERVAL", 5*time.Minute), + TraceInterval: envDuration("TRACE_INTERVAL", 30*time.Minute), + XrayAccessLogPath: envOrDefault("XRAY_ACCESS_LOG_PATH", ""), + XrayLogStateFile: envOrDefault("XRAY_LOG_STATE_FILE", "/var/lib/jboard-agent/xray-log-state.json"), + XrayLogInterval: envDuration("XRAY_LOG_INTERVAL", time.Minute), + XrayLogStartAtEnd: envBool("XRAY_LOG_START_AT_END", true), } if cfg.ServerURL == "" || cfg.AuthToken == "" { @@ -53,3 +63,19 @@ func envDuration(key string, fallback time.Duration) time.Duration { return fallback } + +func envBool(key string, fallback bool) bool { + v := os.Getenv(key) + if v == "" { + return fallback + } + + switch strings.ToLower(strings.TrimSpace(v)) { + case "1", "true", "yes", "y", "on": + return true + case "0", "false", "no", "n", "off": + return false + default: + return fallback + } +} diff --git a/agent/jboard-agent/internal/probe/route_classify.go b/agent/jboard-agent/internal/probe/route_classify.go new file mode 100644 index 0000000..84b42ee --- /dev/null +++ b/agent/jboard-agent/internal/probe/route_classify.go @@ -0,0 +1,124 @@ +package probe + +import ( + "regexp" + "strings" +) + +var asnPattern = regexp.MustCompile(`(?i)(?:^|\b)AS?\s*(\d{2,10})(?:\b|$)`) + +func normalizeRouteText(value string) string { + return strings.ToUpper(strings.TrimSpace(value)) +} + +func normalizeASN(value string) string { + text := normalizeRouteText(value) + if text == "" { + return "" + } + match := asnPattern.FindStringSubmatch(text) + if len(match) > 1 { + return match[1] + } + for _, r := range text { + if r < '0' || r > '9' { + return "" + } + } + return text +} + +func hasASN(asns map[string]struct{}, values ...string) bool { + for _, value := range values { + if _, ok := asns[value]; ok { + return true + } + } + return false +} + +func hasText(combined string, values ...string) bool { + for _, value := range values { + if strings.Contains(combined, value) { + return true + } + } + return false +} + +func hasIPPrefix(ips []string, prefixes ...string) bool { + for _, ip := range ips { + for _, prefix := range prefixes { + if strings.HasPrefix(ip, prefix) { + return true + } + } + } + return false +} + +func detectSummary(hops []hopDetail) string { + var texts []string + var ips []string + asns := make(map[string]struct{}) + + for _, hop := range hops { + if hop.IP != "" && hop.IP != "*" { + ips = append(ips, hop.IP) + } + parts := []string{hop.Geo, hop.ASN, hop.Owner, hop.ISP} + text := normalizeRouteText(strings.Join(parts, " ")) + texts = append(texts, text) + + if asn := normalizeASN(hop.ASN); asn != "" { + asns[asn] = struct{}{} + } + for _, match := range asnPattern.FindAllStringSubmatch(text, -1) { + if len(match) > 1 { + asns[match[1]] = struct{}{} + } + } + } + + combined := strings.Join(texts, " ") + cn2Evidence := hasASN(asns, "4809") || + hasIPPrefix(ips, "59.43.") || + hasText(combined, "CN2", "CTGNET", "CHINANET NEXT CARRYING NETWORK", "CHINA TELECOM GLOBAL") + cn2GIAText := hasText(combined, "CN2 GIA", "CN2GIA", "GIA", "GLOBAL INTERNET ACCESS") + ordinaryTelecomHops := 0 + for _, text := range texts { + if strings.Contains(text, "AS4134") || + strings.Contains(text, "CHINANET BACKBONE") || + strings.Contains(text, "CHINANET 163") || + strings.Contains(text, "163骨干") { + ordinaryTelecomHops++ + } + } + for _, ip := range ips { + if strings.HasPrefix(ip, "202.97.") { + ordinaryTelecomHops++ + } + } + + if cn2Evidence { + if cn2GIAText || ordinaryTelecomHops <= 1 { + return "CN2 GIA" + } + return "CN2 GT" + } + + if hasASN(asns, "9929", "10099") || hasText(combined, "CUII", "A网", "AS9929") { + return "AS9929" + } + if hasText(combined, "CMIN2") || hasASN(asns, "58807", "58809", "58813", "58819", "59807") { + return "CMIN2" + } + if hasText(combined, "CMI") || hasASN(asns, "58453") { + return "CMI" + } + if hasASN(asns, "4837") || hasText(combined, "AS4837") { + return "AS4837" + } + + return "普通线路" +} diff --git a/agent/jboard-agent/internal/probe/route_classify_test.go b/agent/jboard-agent/internal/probe/route_classify_test.go new file mode 100644 index 0000000..e3bab3d --- /dev/null +++ b/agent/jboard-agent/internal/probe/route_classify_test.go @@ -0,0 +1,35 @@ +package probe + +import "testing" + +func TestDetectSummaryCN2GIAFromAS4809And59_43(t *testing.T) { + hops := []hopDetail{ + {Hop: 1, IP: "*"}, + {Hop: 2, IP: "59.43.246.237", ASN: "AS4809", Geo: "中国 上海 China Telecom CN2"}, + {Hop: 3, IP: "219.141.136.12", ASN: "AS4134", Geo: "中国 北京 电信"}, + } + if got := detectSummary(hops); got != "CN2 GIA" { + t.Fatalf("detectSummary() = %q, want CN2 GIA", got) + } +} + +func TestDetectSummaryCN2GTWhenCN2FallsBackTo163(t *testing.T) { + hops := []hopDetail{ + {Hop: 1, IP: "59.43.248.1", ASN: "AS4809", Geo: "CN2"}, + {Hop: 2, IP: "202.97.12.1", ASN: "AS4134", Geo: "CHINANET BACKBONE"}, + {Hop: 3, IP: "202.97.18.1", ASN: "AS4134", Geo: "CHINANET BACKBONE"}, + } + if got := detectSummary(hops); got != "CN2 GT" { + t.Fatalf("detectSummary() = %q, want CN2 GT", got) + } +} + +func TestDetectSummaryCMIN2BeforeCMI(t *testing.T) { + hops := []hopDetail{ + {Hop: 2, IP: "223.120.20.1", ASN: "AS58807", Geo: "CMIN2 China Mobile"}, + {Hop: 3, IP: "211.136.25.153", ASN: "AS9808", Geo: "CMI Mobile"}, + } + if got := detectSummary(hops); got != "CMIN2" { + t.Fatalf("detectSummary() = %q, want CMIN2", got) + } +} diff --git a/agent/jboard-agent/internal/probe/trace.go b/agent/jboard-agent/internal/probe/trace.go index a107fd4..0905e8e 100644 --- a/agent/jboard-agent/internal/probe/trace.go +++ b/agent/jboard-agent/internal/probe/trace.go @@ -27,6 +27,9 @@ type hopDetail struct { IP string `json:"ip"` Geo string `json:"geo"` Latency float64 `json:"latency"` + ASN string `json:"asn,omitempty"` + Owner string `json:"owner,omitempty"` + ISP string `json:"isp,omitempty"` } type traceResult struct { @@ -129,7 +132,6 @@ func runTrace(ip string) ([]hopDetail, string, error) { } var hops []hopDetail - var asnumbers []string for i, hopGroup := range parsed.Hops { hop := hopDetail{Hop: i + 1} for _, probe := range hopGroup { @@ -151,9 +153,9 @@ func runTrace(ip string) ([]hopDetail, string, error) { parts = append(parts, probe.Geo.Owner) } hop.Geo = strings.Join(parts, " ") - if probe.Geo.Asnumber != "" { - asnumbers = append(asnumbers, probe.Geo.Asnumber) - } + hop.ASN = probe.Geo.Asnumber + hop.Owner = probe.Geo.Owner + hop.ISP = probe.Geo.Isp } break } @@ -167,34 +169,6 @@ func runTrace(ip string) ([]hopDetail, string, error) { hops[0].Geo = "" } - summary := detectSummary(hops, asnumbers) + summary := detectSummary(hops) return hops, summary, nil } - -func detectSummary(hops []hopDetail, asnumbers []string) string { - combined := "" - for _, h := range hops { - combined += " " + strings.ToUpper(h.Geo) - } - asSet := "" - for _, asn := range asnumbers { - asSet += " " + asn - } - - switch { - case strings.Contains(combined, "CN2") && strings.Contains(combined, "GIA"): - return "CN2 GIA" - case strings.Contains(combined, "CN2"): - return "CN2 GT" - case strings.Contains(asSet, "9929") || strings.Contains(combined, "CUII") || strings.Contains(combined, "A网"): - return "AS9929" - case strings.Contains(asSet, "4837"): - return "AS4837" - case strings.Contains(combined, "CMI") || strings.Contains(asSet, "58453"): - return "CMI" - case strings.Contains(combined, "CMIN2") || strings.Contains(asSet, "59807"): - return "CMIN2" - default: - return "普通线路" - } -} diff --git a/agent/jboard-agent/internal/probe/xraylog.go b/agent/jboard-agent/internal/probe/xraylog.go new file mode 100644 index 0000000..0a84a3b --- /dev/null +++ b/agent/jboard-agent/internal/probe/xraylog.go @@ -0,0 +1,345 @@ +package probe + +import ( + "context" + "encoding/json" + "errors" + "log" + "net" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "syscall" + "time" + + "github.com/jboard/jboard-agent/internal/config" +) + +const maxXrayReadBytes int64 = 2 * 1024 * 1024 +const maxXrayEventsPerPush = 300 + +var xrayAccessLinePattern = regexp.MustCompile(`^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})\s+(\S+)\s+(accepted|rejected)\s+(?:(tcp|udp):)?(\S+)\s+\[([^\]]+)\](?:.*?\bemail:\s*([^\s]+))?`) + +type xrayLogState struct { + Path string `json:"path"` + Inode uint64 `json:"inode"` + Offset int64 `json:"offset"` +} + +type nodeAccessEvent struct { + ClientEmail string `json:"clientEmail"` + SourceIP string `json:"sourceIp"` + InboundTag string `json:"inboundTag,omitempty"` + Network string `json:"network,omitempty"` + TargetHost string `json:"targetHost,omitempty"` + TargetPort int `json:"targetPort,omitempty"` + Action string `json:"action"` + ConnectionCount int `json:"connectionCount"` + UniqueTargetCount int `json:"uniqueTargetCount,omitempty"` + FirstSeenAt string `json:"firstSeenAt,omitempty"` + LastSeenAt string `json:"lastSeenAt,omitempty"` +} + +type nodeAccessPayload struct { + Events []nodeAccessEvent `json:"events"` +} + +type parsedXrayAccess struct { + ClientEmail string + SourceIP string + InboundTag string + Network string + TargetHost string + TargetPort int + Action string + SeenAt time.Time +} + +type accessAggregate struct { + event nodeAccessEvent + targets map[string]struct{} +} + +func XrayAccessLogLoop(ctx context.Context, cfg *config.Config) { + if strings.TrimSpace(cfg.XrayAccessLogPath) == "" { + log.Println("[xray-log] disabled; set XRAY_ACCESS_LOG_PATH to enable node access risk telemetry") + return + } + + ticker := time.NewTicker(cfg.XrayLogInterval) + defer ticker.Stop() + + collectAndPushXrayLogs(cfg) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + collectAndPushXrayLogs(cfg) + } + } +} + +func collectAndPushXrayLogs(cfg *config.Config) { + events, state, err := readNewXrayEvents(cfg) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + log.Printf("[xray-log] read error: %v", err) + } + return + } + if len(events) == 0 { + if err := saveXrayLogState(cfg.XrayLogStateFile, state); err != nil { + log.Printf("[xray-log] state save error: %v", err) + } + return + } + + payload := nodeAccessPayload{Events: events} + body, _ := json.Marshal(payload) + if err := postToServer(cfg, "/api/agent/node-access", body); err != nil { + log.Printf("[xray-log] push error: %v", err) + return + } + if err := saveXrayLogState(cfg.XrayLogStateFile, state); err != nil { + log.Printf("[xray-log] state save error: %v", err) + } + log.Printf("[xray-log] pushed %d aggregate access events", len(events)) +} + +func readNewXrayEvents(cfg *config.Config) ([]nodeAccessEvent, xrayLogState, error) { + path := strings.TrimSpace(cfg.XrayAccessLogPath) + info, err := os.Stat(path) + if err != nil { + return nil, xrayLogState{}, err + } + inode := fileInode(info) + state := loadXrayLogState(cfg.XrayLogStateFile) + + if state.Path != path || state.Inode != inode || state.Offset > info.Size() { + state = xrayLogState{Path: path, Inode: inode} + if cfg.XrayLogStartAtEnd { + state.Offset = info.Size() + } + } + + if info.Size() <= state.Offset { + return nil, state, nil + } + + readBytes := info.Size() - state.Offset + if readBytes > maxXrayReadBytes { + readBytes = maxXrayReadBytes + } + + file, err := os.Open(path) + if err != nil { + return nil, state, err + } + defer file.Close() + + buf := make([]byte, readBytes) + n, err := file.ReadAt(buf, state.Offset) + if err != nil && n == 0 { + return nil, state, err + } + data := string(buf[:n]) + consumed := int64(n) + if lastNewline := strings.LastIndexByte(data, '\n'); lastNewline >= 0 && lastNewline < len(data)-1 { + data = data[:lastNewline+1] + consumed = int64(len(data)) + } + state.Offset += consumed + + events := aggregateXrayAccessLines(strings.Split(data, "\n")) + if len(events) > maxXrayEventsPerPush { + events = events[:maxXrayEventsPerPush] + } + + return events, state, nil +} + +func loadXrayLogState(path string) xrayLogState { + data, err := os.ReadFile(path) + if err != nil { + return xrayLogState{} + } + var state xrayLogState + if err := json.Unmarshal(data, &state); err != nil { + return xrayLogState{} + } + return state +} + +func saveXrayLogState(path string, state xrayLogState) error { + if strings.TrimSpace(path) == "" { + return nil + } + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return err + } + data, _ := json.Marshal(state) + return os.WriteFile(path, data, 0o600) +} + +func fileInode(info os.FileInfo) uint64 { + stat, ok := info.Sys().(*syscall.Stat_t) + if !ok || stat == nil { + return 0 + } + return uint64(stat.Ino) +} + +func aggregateXrayAccessLines(lines []string) []nodeAccessEvent { + aggregates := make(map[string]*accessAggregate) + var order []string + + for _, line := range lines { + parsed, ok := parseXrayAccessLine(line) + if !ok || parsed.ClientEmail == "" || parsed.SourceIP == "" { + continue + } + key := strings.Join([]string{ + parsed.ClientEmail, + parsed.SourceIP, + parsed.InboundTag, + parsed.Network, + parsed.Action, + }, "|") + + agg, ok := aggregates[key] + if !ok { + agg = &accessAggregate{ + event: nodeAccessEvent{ + ClientEmail: parsed.ClientEmail, + SourceIP: parsed.SourceIP, + InboundTag: parsed.InboundTag, + Network: parsed.Network, + TargetHost: parsed.TargetHost, + TargetPort: parsed.TargetPort, + Action: parsed.Action, + ConnectionCount: 0, + FirstSeenAt: parsed.SeenAt.Format(time.RFC3339), + LastSeenAt: parsed.SeenAt.Format(time.RFC3339), + }, + targets: make(map[string]struct{}), + } + aggregates[key] = agg + order = append(order, key) + } + + agg.event.ConnectionCount++ + if parsed.TargetHost != "" { + agg.targets[parsed.TargetHost] = struct{}{} + } + if parsed.SeenAt.After(parseRFC3339OrZero(agg.event.LastSeenAt)) { + agg.event.LastSeenAt = parsed.SeenAt.Format(time.RFC3339) + } + } + + events := make([]nodeAccessEvent, 0, len(order)) + for _, key := range order { + agg := aggregates[key] + agg.event.UniqueTargetCount = len(agg.targets) + events = append(events, agg.event) + } + return events +} + +func parseXrayAccessLine(line string) (parsedXrayAccess, bool) { + line = strings.TrimSpace(line) + if line == "" { + return parsedXrayAccess{}, false + } + match := xrayAccessLinePattern.FindStringSubmatch(line) + if len(match) == 0 { + return parsedXrayAccess{}, false + } + + seenAt, err := time.ParseInLocation("2006/01/02 15:04:05", match[1], time.Local) + if err != nil { + seenAt = time.Now() + } + network := strings.ToLower(match[4]) + targetHost, targetPort := splitTarget(match[5]) + if network == "" { + network = inferNetwork(match[5]) + } + + return parsedXrayAccess{ + ClientEmail: strings.TrimSpace(match[7]), + SourceIP: stripPort(match[2]), + InboundTag: normalizeInboundTag(match[6]), + Network: network, + TargetHost: targetHost, + TargetPort: targetPort, + Action: strings.ToLower(match[3]), + SeenAt: seenAt, + }, true +} + +func normalizeInboundTag(value string) string { + parts := strings.Split(value, ">>") + return strings.TrimSpace(parts[0]) +} + +func inferNetwork(target string) string { + if strings.HasPrefix(strings.ToLower(target), "udp:") { + return "udp" + } + return "tcp" +} + +func splitTarget(value string) (string, int) { + value = trimTransportPrefix(strings.TrimSpace(value)) + if host, port, err := net.SplitHostPort(value); err == nil { + return strings.Trim(host, "[]"), atoiOrZero(port) + } + idx := strings.LastIndex(value, ":") + if idx > 0 && idx < len(value)-1 { + port := atoiOrZero(value[idx+1:]) + if port > 0 { + return strings.Trim(value[:idx], "[]"), port + } + } + return strings.Trim(value, "[]"), 0 +} + +func stripPort(value string) string { + value = trimTransportPrefix(strings.TrimSpace(value)) + if host, _, err := net.SplitHostPort(value); err == nil { + return strings.Trim(host, "[]") + } + idx := strings.LastIndex(value, ":") + if idx > 0 && idx < len(value)-1 && atoiOrZero(value[idx+1:]) > 0 { + return strings.Trim(value[:idx], "[]") + } + return strings.Trim(value, "[]") +} + +func trimTransportPrefix(value string) string { + lower := strings.ToLower(value) + if strings.HasPrefix(lower, "tcp:") || strings.HasPrefix(lower, "udp:") { + return value[4:] + } + return value +} + +func atoiOrZero(value string) int { + n, err := strconv.Atoi(value) + if err != nil { + return 0 + } + return n +} + +func parseRFC3339OrZero(value string) time.Time { + parsed, err := time.Parse(time.RFC3339, value) + if err != nil { + return time.Time{} + } + return parsed +} diff --git a/agent/jboard-agent/internal/probe/xraylog_test.go b/agent/jboard-agent/internal/probe/xraylog_test.go new file mode 100644 index 0000000..f851757 --- /dev/null +++ b/agent/jboard-agent/internal/probe/xraylog_test.go @@ -0,0 +1,63 @@ +package probe + +import "testing" + +func TestParseXrayAccessLine(t *testing.T) { + line := "2026/04/29 10:11:12 203.0.113.9:51820 accepted tcp:example.com:443 [proxy-in >> freedom] email: user@example.com-cabc1234" + got, ok := parseXrayAccessLine(line) + if !ok { + t.Fatal("parseXrayAccessLine() failed") + } + if got.SourceIP != "203.0.113.9" { + t.Fatalf("SourceIP = %q", got.SourceIP) + } + if got.ClientEmail != "user@example.com-cabc1234" { + t.Fatalf("ClientEmail = %q", got.ClientEmail) + } + if got.InboundTag != "proxy-in" { + t.Fatalf("InboundTag = %q", got.InboundTag) + } + if got.Network != "tcp" || got.TargetHost != "example.com" || got.TargetPort != 443 { + t.Fatalf("target = %s %s:%d", got.Network, got.TargetHost, got.TargetPort) + } +} + +func TestParseXrayAccessLineWithTransportPrefixedSource(t *testing.T) { + line := "2026/04/29 10:11:12 tcp:203.0.113.9:51820 accepted tcp:example.com:443 [proxy] email: user@example.com-cabc1234" + got, ok := parseXrayAccessLine(line) + if !ok { + t.Fatal("parseXrayAccessLine() failed") + } + if got.SourceIP != "203.0.113.9" { + t.Fatalf("SourceIP = %q", got.SourceIP) + } +} + +func TestParseXrayAccessLineWithIPv6Source(t *testing.T) { + line := "2026/04/29 10:11:12 tcp:[2001:db8::1]:51820 accepted tcp:example.com:443 [proxy] email: user@example.com-cabc1234" + got, ok := parseXrayAccessLine(line) + if !ok { + t.Fatal("parseXrayAccessLine() failed") + } + if got.SourceIP != "2001:db8::1" { + t.Fatalf("SourceIP = %q", got.SourceIP) + } +} + +func TestAggregateXrayAccessLines(t *testing.T) { + lines := []string{ + "2026/04/29 10:11:12 203.0.113.9:51820 accepted tcp:example.com:443 [proxy] email: user@example.com-cabc1234", + "2026/04/29 10:11:13 203.0.113.9:51821 accepted tcp:openai.com:443 [proxy] email: user@example.com-cabc1234", + "2026/04/29 10:11:14 198.51.100.2:51821 accepted udp:1.1.1.1:53 [proxy] email: user@example.com-cabc1234", + } + events := aggregateXrayAccessLines(lines) + if len(events) != 2 { + t.Fatalf("len(events) = %d, want 2", len(events)) + } + if events[0].ConnectionCount != 2 || events[0].UniqueTargetCount != 2 { + t.Fatalf("first aggregate = %+v", events[0]) + } + if events[1].Network != "udp" || events[1].TargetPort != 53 { + t.Fatalf("second aggregate = %+v", events[1]) + } +} diff --git a/package-lock.json b/package-lock.json index 5a12688..89a5bdf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "j-board", - "version": "0.1.0", + "version": "3.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "j-board", - "version": "0.1.0", + "version": "3.0.0", "dependencies": { "@base-ui/react": "^1.4.1", "@marsidev/react-turnstile": "^1.5.0", diff --git a/package.json b/package.json index af8881c..c0d3180 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "j-board", - "version": "0.1.0", + "version": "3.0.0", "private": true, "scripts": { "dev": "next dev", diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 9a52c07..cd8bbf9 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -53,6 +53,10 @@ enum SubscriptionRiskReason { REGION_VARIANCE_SUSPEND COUNTRY_VARIANCE_WARNING COUNTRY_VARIANCE_SUSPEND + NODE_ACCESS_VOLUME_WARNING + NODE_ACCESS_VOLUME_SUSPEND + NODE_ACCESS_TARGET_WARNING + NODE_ACCESS_TARGET_SUSPEND } enum SubscriptionRiskReviewStatus { @@ -775,6 +779,11 @@ model AppConfig { subscriptionRiskCountrySuspend Int @default(3) subscriptionRiskIpLimitPerHour Int @default(180) subscriptionRiskTokenLimitPerHour Int @default(60) + nodeAccessRiskEnabled Boolean @default(true) + nodeAccessConnectionWarning Int @default(180) + nodeAccessConnectionSuspend Int @default(360) + nodeAccessUniqueTargetWarning Int @default(80) + nodeAccessUniqueTargetSuspend Int @default(160) inviteRewardCouponId String? inviteRewardRate Decimal @default(0) @db.Decimal(5, 2) inviteRewardEnabled Boolean @default(false) diff --git a/scripts/install-jboard-agent.sh b/scripts/install-jboard-agent.sh index 3966521..39e0339 100755 --- a/scripts/install-jboard-agent.sh +++ b/scripts/install-jboard-agent.sh @@ -8,6 +8,10 @@ SERVICE_NAME="${SERVICE_NAME:-jboard-agent}" ENV_FILE="${ENV_FILE:-/etc/jboard-agent.env}" LATENCY_INTERVAL="${LATENCY_INTERVAL:-5m}" TRACE_INTERVAL="${TRACE_INTERVAL:-30m}" +XRAY_ACCESS_LOG_PATH="${XRAY_ACCESS_LOG_PATH:-}" +XRAY_LOG_INTERVAL="${XRAY_LOG_INTERVAL:-1m}" +XRAY_LOG_STATE_FILE="${XRAY_LOG_STATE_FILE:-/var/lib/jboard-agent/xray-log-state.json}" +XRAY_LOG_START_AT_END="${XRAY_LOG_START_AT_END:-1}" INSTALL_NEXTTRACE="${INSTALL_NEXTTRACE:-1}" TMP_DIR="$(mktemp -d)" ARCH="$(uname -m)" @@ -35,6 +39,16 @@ run_as_root() { fi } +run_as_root_output() { + if [ "$(id -u)" -eq 0 ]; then + "$@" + elif command -v sudo >/dev/null 2>&1; then + sudo "$@" + else + return 1 + fi +} + detect_asset() { case "$ARCH" in x86_64|amd64) @@ -61,6 +75,77 @@ resolve_release_tag() { | head -n 1 } +detect_xray_access_log() { + if [ -n "$XRAY_ACCESS_LOG_PATH" ]; then + printf '%s\n' "$XRAY_ACCESS_LOG_PATH" + return 0 + fi + + for candidate in \ + /usr/local/x-ui/access.log \ + /usr/local/x-ui/bin/access.log \ + /usr/local/x-ui/xray/access.log \ + /etc/x-ui/access.log \ + /etc/x-ui/xray/access.log \ + /var/log/xray/access.log \ + /var/log/x-ui/access.log \ + /opt/3x-ui/access.log \ + /opt/x-ui/access.log; do + if run_as_root_output test -f "$candidate" 2>/dev/null; then + printf '%s\n' "$candidate" + return 0 + fi + done + + for root in /usr/local /etc /var/log /opt /var/lib/docker/volumes; do + if ! run_as_root_output test -d "$root" 2>/dev/null; then + continue + fi + while IFS= read -r candidate; do + case "$candidate" in + *x-ui*|*3x-ui*|*xray*|*Xray*) + printf '%s\n' "$candidate" + return 0 + ;; + esac + done < <(run_as_root_output find "$root" -type f \( -name 'access.log' -o -name '*xray*.log' \) 2>/dev/null | head -n 50) + done + + return 1 +} + +prepare_xray_access_log() { + local detected="" + detected="$(detect_xray_access_log || true)" + if [ -z "$detected" ]; then + XRAY_ACCESS_LOG_PATH="" + return 1 + fi + + XRAY_ACCESS_LOG_PATH="$detected" + run_as_root mkdir -p "$(dirname "$XRAY_LOG_STATE_FILE")" + if run_as_root_output test -f "$XRAY_ACCESS_LOG_PATH" 2>/dev/null; then + run_as_root chmod a+r "$XRAY_ACCESS_LOG_PATH" 2>/dev/null || true + fi + return 0 +} + +print_xray_log_hint() { + cat <<'HINT' + +Xray access log was not found automatically, so node access risk telemetry is disabled for now. +To enable it, open 3x-ui panel -> Xray Config and set: + +"log": { + "access": "/usr/local/x-ui/access.log", + "error": "/usr/local/x-ui/error.log", + "loglevel": "warning" +} + +Then restart x-ui and rerun this installer, or add XRAY_ACCESS_LOG_PATH=/usr/local/x-ui/access.log to /etc/jboard-agent.env. +HINT +} + ASSET="$(detect_asset)" RESOLVED_TAG="$(resolve_release_tag)" @@ -73,12 +158,12 @@ DOWNLOAD_BASE="https://github.com/${GH_REPO}/releases/download/${RESOLVED_TAG}" DOWNLOAD_URL="${DOWNLOAD_BASE}/${ASSET}" CHECKSUM_URL="${DOWNLOAD_BASE}/SHA256SUMS" -echo "[1/8] Release tag: ${RESOLVED_TAG}" -echo "[2/8] Downloading probe agent binary: ${ASSET}" +echo "[1/9] Release tag: ${RESOLVED_TAG}" +echo "[2/9] Downloading probe agent binary: ${ASSET}" curl -fsSL "$DOWNLOAD_URL" -o "$TMP_DIR/$ASSET" if curl -fsSL "$CHECKSUM_URL" -o "$TMP_DIR/SHA256SUMS" 2>/dev/null; then - echo "[3/8] Verifying checksum..." + echo "[3/9] Verifying checksum..." grep " ${ASSET}$" "$TMP_DIR/SHA256SUMS" > "$TMP_DIR/SHA256SUMS.current" ( cd "$TMP_DIR" @@ -89,32 +174,43 @@ if curl -fsSL "$CHECKSUM_URL" -o "$TMP_DIR/SHA256SUMS" 2>/dev/null; then fi ) else - echo "[3/8] Checksum file not found; skipping verification." + echo "[3/9] Checksum file not found; skipping verification." fi -echo "[4/8] Installing binary..." +echo "[4/9] Installing binary..." run_as_root install -m 0755 "$TMP_DIR/$ASSET" "${INSTALL_DIR}/jboard-agent" -run_as_root mkdir -p /var/log/jboard +run_as_root mkdir -p /var/log/jboard /var/lib/jboard-agent if [ "$INSTALL_NEXTTRACE" = "1" ] && ! command -v nexttrace >/dev/null 2>&1; then - echo "[5/8] Installing nexttrace for route probing..." + echo "[5/9] Installing nexttrace for route probing..." curl -fsSL https://raw.githubusercontent.com/nxtrace/NTrace-core/main/nt_install.sh -o "$TMP_DIR/nt_install.sh" run_as_root bash "$TMP_DIR/nt_install.sh" else - echo "[5/8] nexttrace already installed or skipped." + echo "[5/9] nexttrace already installed or skipped." fi -echo "[6/8] Writing environment file..." +echo "[6/9] Detecting Xray access log..." +if prepare_xray_access_log; then + echo "Found Xray access log: ${XRAY_ACCESS_LOG_PATH}" +else + echo "Xray access log not found; continuing without node access risk telemetry." +fi + +echo "[7/9] Writing environment file..." ENV_TMP="$TMP_DIR/jboard-agent.env" { printf 'SERVER_URL=%q\n' "$SERVER_URL" printf 'AUTH_TOKEN=%q\n' "$AUTH_TOKEN" printf 'LATENCY_INTERVAL=%q\n' "$LATENCY_INTERVAL" printf 'TRACE_INTERVAL=%q\n' "$TRACE_INTERVAL" + printf 'XRAY_ACCESS_LOG_PATH=%q\n' "$XRAY_ACCESS_LOG_PATH" + printf 'XRAY_LOG_INTERVAL=%q\n' "$XRAY_LOG_INTERVAL" + printf 'XRAY_LOG_STATE_FILE=%q\n' "$XRAY_LOG_STATE_FILE" + printf 'XRAY_LOG_START_AT_END=%q\n' "$XRAY_LOG_START_AT_END" } > "$ENV_TMP" run_as_root install -m 0600 "$ENV_TMP" "$ENV_FILE" -echo "[7/8] Writing systemd service..." +echo "[8/9] Writing systemd service..." SERVICE_TMP="$TMP_DIR/${SERVICE_NAME}.service" cat > "$SERVICE_TMP" </dev/null 2>&1; then + sudo "$@" + else + return 1 + fi +} + detect_asset() { case "$ARCH" in x86_64|amd64) @@ -50,6 +65,119 @@ resolve_release_tag() { | head -n 1 } +get_env_value() { + local key="$1" + if ! run_as_root_output test -f "$ENV_FILE" 2>/dev/null; then + return 0 + fi + run_as_root_output grep -E "^${key}=" "$ENV_FILE" 2>/dev/null \ + | tail -n 1 \ + | cut -d= -f2- \ + | sed -e "s/^\'//" -e "s/\'$//" -e 's/^"//' -e 's/"$//' || true +} + +detect_xray_access_log() { + if [ -n "$XRAY_ACCESS_LOG_PATH" ]; then + printf '%s\n' "$XRAY_ACCESS_LOG_PATH" + return 0 + fi + + for candidate in \ + /usr/local/x-ui/access.log \ + /usr/local/x-ui/bin/access.log \ + /usr/local/x-ui/xray/access.log \ + /etc/x-ui/access.log \ + /etc/x-ui/xray/access.log \ + /var/log/xray/access.log \ + /var/log/x-ui/access.log \ + /opt/3x-ui/access.log \ + /opt/x-ui/access.log; do + if run_as_root_output test -f "$candidate" 2>/dev/null; then + printf '%s\n' "$candidate" + return 0 + fi + done + + for root in /usr/local /etc /var/log /opt /var/lib/docker/volumes; do + if ! run_as_root_output test -d "$root" 2>/dev/null; then + continue + fi + while IFS= read -r candidate; do + case "$candidate" in + *x-ui*|*3x-ui*|*xray*|*Xray*) + printf '%s\n' "$candidate" + return 0 + ;; + esac + done < <(run_as_root_output find "$root" -type f \( -name 'access.log' -o -name '*xray*.log' \) 2>/dev/null | head -n 50) + done + + return 1 +} + +upsert_env_value() { + local key="$1" + local value="$2" + local quoted="" + printf -v quoted %q "$value" + + if run_as_root_output test -f "$ENV_FILE" 2>/dev/null; then + run_as_root_output cat "$ENV_FILE" > "$TMP_DIR/env.current" + else + : > "$TMP_DIR/env.current" + fi + + if grep -qE "^${key}=" "$TMP_DIR/env.current"; then + sed -E "s|^${key}=.*|${key}=${quoted}|" "$TMP_DIR/env.current" > "$TMP_DIR/env.next" + else + cp "$TMP_DIR/env.current" "$TMP_DIR/env.next" + printf '%s=%s\n' "$key" "$quoted" >> "$TMP_DIR/env.next" + fi + run_as_root install -m 0600 "$TMP_DIR/env.next" "$ENV_FILE" +} + +configure_xray_log_env() { + local current="" + current="$(get_env_value XRAY_ACCESS_LOG_PATH)" + if [ -n "$current" ]; then + XRAY_ACCESS_LOG_PATH="$current" + fi + + local detected="" + detected="$(detect_xray_access_log || true)" + if [ -z "$detected" ]; then + return 1 + fi + + XRAY_ACCESS_LOG_PATH="$detected" + run_as_root mkdir -p "$(dirname "$XRAY_LOG_STATE_FILE")" + if run_as_root_output test -f "$XRAY_ACCESS_LOG_PATH" 2>/dev/null; then + run_as_root chmod a+r "$XRAY_ACCESS_LOG_PATH" 2>/dev/null || true + fi + + upsert_env_value XRAY_ACCESS_LOG_PATH "$XRAY_ACCESS_LOG_PATH" + upsert_env_value XRAY_LOG_INTERVAL "$XRAY_LOG_INTERVAL" + upsert_env_value XRAY_LOG_STATE_FILE "$XRAY_LOG_STATE_FILE" + upsert_env_value XRAY_LOG_START_AT_END "$XRAY_LOG_START_AT_END" + return 0 +} + +print_xray_log_hint() { + cat <<'HINT' + +Xray access log was not found automatically. Node access risk telemetry remains disabled. +To enable it, open 3x-ui panel -> Xray Config and set: + +"log": { + "access": "/usr/local/x-ui/access.log", + "error": "/usr/local/x-ui/error.log", + "loglevel": "warning" +} + +Then restart x-ui and rerun this upgrade script, or add XRAY_ACCESS_LOG_PATH=/usr/local/x-ui/access.log to /etc/jboard-agent.env. +HINT +} + ASSET="$(detect_asset)" RESOLVED_TAG="$(resolve_release_tag)" @@ -62,12 +190,12 @@ DOWNLOAD_BASE="https://github.com/${GH_REPO}/releases/download/${RESOLVED_TAG}" DOWNLOAD_URL="${DOWNLOAD_BASE}/${ASSET}" CHECKSUM_URL="${DOWNLOAD_BASE}/SHA256SUMS" -echo "[1/5] Release tag: ${RESOLVED_TAG}" -echo "[2/5] Downloading probe agent binary: ${ASSET}" +echo "[1/6] Release tag: ${RESOLVED_TAG}" +echo "[2/6] Downloading probe agent binary: ${ASSET}" curl -fsSL "$DOWNLOAD_URL" -o "$TMP_DIR/$ASSET" if curl -fsSL "$CHECKSUM_URL" -o "$TMP_DIR/SHA256SUMS" 2>/dev/null; then - echo "[3/5] Verifying checksum..." + echo "[3/6] Verifying checksum..." grep " ${ASSET}$" "$TMP_DIR/SHA256SUMS" > "$TMP_DIR/SHA256SUMS.current" ( cd "$TMP_DIR" @@ -78,14 +206,22 @@ if curl -fsSL "$CHECKSUM_URL" -o "$TMP_DIR/SHA256SUMS" 2>/dev/null; then fi ) else - echo "[3/5] Checksum file not found; skipping verification." + echo "[3/6] Checksum file not found; skipping verification." fi -echo "[4/5] Installing binary..." +echo "[4/6] Installing binary..." run_as_root install -m 0755 "$TMP_DIR/$ASSET" "${INSTALL_DIR}/jboard-agent" -run_as_root mkdir -p /var/log/jboard +run_as_root mkdir -p /var/log/jboard /var/lib/jboard-agent -echo "[5/5] Restarting service..." +echo "[5/6] Detecting Xray access log..." +if configure_xray_log_env; then + echo "Node access risk telemetry: enabled (${XRAY_ACCESS_LOG_PATH})" +else + echo "Node access risk telemetry: disabled" + print_xray_log_hint +fi + +echo "[6/6] Restarting service..." run_as_root systemctl daemon-reload run_as_root systemctl restart "$SERVICE_NAME" diff --git a/src/actions/admin/settings.ts b/src/actions/admin/settings.ts index 4eff8aa..c971f0f 100644 --- a/src/actions/admin/settings.ts +++ b/src/actions/admin/settings.ts @@ -38,6 +38,11 @@ const settingsSchema = z.object({ subscriptionRiskCountrySuspend: z.coerce.number().int().min(2).max(100).optional(), subscriptionRiskIpLimitPerHour: z.coerce.number().int().min(1).max(100000).optional(), subscriptionRiskTokenLimitPerHour: z.coerce.number().int().min(1).max(100000).optional(), + nodeAccessRiskEnabled: z.string().optional(), + nodeAccessConnectionWarning: z.coerce.number().int().min(1).max(100000).optional(), + nodeAccessConnectionSuspend: z.coerce.number().int().min(1).max(100000).optional(), + nodeAccessUniqueTargetWarning: z.coerce.number().int().min(1).max(100000).optional(), + nodeAccessUniqueTargetSuspend: z.coerce.number().int().min(1).max(100000).optional(), inviteRewardEnabled: z.string().optional(), inviteRewardRate: z.coerce.number().min(0).max(100).optional(), inviteRewardCouponId: z.string().trim().optional(), @@ -158,6 +163,18 @@ function buildSettingsUpdate(parsed: z.infer, current: Aw parsed.subscriptionRiskIpLimitPerHour ?? current.subscriptionRiskIpLimitPerHour, subscriptionRiskTokenLimitPerHour: parsed.subscriptionRiskTokenLimitPerHour ?? current.subscriptionRiskTokenLimitPerHour, + nodeAccessRiskEnabled: optionalBoolean( + parsed.nodeAccessRiskEnabled, + current.nodeAccessRiskEnabled, + ), + nodeAccessConnectionWarning: + parsed.nodeAccessConnectionWarning ?? current.nodeAccessConnectionWarning, + nodeAccessConnectionSuspend: + parsed.nodeAccessConnectionSuspend ?? current.nodeAccessConnectionSuspend, + nodeAccessUniqueTargetWarning: + parsed.nodeAccessUniqueTargetWarning ?? current.nodeAccessUniqueTargetWarning, + nodeAccessUniqueTargetSuspend: + parsed.nodeAccessUniqueTargetSuspend ?? current.nodeAccessUniqueTargetSuspend, inviteRewardEnabled: optionalBoolean(parsed.inviteRewardEnabled, current.inviteRewardEnabled), inviteRewardRate: parsed.inviteRewardRate ?? Number(current.inviteRewardRate), inviteRewardCouponId: parsed.inviteRewardCouponId || null, @@ -182,6 +199,12 @@ function buildSettingsUpdate(parsed: z.infer, current: Aw if (next.subscriptionRiskCountrySuspend < next.subscriptionRiskCountryWarning) { throw new Error("国家暂停阈值不能小于国家警告阈值"); } + if (next.nodeAccessConnectionSuspend < next.nodeAccessConnectionWarning) { + throw new Error("节点连接暂停阈值不能小于警告阈值"); + } + if (next.nodeAccessUniqueTargetSuspend < next.nodeAccessUniqueTargetWarning) { + throw new Error("节点目标数暂停阈值不能小于警告阈值"); + } if (next.smtpEnabled || next.emailVerificationRequired) { if (!next.smtpHost || !next.smtpPort || !next.smtpFromEmail) { diff --git a/src/app/(admin)/admin/nodes/node-actions.tsx b/src/app/(admin)/admin/nodes/node-actions.tsx index 79a0698..3676d79 100644 --- a/src/app/(admin)/admin/nodes/node-actions.tsx +++ b/src/app/(admin)/admin/nodes/node-actions.tsx @@ -91,7 +91,7 @@ export function NodeActions({ node, siteUrl }: { node: NodeActionValue; siteUrl: size="sm" variant="outline" title="撤销这个探测 Token?" - description="撤销后,延迟和线路探测程序将无法继续上报数据。" + description="撤销后,延迟、线路探测和节点日志风控程序将无法继续上报数据。" confirmLabel="撤销 Token" successMessage="探测 Token 已撤销" errorMessage="撤销失败" @@ -170,7 +170,7 @@ export function NodeActions({ node, siteUrl }: { node: NodeActionValue; siteUrl:

)}

- 此 Agent 仅用于 `/api/agent/latency` 和 `/api/agent/trace` 探测上报;节点客户端开通已改由 3x-ui 面板 API 处理。 + 此 Agent 用于 `/api/agent/latency`、`/api/agent/trace` 探测上报;安装脚本会自动查找 3x-ui/Xray access log,找到后启用节点日志风控。Agent 只读日志,不修改 3x-ui 配置。

diff --git a/src/app/(admin)/admin/settings/page.tsx b/src/app/(admin)/admin/settings/page.tsx index d0efe09..fecfd01 100644 --- a/src/app/(admin)/admin/settings/page.tsx +++ b/src/app/(admin)/admin/settings/page.tsx @@ -48,6 +48,11 @@ export default async function AdminSettingsPage() { subscriptionRiskCountrySuspend: config.subscriptionRiskCountrySuspend, subscriptionRiskIpLimitPerHour: config.subscriptionRiskIpLimitPerHour, subscriptionRiskTokenLimitPerHour: config.subscriptionRiskTokenLimitPerHour, + nodeAccessRiskEnabled: config.nodeAccessRiskEnabled, + nodeAccessConnectionWarning: config.nodeAccessConnectionWarning, + nodeAccessConnectionSuspend: config.nodeAccessConnectionSuspend, + nodeAccessUniqueTargetWarning: config.nodeAccessUniqueTargetWarning, + nodeAccessUniqueTargetSuspend: config.nodeAccessUniqueTargetSuspend, inviteRewardEnabled: config.inviteRewardEnabled, inviteRewardRate: Number(config.inviteRewardRate), inviteRewardCouponId: config.inviteRewardCouponId, diff --git a/src/app/(admin)/admin/settings/settings-form.tsx b/src/app/(admin)/admin/settings/settings-form.tsx index be9a366..c9397f6 100644 --- a/src/app/(admin)/admin/settings/settings-form.tsx +++ b/src/app/(admin)/admin/settings/settings-form.tsx @@ -37,6 +37,11 @@ interface AppConfig { subscriptionRiskCountrySuspend: number; subscriptionRiskIpLimitPerHour: number; subscriptionRiskTokenLimitPerHour: number; + nodeAccessRiskEnabled: boolean; + nodeAccessConnectionWarning: number; + nodeAccessConnectionSuspend: number; + nodeAccessUniqueTargetWarning: number; + nodeAccessUniqueTargetSuspend: number; inviteRewardEnabled: boolean; inviteRewardRate: number; inviteRewardCouponId: string | null; @@ -389,9 +394,37 @@ export function SettingsForm({ config, coupons }: { config: AppConfig; coupons: defaultValue={config.subscriptionRiskTokenLimitPerHour} /> +
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +

- 默认值对应原规则:24 小时内 4 城市警告、5 城市暂停;2 省/地区警告、3 省/地区暂停;2 国家警告、3 国家暂停;IP 180 次/小时,订阅 60 次/小时。 + 默认值对应原规则:24 小时内 4 城市警告、5 城市暂停;2 省/地区警告、3 省/地区暂停;2 国家警告、3 国家暂停;IP 180 次/小时,订阅 60 次/小时。节点日志风控只在 Agent 配置 XRAY_ACCESS_LOG_PATH 后生效;连接数和不同目标数按 Agent 单次聚合窗口计算。

)} diff --git a/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-geo-details.tsx b/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-geo-details.tsx index 6c19842..532f09c 100644 --- a/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-geo-details.tsx +++ b/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-geo-details.tsx @@ -44,7 +44,7 @@ function WorldRiskMap({ summary }: { summary: SubscriptionRiskGeoSummary }) { viewBox="0 0 360 180" className="h-[15rem] w-full bg-[radial-gradient(circle_at_30%_20%,color-mix(in_oklch,var(--primary)_10%,transparent),transparent_30%),linear-gradient(135deg,var(--muted),var(--card))]" role="img" - aria-label="订阅访问 IP 世界地图分布" + aria-label="订阅访问与节点连接 IP 世界地图分布" > {[-120, -60, 0, 60, 120].map((longitude) => { @@ -164,7 +164,7 @@ export function SubscriptionRiskGeoDetails({ summary }: { summary: SubscriptionR
- IP 访问明细 + IP 访问/连接明细 diff --git a/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-table.tsx b/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-table.tsx index 9cc65f7..679e3bd 100644 --- a/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-table.tsx +++ b/src/app/(admin)/admin/subscription-risk/_components/subscription-risk-table.tsx @@ -31,6 +31,14 @@ function reasonLabel(reason: SubscriptionRiskEvent["reason"]) { return "国家异常警告"; case "COUNTRY_VARIANCE_SUSPEND": return "国家异常暂停"; + case "NODE_ACCESS_VOLUME_WARNING": + return "节点高频警告"; + case "NODE_ACCESS_VOLUME_SUSPEND": + return "节点高频暂停"; + case "NODE_ACCESS_TARGET_WARNING": + return "目标分散警告"; + case "NODE_ACCESS_TARGET_SUSPEND": + return "目标分散暂停"; } } @@ -255,7 +263,7 @@ export function SubscriptionRiskTable({ events }: { events: SubscriptionRiskEven return ( ); } diff --git a/src/app/(admin)/admin/subscription-risk/page.tsx b/src/app/(admin)/admin/subscription-risk/page.tsx index 842ca91..dad5ffa 100644 --- a/src/app/(admin)/admin/subscription-risk/page.tsx +++ b/src/app/(admin)/admin/subscription-risk/page.tsx @@ -7,7 +7,7 @@ import { getSubscriptionRiskEvents } from "./risk-data"; export const metadata: Metadata = { title: "订阅风控", - description: "查看订阅访问异常、关联用户和人工处理状态。", + description: "查看订阅访问与节点连接异常、关联用户和人工处理状态。", }; export default async function AdminSubscriptionRiskPage({ @@ -22,7 +22,7 @@ export default async function AdminSubscriptionRiskPage({

订阅访问风控

-

记录订阅拉取 IP、地区变化和人工处理状态。

+

记录订阅拉取与节点真实连接 IP、地区变化和人工处理状态。

diff --git a/src/app/(user)/support/page.tsx b/src/app/(user)/support/page.tsx index fdb888d..752460c 100644 --- a/src/app/(user)/support/page.tsx +++ b/src/app/(user)/support/page.tsx @@ -47,7 +47,7 @@ export default async function SupportPage({ subject: "订阅风控复核申请", category: "订阅风控", priority: "HIGH" as const, - body: "我需要复核订阅风控限制。\n\n请在这里补充:近期访问订阅的设备、所在城市/国家、是否出差或旅行、是否曾分享订阅链接。\n\n系统判定:" + reasonLabel(riskEvent.reason) + "\n" + riskEvent.message, + body: "我需要复核订阅风控限制。\n\n请在这里补充:近期访问订阅的设备、所在城市/国家、是否出差或旅行、是否曾分享订阅链接或通过其他设备转发节点。\n\n系统判定:" + reasonLabel(riskEvent.reason) + "\n" + riskEvent.message, } : undefined; diff --git a/src/app/api/agent/node-access/route.ts b/src/app/api/agent/node-access/route.ts new file mode 100644 index 0000000..d8c85fd --- /dev/null +++ b/src/app/api/agent/node-access/route.ts @@ -0,0 +1,148 @@ +import { isIP } from "node:net"; +import { NextResponse } from "next/server"; +import { z } from "zod"; +import { prisma } from "@/lib/prisma"; +import { authenticateAgent, isAuthError } from "@/lib/agent-auth"; +import { getIpGeoContext } from "@/lib/request-context"; +import { getAppConfig } from "@/services/app-config"; +import { evaluateNodeAccessAbuseRisk, recordSubscriptionAccess } from "@/services/subscription-risk"; + +const MAX_EVENTS = 500; +const MAX_TEXT_LENGTH = 200; + +const nodeAccessEventSchema = z.object({ + clientEmail: z.string().trim().min(1).max(320), + sourceIp: z.string().trim().refine((value) => isIP(value) !== 0, "sourceIp 必须是有效 IP"), + inboundTag: z.string().trim().max(MAX_TEXT_LENGTH).optional().nullable(), + network: z.string().trim().max(16).optional().nullable(), + targetHost: z.string().trim().max(MAX_TEXT_LENGTH).optional().nullable(), + targetPort: z.coerce.number().int().min(0).max(65535).optional().nullable(), + action: z.string().trim().max(32).optional().nullable(), + connectionCount: z.coerce.number().int().min(1).max(100000).optional().default(1), + uniqueTargetCount: z.coerce.number().int().min(0).max(100000).optional().default(0), + firstSeenAt: z.string().trim().max(64).optional().nullable(), + lastSeenAt: z.string().trim().max(64).optional().nullable(), +}); + +const nodeAccessPayloadSchema = z.object({ + events: z.array(nodeAccessEventSchema).min(1).max(MAX_EVENTS), +}); + +function compactText(value: string | null | undefined) { + const text = value?.trim(); + return text ? text.slice(0, MAX_TEXT_LENGTH) : null; +} + +function normalizeAction(action: string | null | undefined) { + const normalized = action?.trim().toLowerCase(); + return normalized === "rejected" ? "rejected" : "accepted"; +} + +function buildReason(event: z.infer, nodeId: string) { + const parts = [ + "来源:节点 Xray access log", + "节点:" + nodeId, + event.inboundTag ? "入站:" + event.inboundTag : null, + event.network ? "网络:" + event.network : null, + event.targetPort ? "目标端口:" + event.targetPort : null, + event.targetHost ? "样本目标:" + event.targetHost : null, + "连接数:" + event.connectionCount, + event.uniqueTargetCount ? "不同目标:" + event.uniqueTargetCount : null, + event.firstSeenAt ? "首次:" + event.firstSeenAt : null, + event.lastSeenAt ? "最近:" + event.lastSeenAt : null, + ].filter(Boolean); + return parts.join(";").slice(0, 1000); +} + +export async function POST(req: Request) { + const auth = await authenticateAgent(req); + if (isAuthError(auth)) return auth; + const { nodeId } = auth; + + let body: unknown; + try { + body = await req.json(); + } catch { + return NextResponse.json({ error: "请求体不是有效 JSON,期望格式:{ events: [...] }" }, { status: 400 }); + } + + const parsed = nodeAccessPayloadSchema.safeParse(body); + if (!parsed.success) { + return NextResponse.json({ error: parsed.error.issues[0]?.message ?? "节点访问日志格式无效" }, { status: 400 }); + } + + const config = await getAppConfig(); + if (!config.subscriptionRiskEnabled || !config.nodeAccessRiskEnabled) { + return NextResponse.json({ ok: true, skipped: parsed.data.events.length, reason: "node_access_risk_disabled" }); + } + + const clientEmails = [...new Set(parsed.data.events.map((event) => event.clientEmail))]; + const clients = await prisma.nodeClient.findMany({ + where: { + email: { in: clientEmails }, + inbound: { serverId: nodeId }, + }, + select: { + email: true, + userId: true, + subscriptionId: true, + }, + }); + const clientByEmail = new Map(clients.map((client) => [client.email, client])); + + let processed = 0; + let skipped = 0; + let warnings = 0; + let suspended = 0; + + for (const event of parsed.data.events) { + const client = clientByEmail.get(event.clientEmail); + if (!client) { + skipped++; + continue; + } + + const action = normalizeAction(event.action); + const allowed = action === "accepted"; + const result = await recordSubscriptionAccess({ + kind: "SINGLE", + userId: client.userId, + subscriptionId: client.subscriptionId, + context: { + ip: event.sourceIp, + userAgent: "jboard-agent/xray-access-log", + geo: getIpGeoContext(event.sourceIp), + }, + allowed, + reason: buildReason({ + ...event, + inboundTag: compactText(event.inboundTag), + network: compactText(event.network), + targetHost: compactText(event.targetHost), + action, + }, nodeId), + evaluateRisk: allowed, + riskConfig: config, + sourceLabel: "节点真实连接", + }); + + const abuseResult = allowed + ? await evaluateNodeAccessAbuseRisk({ + userId: client.userId, + subscriptionId: client.subscriptionId, + ip: event.sourceIp, + connectionCount: event.connectionCount, + uniqueTargetCount: event.uniqueTargetCount, + targetHost: compactText(event.targetHost), + targetPort: event.targetPort ?? null, + config, + }) + : { warned: false, suspended: false }; + + processed++; + if (result.warned || abuseResult.warned) warnings++; + if (result.suspended || abuseResult.suspended) suspended++; + } + + return NextResponse.json({ ok: true, processed, skipped, warnings, suspended }); +} diff --git a/src/app/api/agent/trace/route.ts b/src/app/api/agent/trace/route.ts index 698fc10..0722a74 100644 --- a/src/app/api/agent/trace/route.ts +++ b/src/app/api/agent/trace/route.ts @@ -2,6 +2,7 @@ import { NextResponse } from "next/server"; import { prisma } from "@/lib/prisma"; import { authenticateAgent, isAuthError } from "@/lib/agent-auth"; import { normalizeTraceHops, normalizeTraceText } from "@/lib/trace-normalize"; +import { classifyTraceRoute } from "@/lib/route-classify"; export async function POST(req: Request) { const auth = await authenticateAgent(req); @@ -32,7 +33,8 @@ export async function POST(req: Request) { for (const trace of body.traces) { if (!validCarriers.has(trace.carrier)) continue; const normalizedHops = normalizeTraceHops(trace.hops); - const normalizedSummary = normalizeTraceText(trace.summary) || "路由信息"; + const submittedSummary = normalizeTraceText(trace.summary); + const normalizedSummary = classifyTraceRoute({ summary: submittedSummary, hops: normalizedHops }); const hopCount = Number(trace.hopCount); const normalizedHopCount = Number.isFinite(hopCount) && hopCount > 0 diff --git a/src/app/api/traces/route.ts b/src/app/api/traces/route.ts index 4543593..32be29d 100644 --- a/src/app/api/traces/route.ts +++ b/src/app/api/traces/route.ts @@ -1,5 +1,7 @@ import { NextResponse } from "next/server"; import { prisma } from "@/lib/prisma"; +import { classifyTraceRoute } from "@/lib/route-classify"; +import { normalizeTraceHops } from "@/lib/trace-normalize"; const MAX_NODE_IDS = 100; @@ -24,7 +26,7 @@ export async function GET(req: Request) { } result[row.nodeId].push({ carrier: row.carrier, - summary: row.summary, + summary: classifyTraceRoute({ summary: row.summary, hops: normalizeTraceHops(row.hops) }), hopCount: row.hopCount, hops: row.hops, updatedAt: row.updatedAt.toISOString(), diff --git a/src/lib/request-context.ts b/src/lib/request-context.ts index daa86cd..9477efc 100644 --- a/src/lib/request-context.ts +++ b/src/lib/request-context.ts @@ -79,7 +79,7 @@ function localizedName(record: { names?: object } | null | undefined) { return names["zh-CN"] ?? names.en ?? Object.values(names).find((value) => typeof value === "string") ?? null; } -function getGeoIpLocation(ip: string): RequestGeoContext { +export function getIpGeoContext(ip: string): RequestGeoContext { if (ip === "unknown" || !isIP(ip)) return emptyGeoContext(); const reader = getGeoIpReader(); @@ -244,7 +244,7 @@ export function getRequestGeo(headers: HeaderReader, ip = "unknown"): RequestGeo } const headerGeo = { country, region, regionCode, city, latitude, longitude, source }; - return mergeGeoContext(headerGeo, getGeoIpLocation(ip)); + return mergeGeoContext(headerGeo, getIpGeoContext(ip)); } export function getClientRequestContext(headers: HeaderReader): ClientRequestContext { diff --git a/src/lib/route-classify.ts b/src/lib/route-classify.ts new file mode 100644 index 0000000..f61dc83 --- /dev/null +++ b/src/lib/route-classify.ts @@ -0,0 +1,77 @@ +import { normalizeTraceText, type NormalizedTraceHop } from "@/lib/trace-normalize"; + +function normalizeAsn(value: unknown) { + const text = normalizeTraceText(value).toUpperCase(); + const match = text.match(/(?:^|\b)AS?\s*(\d{2,10})(?:\b|$)/); + if (match) return match[1]; + return /^\d{2,10}$/.test(text) ? text : ""; +} + +function isIpInPrefix(ip: string, prefix: string) { + return ip === prefix.slice(0, -1) || ip.startsWith(prefix); +} + +function countMatches(values: string[], predicate: (value: string) => boolean) { + return values.reduce((count, value) => count + (predicate(value) ? 1 : 0), 0); +} + +export function classifyTraceRoute(input: { + summary?: unknown; + hops: Array; +}) { + const summary = normalizeTraceText(input.summary).toUpperCase(); + const hopTexts = input.hops.map((hop) => [ + normalizeTraceText(hop.ip), + normalizeTraceText(hop.geo), + normalizeTraceText("owner" in hop ? hop.owner : ""), + normalizeTraceText("isp" in hop ? hop.isp : ""), + normalizeTraceText("asn" in hop ? hop.asn : ""), + ].join(" ").toUpperCase()); + const combined = [summary, ...hopTexts].join(" "); + const ips = input.hops.map((hop) => normalizeTraceText(hop.ip)).filter(Boolean); + const asns = new Set(); + + for (const hop of input.hops) { + const directAsn = normalizeAsn("asn" in hop ? hop.asn : ""); + if (directAsn) asns.add(directAsn); + } + for (const match of combined.matchAll(/(?:^|\b)AS\s*(\d{2,10})(?:\b|$)/g)) { + asns.add(match[1]); + } + + const hasAsn = (...values: string[]) => values.some((value) => asns.has(value)); + const hasText = (...values: string[]) => values.some((value) => combined.includes(value)); + const hasIpPrefix = (...prefixes: string[]) => ips.some((ip) => prefixes.some((prefix) => isIpInPrefix(ip, prefix))); + + const cn2Evidence = hasAsn("4809") + || hasIpPrefix("59.43.") + || hasText("CN2", "CTGNET", "CHINANET NEXT CARRYING NETWORK", "CHINA TELECOM GLOBAL"); + const cn2GiaText = hasText("CN2 GIA", "CN2GIA", "GIA", "GLOBAL INTERNET ACCESS"); + const ordinaryTelecomHops = countMatches(hopTexts, (text) => ( + text.includes("AS4134") + || text.includes("CHINANET BACKBONE") + || text.includes("CHINANET 163") + || text.includes("163骨干") + )) + countMatches(ips, (ip) => isIpInPrefix(ip, "202.97.")); + + if (cn2Evidence) { + if (cn2GiaText || ordinaryTelecomHops <= 1) return "CN2 GIA"; + return "CN2 GT"; + } + + if (hasAsn("9929", "10099") || hasText("CUII", "A网", "AS9929")) { + return "AS9929"; + } + if (hasText("CMIN2") || hasAsn("58807", "58809", "58813", "58819", "59807")) { + return "CMIN2"; + } + if (hasText("CMI") || hasAsn("58453")) { + return "CMI"; + } + if (hasAsn("4837") || hasText("AS4837")) { + return "AS4837"; + } + + const normalizedSummary = normalizeTraceText(input.summary); + return normalizedSummary && normalizedSummary !== "普通线路" ? normalizedSummary : "普通线路"; +} diff --git a/src/lib/trace-normalize.ts b/src/lib/trace-normalize.ts index 9d6835f..6c4c4e7 100644 --- a/src/lib/trace-normalize.ts +++ b/src/lib/trace-normalize.ts @@ -77,6 +77,9 @@ export interface NormalizedTraceHop { ip: string; geo: string; latency: number; + asn?: string; + owner?: string; + isp?: string; } export function normalizeTraceHops(hops: unknown): NormalizedTraceHop[] { @@ -94,6 +97,9 @@ export function normalizeTraceHops(hops: unknown): NormalizedTraceHop[] { ip: normalizeTraceText(hopObject.ip), geo: normalizeTraceText(hopObject.geo), latency: Math.max(0, toSafeNumber(hopObject.latency, 0)), + asn: normalizeTraceText(hopObject.asn) || undefined, + owner: normalizeTraceText(hopObject.owner) || undefined, + isp: normalizeTraceText(hopObject.isp) || undefined, }; }) .filter((hop) => hop.ip || hop.geo || hop.latency > 0); diff --git a/src/services/subscription-risk-review.ts b/src/services/subscription-risk-review.ts index 7501232..5e74018 100644 --- a/src/services/subscription-risk-review.ts +++ b/src/services/subscription-risk-review.ts @@ -142,6 +142,14 @@ export function reasonLabel(reason: SubscriptionRiskReason) { return "国家异常警告"; case "COUNTRY_VARIANCE_SUSPEND": return "国家异常暂停"; + case "NODE_ACCESS_VOLUME_WARNING": + return "节点高频警告"; + case "NODE_ACCESS_VOLUME_SUSPEND": + return "节点高频暂停"; + case "NODE_ACCESS_TARGET_WARNING": + return "目标分散警告"; + case "NODE_ACCESS_TARGET_SUSPEND": + return "目标分散暂停"; } } @@ -334,8 +342,8 @@ export function buildSubscriptionRiskReport(input: { const userLabel = user ? `${user.email}${user.name ? `(${user.name})` : ""}` : event.userId ?? "未知用户"; const windowRange = `${formatDate(event.windowStartedAt)} 至 ${formatDate(event.createdAt)}`; const actionSuggestion = event.level === "SUSPENDED" - ? "建议保持暂停,等待用户确认是否本人跨地区使用、订阅链接是否外泄,并在工单中补充说明后再解除限制。" - : "建议先联系用户确认近期访问来源;如果用户无法解释这些地区/IP,建议重置订阅链接并临时暂停相关订阅。"; + ? "建议保持暂停,等待用户确认是否本人跨地区使用、订阅链接是否外泄或节点连接是否被共享,并在工单中补充说明后再解除限制。" + : "建议先联系用户确认近期访问/连接来源;如果用户无法解释这些地区/IP,建议重置订阅链接并临时暂停相关订阅。"; return [ "订阅风控风险报告", diff --git a/src/services/subscription-risk.ts b/src/services/subscription-risk.ts index baac9fc..a565fd7 100644 --- a/src/services/subscription-risk.ts +++ b/src/services/subscription-risk.ts @@ -24,6 +24,11 @@ type SubscriptionRiskConfig = Pick< | "subscriptionRiskRegionSuspend" | "subscriptionRiskCountryWarning" | "subscriptionRiskCountrySuspend" + | "nodeAccessRiskEnabled" + | "nodeAccessConnectionWarning" + | "nodeAccessConnectionSuspend" + | "nodeAccessUniqueTargetWarning" + | "nodeAccessUniqueTargetSuspend" >; interface RecordSubscriptionAccessInput { @@ -35,6 +40,7 @@ interface RecordSubscriptionAccessInput { reason?: string | null; evaluateRisk?: boolean; riskConfig?: SubscriptionRiskConfig; + sourceLabel?: string | null; } interface RiskDecision { @@ -99,8 +105,9 @@ function riskMessage(options: { countryLabels: string[]; regionLabels: string[]; cityLabels: string[]; + sourceLabel?: string | null; }) { - const scope = getScopeLabel(options.kind); + const scope = options.sourceLabel?.trim() || getScopeLabel(options.kind); const locationSummary = options.decision.reason.startsWith("COUNTRY") ? `${options.countryCount} 个国家/地区:${formatKeyPreview(options.countryLabels)}` : options.decision.reason.startsWith("REGION") @@ -358,6 +365,7 @@ async function evaluateSubscriptionRisk(input: { userId?: string | null; subscriptionId?: string | null; ip: string; + sourceLabel?: string | null; db: DbClient; config?: SubscriptionRiskConfig; }): Promise { @@ -433,6 +441,7 @@ async function evaluateSubscriptionRisk(input: { countryLabels, regionLabels, cityLabels, + sourceLabel: input.sourceLabel, }); const { event, created } = await createRiskEvent({ @@ -497,6 +506,142 @@ async function evaluateSubscriptionRisk(input: { return { warned: true, suspended: false, eventId: event.id }; } +function decideNodeAccessAbuseRisk(input: { + connectionCount: number; + uniqueTargetCount: number; + config: SubscriptionRiskConfig; +}): RiskDecision | null { + if (!input.config.nodeAccessRiskEnabled) return null; + + if (input.config.subscriptionRiskAutoSuspend && input.uniqueTargetCount >= input.config.nodeAccessUniqueTargetSuspend) { + return { level: "SUSPENDED", reason: "NODE_ACCESS_TARGET_SUSPEND" }; + } + if (input.config.subscriptionRiskAutoSuspend && input.connectionCount >= input.config.nodeAccessConnectionSuspend) { + return { level: "SUSPENDED", reason: "NODE_ACCESS_VOLUME_SUSPEND" }; + } + if (input.uniqueTargetCount >= input.config.nodeAccessUniqueTargetWarning) { + return { level: "WARNING", reason: "NODE_ACCESS_TARGET_WARNING" }; + } + if (input.connectionCount >= input.config.nodeAccessConnectionWarning) { + return { level: "WARNING", reason: "NODE_ACCESS_VOLUME_WARNING" }; + } + + return null; +} + +function nodeAccessAbuseMessage(input: { + decision: RiskDecision; + ip: string; + connectionCount: number; + uniqueTargetCount: number; + targetHost?: string | null; + targetPort?: number | null; +}) { + const metric = input.decision.reason.includes("TARGET") + ? "不同目标 " + input.uniqueTargetCount + " 个" + : "连接 " + input.connectionCount + " 次"; + const targetValue = [input.targetHost, input.targetPort].filter(Boolean).join(":"); + const target = targetValue ? ",样本目标 " + targetValue : ""; + const action = input.decision.level === "SUSPENDED" ? "已自动暂停" : "已记录警告"; + return "节点真实连接行为异常,单个聚合窗口内出现 " + metric + ",来源 IP " + input.ip + target + "," + action + "。"; +} + +export async function evaluateNodeAccessAbuseRisk(input: { + userId: string; + subscriptionId: string; + ip: string; + connectionCount: number; + uniqueTargetCount: number; + targetHost?: string | null; + targetPort?: number | null; + config?: SubscriptionRiskConfig; + db?: DbClient; +}): Promise { + const db = input.db ?? prisma; + const config = input.config ?? await getAppConfig(db); + if (!config.subscriptionRiskEnabled || !config.nodeAccessRiskEnabled) { + return { warned: false, suspended: false }; + } + + const decision = decideNodeAccessAbuseRisk({ + connectionCount: input.connectionCount, + uniqueTargetCount: input.uniqueTargetCount, + config, + }); + if (!decision) return { warned: false, suspended: false }; + + const windowStartedAt = new Date(Date.now() - config.subscriptionRiskWindowHours * 60 * 60 * 1000); + const message = nodeAccessAbuseMessage({ + decision, + ip: input.ip, + connectionCount: input.connectionCount, + uniqueTargetCount: input.uniqueTargetCount, + targetHost: input.targetHost, + targetPort: input.targetPort, + }); + + const { event, created } = await createRiskEvent({ + kind: "SINGLE", + userId: input.userId, + subscriptionId: input.subscriptionId, + ip: input.ip, + decision, + message, + windowStartedAt, + countryLabels: [], + regionLabels: [], + cityLabels: [], + db, + }); + + if (created) { + const targetLabel = await getTargetLabel({ userId: input.userId, subscriptionId: input.subscriptionId }, db); + await recordAuditLog({ + action: decision.level === "SUSPENDED" ? "risk.node_access.suspend" : "risk.node_access.warning", + targetType: "UserSubscription", + targetId: input.subscriptionId, + targetLabel, + message, + metadata: { + eventId: event.id, + reason: decision.reason, + ip: input.ip, + connectionCount: input.connectionCount, + uniqueTargetCount: input.uniqueTargetCount, + targetHost: input.targetHost ?? null, + targetPort: input.targetPort ?? null, + windowStartedAt: windowStartedAt.toISOString(), + }, + }, db); + + if (decision.level === "WARNING") { + await createNotification({ + userId: input.userId, + type: "SUBSCRIPTION", + level: "WARNING", + title: "节点连接行为异常", + body: "检测到你的订阅在节点侧出现异常高频连接或目标分散。如果不是你本人操作,请重置订阅访问并联系管理员。", + link: "/subscriptions/" + input.subscriptionId, + dedupeKey: "risk:node-access:warning:" + event.id, + }, db); + } + } + + if (decision.level === "SUSPENDED") { + const suspendedIds = await suspendScopeForRisk({ + kind: "SINGLE", + userId: input.userId, + subscriptionId: input.subscriptionId, + message, + }); + revalidateRiskViews(suspendedIds); + return { warned: false, suspended: true, eventId: event.id }; + } + + if (created) revalidateRiskViews([input.subscriptionId]); + return { warned: true, suspended: false, eventId: event.id }; +} + export async function recordSubscriptionAccess( input: RecordSubscriptionAccessInput, db: DbClient = prisma, @@ -529,6 +674,7 @@ export async function recordSubscriptionAccess( userId: input.userId, subscriptionId: input.subscriptionId, ip: input.context.ip, + sourceLabel: input.sourceLabel, db, config: input.riskConfig, });