diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 2efc567..950d996 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -22,6 +22,7 @@ import ( "github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/client" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/config" + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/hostagent" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/state" @@ -124,7 +125,7 @@ func main() { log.Printf("workload status failed: %v", err) } } - if err := ensureVPNGatewayRuntime(ctx, api, identity, vpnGateway, meshState); err != nil { + if err := ensureVPNGatewayRuntime(ctx, api, cfg, identity, vpnGateway, meshState); err != nil { log.Printf("vpn gateway runtime failed: %v", err) } if err := reportVPNAssignmentStatus(ctx, api, identity, vpnGateway); err != nil { @@ -351,6 +352,8 @@ type syntheticMeshState struct { ProductionForwardingEnabled bool VPNFabricInbox *vpnruntime.FabricPacketInbox VPNFabricIngress *vpnruntime.FabricClientPacketIngress + VPNFabricSessionPeers *mesh.FabricSessionPeerManager + PeerEndpoints map[string]string VPNGateway *vpnruntime.Gateway ServiceChannelAccessStats *fabricServiceChannelAccessStats RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink @@ -797,6 +800,8 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c ProductionForwardingEnabled: productionForwardingEnabled, VPNFabricInbox: vpnFabricInbox, VPNFabricIngress: vpnFabricIngress, + VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(), + 
PeerEndpoints: copyStringMap(peerEndpoints), VPNGateway: vpnGateway, ServiceChannelAccessStats: serviceChannelAccessStats, RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink, @@ -1608,6 +1613,11 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i } productionForwardingEnabled := cfg.MeshProductionForwardingEnabled || loadedConfig.ProductionForwarding meshState.ProductionForwardingEnabled = productionForwardingEnabled + if !sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) && meshState.VPNFabricSessionPeers != nil { + _ = meshState.VPNFabricSessionPeers.Close() + meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() + } + meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) if productionForwardingEnabled { meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints) } else { @@ -3987,6 +3997,29 @@ func sameStringSlice(left []string, right []string) bool { return true } +func sameStringMap(left map[string]string, right map[string]string) bool { + if len(left) != len(right) { + return false + } + for key, leftValue := range left { + if rightValue, ok := right[key]; !ok || rightValue != leftValue { /* comma-ok lookup: a key absent from right must not match an empty-string value in left */ + return false + } + } + return true +} + +func copyStringMap(values map[string]string) map[string]string { + if len(values) == 0 { + return map[string]string{} + } + out := make(map[string]string, len(values)) + for key, value := range values { + out[key] = value + } + return out +} + func minInt(left, right int) int { if left < right { return left @@ -4241,7 +4274,7 @@ func parseDNSServerList(value string) []string { return out } -func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity state.Identity, gateway *vpnruntime.Gateway, meshState *syntheticMeshState) error { +func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, cfg config.Config, identity state.Identity, gateway *vpnruntime.Gateway, meshState *syntheticMeshState) error { assignments, err 
:= api.NodeVPNAssignments(ctx, identity.ClusterID, identity.NodeID) if err != nil { return err @@ -4283,7 +4316,7 @@ func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity s gateway.AddressCIDR = "10.77.0.1/24" gateway.RouteCIDR = "10.77.0.0/24" gateway.PollTimeout = 25 * time.Second - if transport := fabricGatewayTransportForAssignment(identity, assignment, meshState, api); transport != nil { + if transport := fabricGatewayTransportForAssignment(ctx, cfg, identity, assignment, meshState, api); transport != nil { if _, ok := gateway.Transport.(vpnruntime.BackendPacketTransport); ok { gateway.Stop() } @@ -4353,7 +4386,7 @@ func localGatewayTransportForAssignment(identity state.Identity, assignment clie } } -func fabricGatewayTransportForAssignment(identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, _ *client.Client) vpnruntime.PacketTransport { +func fabricGatewayTransportForAssignment(ctx context.Context, cfg config.Config, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, _ *client.Client) vpnruntime.PacketTransport { if meshState == nil || meshState.ProductionForwardTransport == nil || meshState.VPNFabricInbox == nil { return nil } @@ -4361,6 +4394,11 @@ func fabricGatewayTransportForAssignment(identity state.Identity, assignment cli if !ok { return nil } + if cfg.VPNFabricSessionTransportEnabled { + if transport := fabricSessionGatewayTransportForAssignment(ctx, identity, assignment, meshState, nextHop); transport != nil { + return transport + } + } return &vpnruntime.FabricPacketTransport{ ForwardTransport: meshState.ProductionForwardTransport, Inbox: meshState.VPNFabricInbox, @@ -4376,6 +4414,68 @@ func fabricGatewayTransportForAssignment(identity state.Identity, assignment cli } } +func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, nextHop 
string) vpnruntime.PacketTransport { + if meshState == nil || meshState.VPNFabricInbox == nil || meshState.VPNFabricSessionPeers == nil || assignment.VPNConnectionID == "" || nextHop == "" { + return nil + } + endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/") + if endpoint == "" { + log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", assignment.VPNConnectionID, nextHop) + return nil + } + dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + pump, err := meshState.VPNFabricSessionPeers.Get(dialCtx, mesh.FabricSessionPeerTarget{ + PeerID: nextHop, + BaseURL: endpoint, + Options: mesh.FabricSessionDialOptions{ + Token: fabricSessionGatewayToken(identity, assignment, nextHop), + Timeout: 3 * time.Second, + }, + Pump: mesh.FabricSessionPumpOptions{ + OutboundBuffer: 256, + InboundBuffer: 256, + ErrorBuffer: 16, + }, + }) + if err != nil { + log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, err) + return nil + } + streamID := uint64(time.Now().UnixNano()) + if streamID == 0 { + streamID = 1 + } + if err := pump.Send(dialCtx, fabricproto.Frame{ + Type: fabricproto.FrameOpenStream, + StreamID: streamID, + TrafficClass: fabricproto.TrafficClassInteractive, + }); err != nil { + log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, err) + return nil + } + return &vpnruntime.FabricSessionPacketTransport{ + Sender: pump, + Receiver: pump, + Inbox: meshState.VPNFabricInbox, + StreamID: streamID, + VPNConnectionID: assignment.VPNConnectionID, + SendDirection: vpnruntime.FabricDirectionGatewayToClient, + ReceiveDirection: vpnruntime.FabricDirectionClientToGateway, + TrafficClass: vpnruntime.FabricTrafficClassInteractive, + } +} + +func 
fabricSessionGatewayToken(identity state.Identity, assignment client.NodeVPNAssignment, nextHop string) string { + tokenParts := []string{ + "rap_fsn_vpn", + strings.ReplaceAll(identity.NodeID, "-", "_"), + strings.ReplaceAll(nextHop, "-", "_"), + strings.ReplaceAll(assignment.VPNConnectionID, "-", "_"), + } + return strings.Join(tokenParts, "_") +} + func selectVPNPacketRoute(routes []mesh.SyntheticRoute, clusterID string, localNodeID string) (mesh.SyntheticRoute, string, bool) { now := time.Now().UTC() for _, route := range routes { diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 937afcd..e8b3182 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -201,6 +201,8 @@ func TestRouteManagerDecisionsFromControlPlaneRejectsGuardedRemediationCommand(t func TestGatewayTransportForAssignmentUsesFabricWithoutBackendFallback(t *testing.T) { inbox := vpnruntime.NewFabricPacketInbox(4) transport := fabricGatewayTransportForAssignment( + context.Background(), + config.Config{}, state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"}, client.NodeVPNAssignment{VPNConnectionID: "vpn-1"}, &syntheticMeshState{ @@ -223,6 +225,74 @@ func TestGatewayTransportForAssignmentUsesFabricWithoutBackendFallback(t *testin } } +func TestGatewayTransportForAssignmentUsesFabricSessionWhenEnabled(t *testing.T) { + server := httptest.NewServer(mesh.Server{ + Local: mesh.PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, + FabricSessionEnabled: true, + }.Handler()) + defer server.Close() + + inbox := vpnruntime.NewFabricPacketInbox(4) + transport := fabricGatewayTransportForAssignment( + context.Background(), + config.Config{VPNFabricSessionTransportEnabled: true}, + state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"}, + client.NodeVPNAssignment{VPNConnectionID: "vpn-1"}, + &syntheticMeshState{ + ProductionForwardTransport: 
noopProductionForwardTransport{}, + VPNFabricInbox: inbox, + VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(), + PeerEndpoints: map[string]string{"entry-1": server.URL}, + Routes: []mesh.SyntheticRoute{{ + RouteID: "route-exit-entry", + ClusterID: "cluster-1", + SourceNodeID: "exit-1", + DestinationNodeID: "entry-1", + Hops: []string{"exit-1", "entry-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + }}, + }, + nil, + ) + sessionTransport, ok := transport.(*vpnruntime.FabricSessionPacketTransport) + if !ok { + t.Fatalf("transport = %T, want fabric session packet transport", transport) + } + if err := sessionTransport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send fabric session packet: %v", err) + } +} + +func TestGatewayTransportForAssignmentFallsBackWhenFabricSessionUnavailable(t *testing.T) { + inbox := vpnruntime.NewFabricPacketInbox(4) + transport := fabricGatewayTransportForAssignment( + context.Background(), + config.Config{VPNFabricSessionTransportEnabled: true}, + state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"}, + client.NodeVPNAssignment{VPNConnectionID: "vpn-1"}, + &syntheticMeshState{ + ProductionForwardTransport: noopProductionForwardTransport{}, + VPNFabricInbox: inbox, + VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(), + PeerEndpoints: map[string]string{}, + Routes: []mesh.SyntheticRoute{{ + RouteID: "route-exit-entry", + ClusterID: "cluster-1", + SourceNodeID: "exit-1", + DestinationNodeID: "entry-1", + Hops: []string{"exit-1", "entry-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + }}, + }, + nil, + ) + if _, ok := transport.(*vpnruntime.FabricPacketTransport); !ok { + t.Fatalf("transport = %T, want fallback fabric packet transport", transport) + } +} + func TestLocalGatewayTransportForAssignmentUsesLocalInboxWithoutBackendFallback(t 
*testing.T) { transport := localGatewayTransportForAssignment( state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"}, diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go index afaaad9..8025a6c 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go @@ -70,7 +70,52 @@ func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Con if direction == "" { direction = FabricDirectionClientToGateway } - return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout) + if packets, err := t.Inbox.Receive(ctx, t.VPNConnectionID, direction, 5*time.Millisecond); err != nil || len(packets) > 0 { + return packets, err + } + if t.Receiver == nil { + return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout) + } + if timeout <= 0 { + timeout = 25 * time.Second + } + timer := time.NewTimer(timeout) + defer timer.Stop() + frames := t.Receiver.Frames() + errorsCh := t.Receiver.Errors() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-timer.C: + return nil, nil + case err, ok := <-errorsCh: + if !ok { + errorsCh = nil + continue + } + if err != nil { + return nil, err + } + case frame, ok := <-frames: + if !ok { + return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, 5*time.Millisecond) + } + if frame.Type != fabricproto.FrameData || (t.StreamID != 0 && frame.StreamID != t.StreamID) { + continue + } + payload, err := DecodeFabricVPNPacketDataFrame(frame) + if err != nil { + return nil, err + } + if payload.VPNConnectionID == t.VPNConnectionID && payload.Direction == direction { + return cleanPacketBatch(payload.Packets), nil + } + if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil { + return nil, err + } + } + } } func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) error { diff --git 
a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index cc8a6bb..1f28f4a 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -265,6 +265,40 @@ func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) } } +func TestFabricSessionPacketTransportReceiveReadsPumpFrames(t *testing.T) { + inbox := NewFabricPacketInbox(4) + receiver := memoryFabricSessionReceiver{ + frames: make(chan fabricproto.Frame, 1), + errors: make(chan error, 1), + } + transport := &FabricSessionPacketTransport{ + Receiver: receiver, + Inbox: inbox, + StreamID: 711, + VPNConnectionID: "vpn-1", + ReceiveDirection: FabricDirectionClientToGateway, + } + frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ + StreamID: 711, + Sequence: 1, + VPNConnectionID: "vpn-1", + Direction: FabricDirectionClientToGateway, + Packets: [][]byte{[]byte("request")}, + }) + if err != nil { + t.Fatalf("new fabric vpn frame: %v", err) + } + receiver.frames <- frame + + packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) + if err != nil { + t.Fatalf("receive gateway packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "request" { + t.Fatalf("packets = %#v", packets) + } +} + func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) { inbox := NewFabricPacketInbox(4) receiver := memoryFabricSessionReceiver{ diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway.go b/agents/rap-node-agent/internal/vpnruntime/gateway.go index 090fe39..5d7c0ff 100644 --- a/agents/rap-node-agent/internal/vpnruntime/gateway.go +++ b/agents/rap-node-agent/internal/vpnruntime/gateway.go @@ -193,6 +193,8 @@ func (g *Gateway) Snapshot() map[string]any { func (g *Gateway) transportName() string { switch g.Transport.(type) { + case 
*FabricSessionPacketTransport: + return "fabric_session" case *FabricPacketTransport: return "fabric_mesh" case *LocalPacketTransport: diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 51d408b..5985945 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -284,6 +284,9 @@ Node config now carries a separate gated `RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED` switch and heartbeat report for the binary VPN packet transport, keeping endpoint exposure and VPN dataplane rollout independently controllable. +When the VPN fabric-session switch is enabled, node-agent now attempts to use a +long-lived peer session for gateway packet transport and falls back to the +existing HTTP production envelope path when the peer session is unavailable. Deliverables: