From 2cedc2e2f386d90192700258372b50395d166370 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Fri, 15 May 2026 23:55:21 +0300 Subject: [PATCH] Define distributed fabric node protocol plan --- .../internal/fabricproto/frame.go | 203 +++++++++++ .../internal/fabricproto/frame_test.go | 138 ++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 323 ++++++++++++++++++ 3 files changed, 664 insertions(+) create mode 100644 agents/rap-node-agent/internal/fabricproto/frame.go create mode 100644 agents/rap-node-agent/internal/fabricproto/frame_test.go create mode 100644 docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md diff --git a/agents/rap-node-agent/internal/fabricproto/frame.go b/agents/rap-node-agent/internal/fabricproto/frame.go new file mode 100644 index 0000000..76e234c --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/frame.go @@ -0,0 +1,203 @@ +package fabricproto + +import ( + "encoding/binary" + "errors" + "fmt" + "io" +) + +const ( + Magic uint32 = 0x52415046 // RAPF + Version uint8 = 1 + + HeaderSize = 32 + DefaultMaxPayload = 1024 * 1024 +) + +type FrameType uint8 + +const ( + FrameHello FrameType = iota + 1 + FrameAuth + FrameSessionReady + FrameOpenStream + FrameData + FrameAck + FramePing + FramePong + FrameRouteUpdate + FrameStreamCredit + FrameNodePressure + FrameCloseStream + FrameResetStream + FrameGoAway +) + +type TrafficClass uint8 + +const ( + TrafficClassControl TrafficClass = iota + 1 + TrafficClassDNS + TrafficClassInteractive + TrafficClassReliable + TrafficClassBulk + TrafficClassDroppable +) + +type Frame struct { + Type FrameType + Flags uint16 + TrafficClass TrafficClass + StreamID uint64 + Sequence uint64 + Payload []byte +} + +var ( + ErrInvalidMagic = errors.New("invalid fabric frame magic") + ErrUnsupportedVer = errors.New("unsupported fabric frame version") + ErrUnknownFrameType = errors.New("unknown fabric frame type") + ErrInvalidStreamID = errors.New("invalid fabric frame stream id") + ErrInvalidPayloadLen = errors.New("invalid fabric frame payload length") + ErrUnknownTraffic = errors.New("unknown fabric traffic class") +) + +func MarshalFrame(frame Frame) ([]byte, error) { + if err := ValidateFrame(frame, DefaultMaxPayload); err != nil { + return nil, err + } + out := make([]byte, HeaderSize+len(frame.Payload)) + writeHeader(out[:HeaderSize], frame, uint32(len(frame.Payload))) + copy(out[HeaderSize:], frame.Payload) + return out, nil +} + +func WriteFrame(w io.Writer, frame Frame) error { + if err := ValidateFrame(frame, DefaultMaxPayload); err != nil { + return err + } + header := make([]byte, HeaderSize) + writeHeader(header, frame, uint32(len(frame.Payload))) + if _, err := w.Write(header); err != nil { + return err + } + if len(frame.Payload) == 0 { + return nil + } + _, err := w.Write(frame.Payload) + return err +} + +func ReadFrame(r io.Reader, maxPayload int) (Frame, error) { + if maxPayload <= 0 { + maxPayload = DefaultMaxPayload + } + header := make([]byte, HeaderSize) + if _, err := io.ReadFull(r, header); err != nil { + return Frame{}, err + } + frame, payloadLength, err := parseHeader(header, maxPayload) + if err != nil { + return Frame{}, err + } + if payloadLength == 0 { + return frame, nil + } + frame.Payload = make([]byte, payloadLength) + if _, err := io.ReadFull(r, frame.Payload); err != nil { + return Frame{}, err + } + return frame, nil +} + +func UnmarshalFrame(data []byte, maxPayload int) (Frame, error) { + if len(data) < HeaderSize { + return Frame{}, io.ErrUnexpectedEOF + } + if maxPayload <= 0 { + maxPayload = DefaultMaxPayload + } + frame, payloadLength, err := parseHeader(data[:HeaderSize], maxPayload) + if err != nil { + return Frame{}, err + } + if len(data)-HeaderSize != payloadLength { + return Frame{}, fmt.Errorf("%w: header=%d actual=%d", ErrInvalidPayloadLen, payloadLength, len(data)-HeaderSize) + } + if payloadLength > 0 { + frame.Payload = append([]byte(nil), data[HeaderSize:]...) + } + return frame, nil +} + +func ValidateFrame(frame Frame, maxPayload int) error { + if maxPayload <= 0 { + maxPayload = DefaultMaxPayload + } + if !KnownFrameType(frame.Type) { + return ErrUnknownFrameType + } + if len(frame.Payload) > maxPayload { + return fmt.Errorf("%w: %d > %d", ErrInvalidPayloadLen, len(frame.Payload), maxPayload) + } + if requiresStream(frame.Type) && frame.StreamID == 0 { + return ErrInvalidStreamID + } + if frame.TrafficClass != 0 && !KnownTrafficClass(frame.TrafficClass) { + return ErrUnknownTraffic + } + return nil +} + +func KnownFrameType(frameType FrameType) bool { + return frameType >= FrameHello && frameType <= FrameGoAway +} + +func KnownTrafficClass(trafficClass TrafficClass) bool { + return trafficClass >= TrafficClassControl && trafficClass <= TrafficClassDroppable +} + +func requiresStream(frameType FrameType) bool { + switch frameType { + case FrameOpenStream, FrameData, FrameAck, FrameStreamCredit, FrameCloseStream, FrameResetStream: + return true + default: + return false + } +} + +func writeHeader(header []byte, frame Frame, payloadLength uint32) { + binary.BigEndian.PutUint32(header[0:4], Magic) + header[4] = Version + header[5] = byte(frame.Type) + binary.BigEndian.PutUint16(header[6:8], frame.Flags) + header[8] = byte(frame.TrafficClass) + binary.BigEndian.PutUint64(header[12:20], frame.StreamID) + binary.BigEndian.PutUint64(header[20:28], frame.Sequence) + binary.BigEndian.PutUint32(header[28:32], payloadLength) +} + +func parseHeader(header []byte, maxPayload int) (Frame, int, error) { + if binary.BigEndian.Uint32(header[0:4]) != Magic { + return Frame{}, 0, ErrInvalidMagic + } + if header[4] != Version { + return Frame{}, 0, ErrUnsupportedVer + } + frame := Frame{ + Type: FrameType(header[5]), + Flags: binary.BigEndian.Uint16(header[6:8]), + TrafficClass: TrafficClass(header[8]), + StreamID: binary.BigEndian.Uint64(header[12:20]), + Sequence: binary.BigEndian.Uint64(header[20:28]), + } + payloadLength := int(binary.BigEndian.Uint32(header[28:32])) + if payloadLength > maxPayload { + return Frame{}, 0, fmt.Errorf("%w: %d > %d", ErrInvalidPayloadLen, payloadLength, maxPayload) + } + if err := ValidateFrame(frame, maxPayload); err != nil { + return Frame{}, 0, err + } + return frame, payloadLength, nil +} diff --git a/agents/rap-node-agent/internal/fabricproto/frame_test.go b/agents/rap-node-agent/internal/fabricproto/frame_test.go new file mode 100644 index 0000000..2270d3c --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/frame_test.go @@ -0,0 +1,138 @@ +package fabricproto + +import ( + "bytes" + "errors" + "io" + "testing" +) + +func TestMarshalUnmarshalFrameRoundTrip(t *testing.T) { + frame := Frame{ + Type: FrameData, + Flags: 3, + TrafficClass: TrafficClassInteractive, + StreamID: 42, + Sequence: 7, + Payload: []byte("hello"), + } + + encoded, err := MarshalFrame(frame) + if err != nil { + t.Fatalf("marshal frame: %v", err) + } + + decoded, err := UnmarshalFrame(encoded, DefaultMaxPayload) + if err != nil { + t.Fatalf("unmarshal frame: %v", err) + } + if decoded.Type != frame.Type || decoded.Flags != frame.Flags || decoded.TrafficClass != frame.TrafficClass || + decoded.StreamID != frame.StreamID || decoded.Sequence != frame.Sequence || !bytes.Equal(decoded.Payload, frame.Payload) { + t.Fatalf("decoded frame = %+v, want %+v", decoded, frame) + } +} + +func TestReadWriteFrameRoundTrip(t *testing.T) { + var buf bytes.Buffer + frame := Frame{ + Type: FrameOpenStream, + TrafficClass: TrafficClassReliable, + StreamID: 9, + Payload: []byte(`{"service":"vpn"}`), + } + + if err := WriteFrame(&buf, frame); err != nil { + t.Fatalf("write frame: %v", err) + } + decoded, err := ReadFrame(&buf, DefaultMaxPayload) + if err != nil { + t.Fatalf("read frame: %v", err) + } + if decoded.Type != frame.Type || decoded.StreamID != frame.StreamID || !bytes.Equal(decoded.Payload, frame.Payload) { + t.Fatalf("decoded frame = %+v, want %+v", decoded, frame) + } +} + +func TestRejectsInvalidMagic(t *testing.T) { + encoded, err := MarshalFrame(Frame{Type: FrameHello}) + if err != nil { + t.Fatalf("marshal frame: %v", err) + } + encoded[0] = 0 + + _, err = UnmarshalFrame(encoded, DefaultMaxPayload) + if !errors.Is(err, ErrInvalidMagic) { + t.Fatalf("error = %v, want %v", err, ErrInvalidMagic) + } +} + +func TestRejectsUnsupportedVersion(t *testing.T) { + encoded, err := MarshalFrame(Frame{Type: FrameHello}) + if err != nil { + t.Fatalf("marshal frame: %v", err) + } + encoded[4] = Version + 1 + + _, err = UnmarshalFrame(encoded, DefaultMaxPayload) + if !errors.Is(err, ErrUnsupportedVer) { + t.Fatalf("error = %v, want %v", err, ErrUnsupportedVer) + } +} + +func TestRejectsUnknownFrameType(t *testing.T) { + if err := ValidateFrame(Frame{Type: FrameType(255)}, DefaultMaxPayload); !errors.Is(err, ErrUnknownFrameType) { + t.Fatalf("error = %v, want %v", err, ErrUnknownFrameType) + } +} + +func TestRejectsStreamFrameWithoutStreamID(t *testing.T) { + if err := ValidateFrame(Frame{Type: FrameData}, DefaultMaxPayload); !errors.Is(err, ErrInvalidStreamID) { + t.Fatalf("error = %v, want %v", err, ErrInvalidStreamID) + } +} + +func TestRejectsOversizedPayload(t *testing.T) { + frame := Frame{ + Type: FrameData, + StreamID: 1, + Payload: bytes.Repeat([]byte("x"), 5), + } + if err := ValidateFrame(frame, 4); !errors.Is(err, ErrInvalidPayloadLen) { + t.Fatalf("error = %v, want %v", err, ErrInvalidPayloadLen) + } +} + +func TestRejectsUnknownTrafficClass(t *testing.T) { + frame := Frame{ + Type: FrameOpenStream, + TrafficClass: TrafficClass(99), + StreamID: 1, + } + if err := ValidateFrame(frame, DefaultMaxPayload); !errors.Is(err, ErrUnknownTraffic) { + t.Fatalf("error = %v, want %v", err, ErrUnknownTraffic) + } +} + +func TestUnmarshalRequiresExactPayloadLength(t *testing.T) { + encoded, err := MarshalFrame(Frame{Type: FrameData, StreamID: 1, Payload: []byte("hello")}) + if err != nil { + t.Fatalf("marshal frame: %v", err) + } + encoded = encoded[:len(encoded)-1] + + _, err = UnmarshalFrame(encoded, DefaultMaxPayload) + if !errors.Is(err, ErrInvalidPayloadLen) { + t.Fatalf("error = %v, want %v", err, ErrInvalidPayloadLen) + } +} + +func TestReadFrameReturnsUnexpectedEOFOnShortPayload(t *testing.T) { + encoded, err := MarshalFrame(Frame{Type: FrameData, StreamID: 1, Payload: []byte("hello")}) + if err != nil { + t.Fatalf("marshal frame: %v", err) + } + _, err = ReadFrame(bytes.NewReader(encoded[:len(encoded)-1]), DefaultMaxPayload) + if !errors.Is(err, io.ErrUnexpectedEOF) { + t.Fatalf("error = %v, want %v", err, io.ErrUnexpectedEOF) + } +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md new file mode 100644 index 0000000..31fe29e --- /dev/null +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -0,0 +1,323 @@ +# Distributed Fabric Node Protocol Plan + +This document fixes the target direction for the Secure Access Fabric after the +VPN performance investigation. The platform must not be treated as a VPN +server, RDP gateway, or web console. It is a distributed overlay transport where +every participating device is a fabric node, and VPN/RDP/HTTP/admin/storage are +services running over that fabric. + +## Core Position + +Every device is a node. + +A phone, home server, cloud server, relay, admin-console host, storage host, and +update-cache host share the same base identity model. They differ by roles, +capabilities, policy, trust level, and current health. + +```text +Node = identity + roles + capabilities + policy + health + local state +``` + +The Android VPN app is therefore not only a client. It is a mobile fabric node. +It may carry VPN traffic, participate in route discovery, relay traffic when +policy allows, host limited control/storage roles when approved, and report +mobile-specific capacity signals such as battery, network type, NAT behavior, +foreground/background state, and metered network policy. + +## What Was Missing + +The current implementation proves route leases and production VPN forwarding, +but it still has a data-plane shape that cannot scale to high throughput: + +- too much payload traffic is carried as small request/response HTTP forwarding + calls; +- JSON/base64 payload envelopes add overhead and CPU cost; +- one overloaded stream can delay unrelated traffic; +- route health is visible, but the transport does not yet provide enough + low-latency per-stream feedback; +- the phone behaves mostly as a service client, not as a full fabric node; +- service discovery and route execution are not yet separated cleanly enough; +- fallback paths can keep traffic alive, but can also hide architecture + bottlenecks if used as the primary data plane. + +For 100 Mbps per active device and future 1000+ or millions of devices, the +fabric must move to a persistent, binary, multiplexed data plane with explicit +route and stream semantics. + +## Non-Negotiable Principles + +1. Fabric is the lower transport layer. VPN, RDP, HTTP, admin console, storage, + and update delivery are services above it. +2. Service adapters must not discover topology, own route selection, or invent + failover logic. They request transport from the fabric. +3. Control plane and data plane are separate. API/console traffic must not be + the packet transport mechanism. +4. Every data session carries many independent streams. A blocked bulk download + must not stall RDP, DNS, control, or telemetry. +5. Routes are leased and replaceable. Route selection uses quality, policy, + locality, role eligibility, cost, trust, and current load. +6. The fabric is distributed. Central control can coordinate, but the runtime + must keep working through cached policy, peer directories, route leases, and + local health when central components are degraded. +7. Mobile nodes are first-class nodes with stricter capability scoring. +8. HTTP forwarding remains a compatibility and emergency fallback, not the + primary high-speed data plane. + +## Node Roles + +Initial role vocabulary: + +- `mobile-edge`: mobile Android/iOS fabric node. +- `entry`: accepts external sessions. +- `relay`: forwards fabric traffic between nodes. +- `exit`: terminates routes into a target network or service zone. +- `service-host`: runs service adapters such as admin console, VPN exit, RDP, + HTTP ingress, storage, or update-cache. +- `control-plane`: participates in control authority, policy decisions, route + authority, or quorum work. +- `route-coordinator`: calculates or assists route candidates for a partition, + region, or service class. +- `storage`: stores approved replicated fabric state. +- `observer`: collects telemetry and health without carrying user traffic. +- `update-cache`: mirrors signed artifacts close to nodes. + +Roles are policy decisions, not binary builds. A phone can theoretically receive +any role, but scheduler scoring must account for battery, OS restrictions, NAT, +uplink stability, foreground state, and user cost policy. + +## Capability Model + +Nodes must advertise capability facts in heartbeats and peer updates: + +- supported fabric protocol versions; +- supported transports: UDP/QUIC, TCP, WebSocket, HTTPS fallback; +- NAT type and reachability; +- measured RTT/loss/jitter/bandwidth to peers and entry candidates; +- CPU, memory, queue depth, file descriptor/socket pressure; +- battery state, charging state, mobile/wifi network type, metered policy; +- max relay bandwidth and allowed traffic classes; +- service roles and service capacity; +- trust tier and allowed tenant/organization scopes; +- local policy version, peer directory version, route cache version. + +## Fabric Data Session V1 + +The first practical protocol step is a persistent binary data session. It may +initially run over WebSocket/TCP for faster delivery, but the framing must be +transport-neutral so the same protocol can move to QUIC/UDP. + +Minimum frame set: + +```text +HELLO node identity, protocol version, capabilities +AUTH signed session token or mTLS-bound proof +SESSION_READY accepted limits, route epoch, peer epoch +OPEN_STREAM stream id, service id, traffic class, route id +DATA stream id, sequence, flags, payload +ACK stream id, received sequence/window +PING/PONG RTT and liveness +ROUTE_UPDATE new route lease or alternate route set +STREAM_CREDIT per-stream backpressure window +NODE_PRESSURE queue/cpu/memory/network pressure signal +CLOSE_STREAM normal stream close +RESET_STREAM failed stream, other streams remain alive +GOAWAY draining or protocol shutdown +``` + +Traffic classes: + +- `control`: authorization, route updates, attach/detach, liveness. +- `dns`: small, latency-sensitive name resolution. +- `interactive`: RDP input, SSH interactive, UI control. +- `reliable`: normal web/API traffic. +- `bulk`: downloads, uploads, sync, large media. +- `droppable`: telemetry samples, optional probes, low-value background data. + +Each stream has independent flow control and backpressure. Bulk can be slowed or +moved to another route without blocking control or interactive streams. + +## Route Model + +The fabric must maintain multiple candidate routes for an active session: + +```text +phone-a -> entry-1 -> home-1 +phone-a -> phone-b -> relay-2 -> home-1 +phone-a -> entry-2 -> relay-4 -> service-host-7 +``` + +Route scoring inputs: + +- policy and role eligibility; +- route length and failure domains; +- RTT, jitter, packet loss, bandwidth estimate; +- queue depth and retransmit pressure; +- current node CPU/memory/socket pressure; +- mobile battery/charging/metered status; +- historical reliability; +- service locality; +- tenant/organization isolation; +- cost and operator preference. + +Routes are issued as short leases with route id, epoch, allowed channels, +allowed service classes, hop list or next-hop policy, expiry, and fencing rules. + +## Service Discovery + +Services are logical names, not fixed hosts: + +```text +service: admin-console +replicas: home-1, node-2, node-9 +policy: active-active or leader/follower +ingress: vpn.cin.su / admin.cin.su / internal name +``` + +`vpn.cin.su` as an HTTP/HTTPS entry is a service endpoint. It can be hosted on +any eligible service-host node. If one replica fails, another replica can accept +the service lease and traffic can be routed to it. + +## Scale Model + +For 1000 devices, the platform needs entry pools, exit pools, route leases, +session placement, and overload protection. + +For millions of devices, the platform additionally needs regional route +coordinators, distributed peer directories, local control partitions, telemetry +sampling, policy sharding, and resource accounting. + +Every device joining the system increases potential edge capacity, but only if +the scheduler can safely decide when that node is allowed to relay, store, serve, +or only consume. + +## Security And Abuse Controls + +The distributed model increases power and also risk. The following controls are +required before mobile relay/control/storage roles are broadly enabled: + +- node identity is cryptographic; IP address is never identity; +- all route leases are signed or locally verifiable; +- roles are scoped by organization, tenant, service, and time; +- mobile relay is opt-in by policy and user/device state; +- storage uses encrypted shards and explicit retention policy; +- control-plane participation requires trust tier and quorum policy; +- nodes never receive more topology or secret data than their role requires; +- abuse controls rate-limit relay use, route churn, and failed authentication; +- traffic accounting records who relayed what class and how much, without + exposing payload contents. + +## Observability + +The current tests show why aggregate "VPN works" is not enough. The fabric needs +per-node, per-route, and per-stream metrics: + +- throughput by direction and traffic class; +- RTT, jitter, loss, retransmits, queue depth; +- frame encode/decode errors; +- stream resets and close reasons; +- route switch reason and time to recovery; +- node pressure and scheduler decisions; +- service discovery failover events; +- Android foreground/background and network transition events. + +## Work Plan + +### Stage FNP-0: Architecture Lock + +Status: this document. + +Deliverables: + +- fix "every device is a node" as the model; +- separate fabric, services, control, and data plane; +- define missing protocol, route, scale, security, and observability pieces. + +### Stage FNP-1: Binary Frame Contract + +Deliverables: + +- add a transport-neutral Go package for Fabric Data Session V1 frame types; +- encode/decode binary frames with size limits and validation; +- add tests for malformed frames, max frame size, stream ids, and frame type + compatibility; +- do not connect it to production traffic yet. + +### Stage FNP-2: Persistent Session Runtime Skeleton + +Deliverables: + +- implement in-memory session runtime with streams, sequence numbers, ACK, + stream credit, reset, and close; +- prove that a blocked bulk stream does not block control/interactive streams; +- expose per-stream metrics. + +### Stage FNP-3: WebSocket/TCP Compatibility Transport + +Deliverables: + +- carry binary frames over one persistent WebSocket/TCP connection; +- replace high-frequency `/mesh/v1/forward` packet POST usage for VPN routes in + a gated mode; +- keep HTTP forwarding as fallback. + +### Stage FNP-4: Android As Mobile Fabric Node + +Deliverables: + +- Android advertises node capabilities, network state, battery state, and + supported transports; +- Android opens Fabric Data Session V1 to entry; +- VPN packets map to independent streams/classes; +- diagnostics can run per-stream and per-route tests. + +### Stage FNP-5: Route Leases And Multipath + +Deliverables: + +- route result includes primary and alternate routes; +- runtime can switch new streams to a better route; +- interactive streams can recover quickly after route fencing; +- route health uses dataplane metrics, not only HTTP request success. + +### Stage FNP-6: QUIC/UDP Transport + +Deliverables: + +- implement QUIC transport for Fabric Data Session V1; +- preserve WebSocket/TCP as fallback; +- test 4G/Wi-Fi transition and NAT behavior; +- benchmark throughput, latency, and recovery against current HTTP forwarding. + +### Stage FNP-7: Distributed Service Discovery + +Deliverables: + +- service names map to eligible service replicas; +- admin console and VPN service can move between service-host nodes; +- service failover is expressed as leases and route updates. + +### Stage FNP-8: Mobile Relay And Distributed Capacity + +Deliverables: + +- mobile nodes can opt into relay under strict policy; +- scheduler scores battery, metered network, NAT, trust, and load; +- route planner can use mobile nodes where they are closer/faster; +- accounting and abuse controls are enforced. + +### Stage FNP-9: Scale To Large Fleets + +Deliverables: + +- entry and route coordinator pools; +- peer directory sharding; +- telemetry sampling and aggregation; +- per-tenant quotas and fairness; +- load tests for 1000 simulated devices, then larger synthetic fleets. + +## Immediate Next Action + +Start Stage FNP-1 in `rap-node-agent` as a non-production protocol package. The +goal is to create the binary frame contract and tests without disturbing the +current VPN path. After that, wire it into a gated persistent session runtime and +only then move Android/VPN traffic onto it.