Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 Мбит/с
2026-05-22 21:46:49 +03:00

117 lines
3.5 KiB
Go

package mesh
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)
// fabricControlForwardSequence is a package-level atomic counter.
// SendFabricControlForward increments it once per call and uses the new value
// as the Sequence of the outgoing frame, then matches reply frames against it.
var fabricControlForwardSequence atomic.Uint64
// FabricControlForwardResult is the value SendFabricControlForward returns
// once a matching response frame has been received.
type FabricControlForwardResult struct {
	// Payload holds a copy of the response frame's payload bytes; it is
	// omitted from the JSON encoding when empty.
	Payload json.RawMessage `json:"payload,omitempty"`
	// LatencyMs is the time, in milliseconds, spent waiting for the response.
	LatencyMs int64 `json:"latency_ms"`
	// Endpoint is the address string reported for the contacted endpoint; it
	// is omitted from the JSON encoding when empty.
	Endpoint string `json:"endpoint,omitempty"`
}
// FabricTransportTargetFromRegistryEndpoint builds the transport target used
// to dial a registry endpoint.
//
// String fields are copied with surrounding whitespace trimmed. Both
// EndpointID and PeerID are taken from endpoint.EndpointID. The dial address
// comes from fabricControlEndpointAddress. The target is given a 5 second
// timeout and inbound/error buffer sizes of 4.
func FabricTransportTargetFromRegistryEndpoint(endpoint FabricRegistryEndpoint) FabricTransportTarget {
	// The same trimmed identifier is used for both the endpoint and the peer.
	id := strings.TrimSpace(endpoint.EndpointID)

	var target FabricTransportTarget
	target.EndpointID = id
	target.PeerID = id
	target.Endpoint = fabricControlEndpointAddress(endpoint)
	target.Transport = strings.TrimSpace(endpoint.Transport)
	target.PeerCertSHA256 = strings.TrimSpace(endpoint.PeerCertSHA256)
	target.Timeout = 5 * time.Second
	target.InboundBuffer = 4
	target.ErrorBuffer = 4
	return target
}
// fabricControlEndpointAddress picks the address to dial for endpoint.
//
// A non-empty "maps_to" string in the endpoint metadata takes precedence: it
// is returned unchanged when it already contains "://", otherwise it is
// prefixed with "quic://". Without such a value the endpoint's own Address is
// returned with surrounding whitespace trimmed.
func fabricControlEndpointAddress(endpoint FabricRegistryEndpoint) string {
	override := fabricControlMetadataString(endpoint.Metadata, "maps_to")
	switch {
	case override == "":
		return strings.TrimSpace(endpoint.Address)
	case strings.Contains(override, "://"):
		return override
	default:
		return "quic://" + override
	}
}
// fabricControlMetadataString looks up key in the JSON object encoded in raw
// and returns its value with surrounding whitespace trimmed.
//
// It returns "" when raw is empty, when raw does not decode into a JSON
// object, when key is absent, or when the value stored under key is not a
// JSON string.
func fabricControlMetadataString(raw json.RawMessage, key string) string {
	if len(raw) == 0 {
		return ""
	}

	var fields map[string]any
	if decodeErr := json.Unmarshal(raw, &fields); decodeErr != nil {
		return ""
	}

	text, isString := fields[key].(string)
	if !isString {
		return ""
	}
	return strings.TrimSpace(text)
}
// SendFabricControlForward sends payload to endpoint over transport and waits
// for the matching reply.
//
// It connects a session to the target derived from endpoint, sends payload as
// a single reliable data frame on FabricControlForwardQUICStreamID stamped
// with a fresh sequence number, and then reads frames until a data frame on
// the same stream with the same sequence arrives. Frames that do not match
// are skipped. The session is closed before returning.
//
// A timeout <= 0 is replaced by 5 seconds. The timeout is applied to the
// connect target and to the wait for the reply.
//
// An error is returned when transport is nil, payload is empty, Connect or
// Send fails, the wait context is done (its Err is returned as-is), the
// session reports a non-nil error, or either session channel is closed.
//
// NOTE(review): Send is bounded only by ctx, not by timeout; if ctx carries
// no deadline a stalled Send is not interrupted by this function.
func SendFabricControlForward(ctx context.Context, transport FabricTransport, endpoint FabricRegistryEndpoint, payload []byte, timeout time.Duration) (FabricControlForwardResult, error) {
	if transport == nil {
		return FabricControlForwardResult{}, fmt.Errorf("fabric control transport is unavailable")
	}
	if len(payload) == 0 {
		return FabricControlForwardResult{}, fmt.Errorf("fabric control payload is empty")
	}
	if timeout <= 0 {
		timeout = 5 * time.Second
	}

	target := FabricTransportTargetFromRegistryEndpoint(endpoint)
	target.Timeout = timeout
	session, err := transport.Connect(ctx, target)
	if err != nil {
		return FabricControlForwardResult{}, err
	}
	defer session.Close()

	sequence := fabricControlForwardSequence.Add(1)
	if err := session.Send(ctx, fabricproto.Frame{
		Type:         fabricproto.FrameData,
		TrafficClass: fabricproto.TrafficClassReliable,
		StreamID:     FabricControlForwardQUICStreamID,
		Sequence:     sequence,
		// Send a private copy so the caller's slice is never shared with the session.
		Payload: append([]byte(nil), payload...),
	}); err != nil {
		return FabricControlForwardResult{}, err
	}

	// timeout is guaranteed positive by the defaulting above, so the wait is
	// always deadline-bound; no unbounded fallback path is needed.
	waitCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Latency is measured from after the send, i.e. it covers only the wait
	// for the reply.
	startedAt := time.Now()
	for {
		select {
		case <-waitCtx.Done():
			return FabricControlForwardResult{}, waitCtx.Err()
		case err, ok := <-session.Errors():
			if !ok {
				return FabricControlForwardResult{}, fmt.Errorf("fabric control session closed")
			}
			if err != nil {
				return FabricControlForwardResult{}, err
			}
			// A nil value on the error channel is ignored; keep waiting.
		case frame, ok := <-session.Frames():
			if !ok {
				return FabricControlForwardResult{}, fmt.Errorf("fabric control session closed")
			}
			// Only a data frame on the control-forward stream carrying our
			// sequence number is the reply to this request.
			if frame.Type != fabricproto.FrameData || frame.StreamID != FabricControlForwardQUICStreamID || frame.Sequence != sequence {
				continue
			}
			return FabricControlForwardResult{
				Payload:   append(json.RawMessage(nil), frame.Payload...),
				LatencyMs: time.Since(startedAt).Milliseconds(),
				// Reports the registry address as given, not target.Endpoint
				// (which may have been remapped or trimmed).
				Endpoint: endpoint.Address,
			}, nil
		}
	}
}