Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 МБит
2026-05-22 21:46:49 +03:00

209 lines
6.1 KiB
Go

package vpnruntime
import (
"fmt"
"sort"
"sync"
"time"
)
// Schema identifier and lifecycle state names used by the fabric service
// stream registry.
const (
// FabricServiceStreamRegistrySchemaVersion is the value Snapshot reports
// under the "schema_version" key.
FabricServiceStreamRegistrySchemaVersion = "rap.fabric_service_stream_registry.v1"
// FabricServiceStreamStateOpen is the state Register assigns when the caller
// leaves State empty; Snapshot counts streams in this state as "open_count".
FabricServiceStreamStateOpen = "open"
// FabricServiceStreamStateClosed is the state applied by MarkClosed.
FabricServiceStreamStateClosed = "closed"
// FabricServiceStreamStateReset is the state applied by MarkReset.
FabricServiceStreamStateReset = "reset"
)
// FabricServiceStream is one stream record tracked by
// FabricServiceStreamRegistry. The registry identifies a record by the pair
// (TunnelID, StreamID).
type FabricServiceStream struct {
// TunnelID is the tunnel the stream belongs to; first half of the registry key.
TunnelID string `json:"tunnel_id"`
// ServiceID identifies the service; Register falls back to
// ServiceTunnel.ServiceID when it is empty.
ServiceID string `json:"service_id"`
// StreamID is the second half of the registry key. markState ignores a zero
// StreamID, so such a stream cannot be marked closed or reset.
StreamID uint64 `json:"stream_id"`
// TrafficClass is rewritten by Register through normalizeFabricTrafficClass.
TrafficClass string `json:"traffic_class"`
// Direction is stored and reported as-is; omitted from JSON when empty.
Direction string `json:"direction,omitempty"`
// State is one of the FabricServiceStreamState* values; Register defaults an
// empty State to FabricServiceStreamStateOpen.
State string `json:"state"`
// ServiceTunnel is rewritten by Register through NormalizeServiceTunnel.
ServiceTunnel FabricServiceTunnel `json:"service_tunnel"`
// OpenedAt is set by Register when zero and is carried over from an existing
// record when the same key is registered again.
OpenedAt time.Time `json:"opened_at"`
// UpdatedAt is refreshed (UTC) by Register and by every state change.
UpdatedAt time.Time `json:"updated_at"`
// Metadata holds optional string key/value pairs; omitted from JSON when empty.
Metadata map[string]string `json:"metadata,omitempty"`
}
// FabricServiceStreamRegistry is an in-memory table of FabricServiceStream
// records. Its methods lock mu around every access to streams and tolerate a
// nil receiver.
type FabricServiceStreamRegistry struct {
// mu guards streams.
mu sync.RWMutex
// streams maps serviceStreamKey(TunnelID, StreamID) to the stored record.
streams map[string]FabricServiceStream
}
// NewFabricServiceStreamRegistry returns an empty registry whose stream table
// is already allocated.
func NewFabricServiceStreamRegistry() *FabricServiceStreamRegistry {
	registry := new(FabricServiceStreamRegistry)
	registry.streams = make(map[string]FabricServiceStream)
	return registry
}
// Register normalizes stream, stores it under its (TunnelID, StreamID) key and
// returns the normalized record. A nil registry returns the zero value.
//
// Normalization performed before storing:
//   - ServiceTunnel is passed through NormalizeServiceTunnel together with the
//     caller's TunnelID.
//   - TunnelID and ServiceID are resolved with firstNonEmptyTunnelString from
//     the stream's own value and the normalized tunnel's value (presumably the
//     first non-empty argument wins; confirm against that helper).
//   - TrafficClass is passed through normalizeFabricTrafficClass.
//   - An empty State becomes FabricServiceStreamStateOpen.
//   - A zero OpenedAt becomes the current UTC time; UpdatedAt is always set to
//     the current UTC time.
//
// When a record with the same key already exists, its non-zero OpenedAt
// replaces the incoming one, so re-registering keeps the original open time.
//
// The registry keeps its own copy of Metadata, so later changes the caller
// makes to the map it passed in (or to the returned value's map) do not reach
// registry state that other goroutines read under the lock.
func (r *FabricServiceStreamRegistry) Register(stream FabricServiceStream) FabricServiceStream {
	if r == nil {
		return FabricServiceStream{}
	}
	now := time.Now().UTC()
	stream.ServiceTunnel = NormalizeServiceTunnel(stream.ServiceTunnel, stream.TunnelID)
	stream.TunnelID = firstNonEmptyTunnelString(stream.TunnelID, stream.ServiceTunnel.TunnelID)
	stream.ServiceID = firstNonEmptyTunnelString(stream.ServiceID, stream.ServiceTunnel.ServiceID)
	stream.TrafficClass = normalizeFabricTrafficClass(stream.TrafficClass)
	if stream.State == "" {
		stream.State = FabricServiceStreamStateOpen
	}
	if stream.OpenedAt.IsZero() {
		stream.OpenedAt = now
	}
	stream.UpdatedAt = now

	// Build the key once; it is needed for both the lookup and the store.
	key := serviceStreamKey(stream.TunnelID, stream.StreamID)

	r.mu.Lock()
	defer r.mu.Unlock()
	if r.streams == nil {
		r.streams = map[string]FabricServiceStream{}
	}
	if existing, ok := r.streams[key]; ok && !existing.OpenedAt.IsZero() {
		stream.OpenedAt = existing.OpenedAt
	}
	// Store a detached copy so the caller's Metadata map is never aliased by
	// the registry.
	r.streams[key] = cloneFabricServiceStream(stream)
	return stream
}
// MarkClosed sets the state of the stream identified by tunnelID and streamID
// to FabricServiceStreamStateClosed. It delegates to markState, which does
// nothing for a nil registry, an empty tunnelID, a zero streamID, or a stream
// that is not registered.
func (r *FabricServiceStreamRegistry) MarkClosed(tunnelID string, streamID uint64) {
r.markState(tunnelID, streamID, FabricServiceStreamStateClosed)
}
// MarkReset sets the state of the stream identified by tunnelID and streamID
// to FabricServiceStreamStateReset. It delegates to markState, which does
// nothing for a nil registry, an empty tunnelID, a zero streamID, or a stream
// that is not registered.
func (r *FabricServiceStreamRegistry) MarkReset(tunnelID string, streamID uint64) {
r.markState(tunnelID, streamID, FabricServiceStreamStateReset)
}
// StreamsForTunnel returns copies of every registered stream whose TunnelID
// equals tunnelID, ordered by ascending StreamID. A nil registry or an empty
// tunnelID yields nil; a tunnel with no streams yields an empty, non-nil slice.
func (r *FabricServiceStreamRegistry) StreamsForTunnel(tunnelID string) []FabricServiceStream {
	if r == nil {
		return nil
	}
	if tunnelID == "" {
		return nil
	}

	r.mu.RLock()
	defer r.mu.RUnlock()

	matched := []FabricServiceStream{}
	for key := range r.streams {
		candidate := r.streams[key]
		if candidate.TunnelID != tunnelID {
			continue
		}
		// Hand out a copy so callers cannot touch the stored Metadata map.
		matched = append(matched, cloneFabricServiceStream(candidate))
	}

	byStreamID := func(a, b int) bool {
		return matched[a].StreamID < matched[b].StreamID
	}
	sort.Slice(matched, byStreamID)
	return matched
}
// Snapshot renders the registry as a generic map.
//
// For a nil registry the result holds only "schema_version" and
// "stream_count" (0). Otherwise it holds:
//   - "schema_version": FabricServiceStreamRegistrySchemaVersion
//   - "stream_count":   number of registered streams
//   - "open_count":     number of streams in FabricServiceStreamStateOpen
//   - "streams":        one map per stream (never nil, possibly empty)
//
// Each stream map carries tunnel_id, service_id, stream_id, traffic_class,
// direction, state and service_tunnel, plus opened_at / updated_at (RFC 3339
// with nanoseconds) when the timestamp is non-zero and a copy of metadata when
// it is non-empty.
//
// Streams are ordered by ascending StreamID, with TunnelID as the tie-breaker,
// so that streams from different tunnels sharing a stream ID appear in a
// stable order regardless of map iteration order.
func (r *FabricServiceStreamRegistry) Snapshot() map[string]any {
	if r == nil {
		return map[string]any{"schema_version": FabricServiceStreamRegistrySchemaVersion, "stream_count": 0}
	}
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Collect and sort the typed records first: comparing struct fields avoids
	// type assertions on map values and allows a complete ordering.
	ordered := make([]FabricServiceStream, 0, len(r.streams))
	openCount := 0
	for _, stream := range r.streams {
		if stream.State == FabricServiceStreamStateOpen {
			openCount++
		}
		ordered = append(ordered, stream)
	}
	sort.Slice(ordered, func(i, j int) bool {
		if ordered[i].StreamID != ordered[j].StreamID {
			return ordered[i].StreamID < ordered[j].StreamID
		}
		return ordered[i].TunnelID < ordered[j].TunnelID
	})

	items := make([]map[string]any, 0, len(ordered))
	for _, stream := range ordered {
		item := map[string]any{
			"tunnel_id":      stream.TunnelID,
			"service_id":     stream.ServiceID,
			"stream_id":      stream.StreamID,
			"traffic_class":  stream.TrafficClass,
			"direction":      stream.Direction,
			"state":          stream.State,
			"service_tunnel": stream.ServiceTunnel.Snapshot(),
		}
		if !stream.OpenedAt.IsZero() {
			item["opened_at"] = stream.OpenedAt.Format(time.RFC3339Nano)
		}
		if !stream.UpdatedAt.IsZero() {
			item["updated_at"] = stream.UpdatedAt.Format(time.RFC3339Nano)
		}
		if len(stream.Metadata) > 0 {
			// Copy so the snapshot never shares a map with registry state.
			item["metadata"] = cloneStringMap(stream.Metadata)
		}
		items = append(items, item)
	}
	return map[string]any{
		"schema_version": FabricServiceStreamRegistrySchemaVersion,
		"stream_count":   len(items),
		"open_count":     openCount,
		"streams":        items,
	}
}
// markState overwrites the State of the stream stored under
// (tunnelID, streamID) and refreshes its UpdatedAt with the current UTC time.
// It does nothing when the registry is nil, tunnelID is empty, streamID is
// zero, or no such stream is registered.
func (r *FabricServiceStreamRegistry) markState(tunnelID string, streamID uint64, state string) {
	if r == nil {
		return
	}
	if tunnelID == "" || streamID == 0 {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	key := serviceStreamKey(tunnelID, streamID)
	if entry, found := r.streams[key]; found {
		// Map values are copies, so the modified record must be written back.
		entry.State = state
		entry.UpdatedAt = time.Now().UTC()
		r.streams[key] = entry
	}
}
// serviceStreamKey builds the registry map key for a stream: the tunnel ID, a
// NUL byte separator, then the stream ID in decimal.
func serviceStreamKey(tunnelID string, streamID uint64) string {
	decimalID := fmt.Sprint(streamID)
	return tunnelID + "\x00" + decimalID
}
// cloneFabricServiceStream returns a copy of stream whose Metadata map is
// duplicated via cloneStringMap. All other fields are copied by plain struct
// assignment. NOTE(review): any reference-typed data inside ServiceTunnel
// would still be shared; confirm against FabricServiceTunnel's definition.
func cloneFabricServiceStream(stream FabricServiceStream) FabricServiceStream {
	copied := stream
	copied.Metadata = cloneStringMap(stream.Metadata)
	return copied
}
// serviceStreamsSnapshotItems renders streams as a slice of generic maps
// ordered by ascending stream_id. It returns nil for an empty input.
//
// Each map carries tunnel_id, service_id, stream_id, traffic_class, direction,
// state and service_tunnel, plus opened_at / updated_at (RFC 3339 with
// nanoseconds) when the timestamp is non-zero and a copy of metadata when it is
// non-empty. The input slice itself is not reordered.
func serviceStreamsSnapshotItems(streams []FabricServiceStream) []map[string]any {
	total := len(streams)
	if total == 0 {
		return nil
	}

	rendered := make([]map[string]any, 0, total)
	for idx := range streams {
		current := streams[idx]
		entry := map[string]any{
			"tunnel_id":      current.TunnelID,
			"service_id":     current.ServiceID,
			"stream_id":      current.StreamID,
			"traffic_class":  current.TrafficClass,
			"direction":      current.Direction,
			"state":          current.State,
			"service_tunnel": current.ServiceTunnel.Snapshot(),
		}
		if opened := current.OpenedAt; !opened.IsZero() {
			entry["opened_at"] = opened.Format(time.RFC3339Nano)
		}
		if updated := current.UpdatedAt; !updated.IsZero() {
			entry["updated_at"] = updated.Format(time.RFC3339Nano)
		}
		if len(current.Metadata) != 0 {
			entry["metadata"] = cloneStringMap(current.Metadata)
		}
		rendered = append(rendered, entry)
	}

	// stream_id is always stored above as a uint64, so the assertion succeeds;
	// a failed assertion would compare as zero.
	streamIDAt := func(pos int) uint64 {
		id, _ := rendered[pos]["stream_id"].(uint64)
		return id
	}
	sort.Slice(rendered, func(a, b int) bool {
		return streamIDAt(a) < streamIDAt(b)
	})
	return rendered
}
// cloneStringMap returns an independent copy of values. A nil or empty input
// yields nil rather than an empty map.
func cloneStringMap(values map[string]string) map[string]string {
	size := len(values)
	if size == 0 {
		return nil
	}
	dup := make(map[string]string, size)
	for k := range values {
		dup[k] = values[k]
	}
	return dup
}