Use gated fabric sessions for VPN transport

This commit is contained in:
2026-05-16 01:03:01 +03:00
parent e16f456fe8
commit 03efff6770
6 changed files with 259 additions and 5 deletions
@@ -22,6 +22,7 @@ import (
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/client" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/client"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/config" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/config"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/hostagent" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/hostagent"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
@@ -124,7 +125,7 @@ func main() {
log.Printf("workload status failed: %v", err) log.Printf("workload status failed: %v", err)
} }
} }
if err := ensureVPNGatewayRuntime(ctx, api, identity, vpnGateway, meshState); err != nil { if err := ensureVPNGatewayRuntime(ctx, api, cfg, identity, vpnGateway, meshState); err != nil {
log.Printf("vpn gateway runtime failed: %v", err) log.Printf("vpn gateway runtime failed: %v", err)
} }
if err := reportVPNAssignmentStatus(ctx, api, identity, vpnGateway); err != nil { if err := reportVPNAssignmentStatus(ctx, api, identity, vpnGateway); err != nil {
@@ -351,6 +352,8 @@ type syntheticMeshState struct {
ProductionForwardingEnabled bool ProductionForwardingEnabled bool
VPNFabricInbox *vpnruntime.FabricPacketInbox VPNFabricInbox *vpnruntime.FabricPacketInbox
VPNFabricIngress *vpnruntime.FabricClientPacketIngress VPNFabricIngress *vpnruntime.FabricClientPacketIngress
VPNFabricSessionPeers *mesh.FabricSessionPeerManager
PeerEndpoints map[string]string
VPNGateway *vpnruntime.Gateway VPNGateway *vpnruntime.Gateway
ServiceChannelAccessStats *fabricServiceChannelAccessStats ServiceChannelAccessStats *fabricServiceChannelAccessStats
RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink
@@ -797,6 +800,8 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
ProductionForwardingEnabled: productionForwardingEnabled, ProductionForwardingEnabled: productionForwardingEnabled,
VPNFabricInbox: vpnFabricInbox, VPNFabricInbox: vpnFabricInbox,
VPNFabricIngress: vpnFabricIngress, VPNFabricIngress: vpnFabricIngress,
VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(),
PeerEndpoints: copyStringMap(peerEndpoints),
VPNGateway: vpnGateway, VPNGateway: vpnGateway,
ServiceChannelAccessStats: serviceChannelAccessStats, ServiceChannelAccessStats: serviceChannelAccessStats,
RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink, RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink,
@@ -1608,6 +1613,11 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
} }
productionForwardingEnabled := cfg.MeshProductionForwardingEnabled || loadedConfig.ProductionForwarding productionForwardingEnabled := cfg.MeshProductionForwardingEnabled || loadedConfig.ProductionForwarding
meshState.ProductionForwardingEnabled = productionForwardingEnabled meshState.ProductionForwardingEnabled = productionForwardingEnabled
if !sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) && meshState.VPNFabricSessionPeers != nil {
_ = meshState.VPNFabricSessionPeers.Close()
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
}
meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints)
if productionForwardingEnabled { if productionForwardingEnabled {
meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints) meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints)
} else { } else {
@@ -3987,6 +3997,29 @@ func sameStringSlice(left []string, right []string) bool {
return true return true
} }
// sameStringMap reports whether left and right contain exactly the same keys
// mapped to equal values. A nil map and an empty map are considered equal.
func sameStringMap(left map[string]string, right map[string]string) bool {
	if len(left) != len(right) {
		return false
	}
	for key, leftValue := range left {
		// Use the comma-ok form: a key missing from right reads as "", which
		// would otherwise compare equal to an empty value stored in left.
		rightValue, ok := right[key]
		if !ok || rightValue != leftValue {
			return false
		}
	}
	return true
}
// copyStringMap returns a freshly allocated map holding the same entries as
// values. The result is never nil, even when values is nil or empty, and
// later writes to either map do not affect the other.
func copyStringMap(values map[string]string) map[string]string {
	clone := make(map[string]string, len(values))
	for k := range values {
		clone[k] = values[k]
	}
	return clone
}
func minInt(left, right int) int { func minInt(left, right int) int {
if left < right { if left < right {
return left return left
@@ -4241,7 +4274,7 @@ func parseDNSServerList(value string) []string {
return out return out
} }
func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity state.Identity, gateway *vpnruntime.Gateway, meshState *syntheticMeshState) error { func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, cfg config.Config, identity state.Identity, gateway *vpnruntime.Gateway, meshState *syntheticMeshState) error {
assignments, err := api.NodeVPNAssignments(ctx, identity.ClusterID, identity.NodeID) assignments, err := api.NodeVPNAssignments(ctx, identity.ClusterID, identity.NodeID)
if err != nil { if err != nil {
return err return err
@@ -4283,7 +4316,7 @@ func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity s
gateway.AddressCIDR = "10.77.0.1/24" gateway.AddressCIDR = "10.77.0.1/24"
gateway.RouteCIDR = "10.77.0.0/24" gateway.RouteCIDR = "10.77.0.0/24"
gateway.PollTimeout = 25 * time.Second gateway.PollTimeout = 25 * time.Second
if transport := fabricGatewayTransportForAssignment(identity, assignment, meshState, api); transport != nil { if transport := fabricGatewayTransportForAssignment(ctx, cfg, identity, assignment, meshState, api); transport != nil {
if _, ok := gateway.Transport.(vpnruntime.BackendPacketTransport); ok { if _, ok := gateway.Transport.(vpnruntime.BackendPacketTransport); ok {
gateway.Stop() gateway.Stop()
} }
@@ -4353,7 +4386,7 @@ func localGatewayTransportForAssignment(identity state.Identity, assignment clie
} }
} }
func fabricGatewayTransportForAssignment(identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, _ *client.Client) vpnruntime.PacketTransport { func fabricGatewayTransportForAssignment(ctx context.Context, cfg config.Config, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, _ *client.Client) vpnruntime.PacketTransport {
if meshState == nil || meshState.ProductionForwardTransport == nil || meshState.VPNFabricInbox == nil { if meshState == nil || meshState.ProductionForwardTransport == nil || meshState.VPNFabricInbox == nil {
return nil return nil
} }
@@ -4361,6 +4394,11 @@ func fabricGatewayTransportForAssignment(identity state.Identity, assignment cli
if !ok { if !ok {
return nil return nil
} }
if cfg.VPNFabricSessionTransportEnabled {
if transport := fabricSessionGatewayTransportForAssignment(ctx, identity, assignment, meshState, nextHop); transport != nil {
return transport
}
}
return &vpnruntime.FabricPacketTransport{ return &vpnruntime.FabricPacketTransport{
ForwardTransport: meshState.ProductionForwardTransport, ForwardTransport: meshState.ProductionForwardTransport,
Inbox: meshState.VPNFabricInbox, Inbox: meshState.VPNFabricInbox,
@@ -4376,6 +4414,68 @@ func fabricGatewayTransportForAssignment(identity state.Identity, assignment cli
} }
} }
// fabricSessionGatewayTransportForAssignment tries to build a gateway packet
// transport backed by a fabric session pump towards nextHop.
//
// It returns nil when the mesh state, its VPN inbox, its session peer manager,
// the assignment's VPN connection ID, or nextHop is missing; when no endpoint
// is configured for nextHop; or when obtaining the pump or sending the
// open-stream frame fails. The last three cases are logged. On success the
// returned transport uses the pump as both sender and receiver.
func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, nextHop string) vpnruntime.PacketTransport {
	// One budget is used for the dial context and for the dial options.
	const sessionOpenTimeout = 3 * time.Second

	if meshState == nil {
		return nil
	}
	if meshState.VPNFabricInbox == nil || meshState.VPNFabricSessionPeers == nil {
		return nil
	}
	connectionID := assignment.VPNConnectionID
	if connectionID == "" || nextHop == "" {
		return nil
	}

	// Normalise the configured endpoint: strip surrounding whitespace, then
	// any trailing slashes.
	baseURL := strings.TrimSpace(meshState.PeerEndpoints[nextHop])
	baseURL = strings.TrimRight(baseURL, "/")
	if baseURL == "" {
		log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", connectionID, nextHop)
		return nil
	}

	// NOTE(review): openCtx is cancelled when this function returns; confirm
	// the pump handed back by Get does not stay bound to it.
	openCtx, cancelOpen := context.WithTimeout(ctx, sessionOpenTimeout)
	defer cancelOpen()

	target := mesh.FabricSessionPeerTarget{
		PeerID:  nextHop,
		BaseURL: baseURL,
		Options: mesh.FabricSessionDialOptions{
			Token:   fabricSessionGatewayToken(identity, assignment, nextHop),
			Timeout: sessionOpenTimeout,
		},
		Pump: mesh.FabricSessionPumpOptions{
			OutboundBuffer: 256,
			InboundBuffer:  256,
			ErrorBuffer:    16,
		},
	}
	pump, err := meshState.VPNFabricSessionPeers.Get(openCtx, target)
	if err != nil {
		log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", connectionID, nextHop, err)
		return nil
	}

	// The stream ID is taken from the wall clock; zero is replaced with 1.
	streamID := uint64(time.Now().UnixNano())
	if streamID == 0 {
		streamID = 1
	}

	openFrame := fabricproto.Frame{
		Type:         fabricproto.FrameOpenStream,
		StreamID:     streamID,
		TrafficClass: fabricproto.TrafficClassInteractive,
	}
	if err := pump.Send(openCtx, openFrame); err != nil {
		log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=stream_open_failed error=%v", connectionID, nextHop, err)
		return nil
	}

	return &vpnruntime.FabricSessionPacketTransport{
		Sender:           pump,
		Receiver:         pump,
		Inbox:            meshState.VPNFabricInbox,
		StreamID:         streamID,
		VPNConnectionID:  connectionID,
		SendDirection:    vpnruntime.FabricDirectionGatewayToClient,
		ReceiveDirection: vpnruntime.FabricDirectionClientToGateway,
		TrafficClass:     vpnruntime.FabricTrafficClassInteractive,
	}
}
// fabricSessionGatewayToken builds the token string placed in the fabric
// session dial options. It is the fixed prefix "rap_fsn_vpn" followed by the
// local node ID, nextHop, and the VPN connection ID, each with every "-"
// replaced by "_", all joined with "_".
func fabricSessionGatewayToken(identity state.Identity, assignment client.NodeVPNAssignment, nextHop string) string {
	token := "rap_fsn_vpn"
	for _, part := range [...]string{identity.NodeID, nextHop, assignment.VPNConnectionID} {
		token += "_" + strings.ReplaceAll(part, "-", "_")
	}
	return token
}
func selectVPNPacketRoute(routes []mesh.SyntheticRoute, clusterID string, localNodeID string) (mesh.SyntheticRoute, string, bool) { func selectVPNPacketRoute(routes []mesh.SyntheticRoute, clusterID string, localNodeID string) (mesh.SyntheticRoute, string, bool) {
now := time.Now().UTC() now := time.Now().UTC()
for _, route := range routes { for _, route := range routes {
@@ -201,6 +201,8 @@ func TestRouteManagerDecisionsFromControlPlaneRejectsGuardedRemediationCommand(t
func TestGatewayTransportForAssignmentUsesFabricWithoutBackendFallback(t *testing.T) { func TestGatewayTransportForAssignmentUsesFabricWithoutBackendFallback(t *testing.T) {
inbox := vpnruntime.NewFabricPacketInbox(4) inbox := vpnruntime.NewFabricPacketInbox(4)
transport := fabricGatewayTransportForAssignment( transport := fabricGatewayTransportForAssignment(
context.Background(),
config.Config{},
state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"}, state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"},
client.NodeVPNAssignment{VPNConnectionID: "vpn-1"}, client.NodeVPNAssignment{VPNConnectionID: "vpn-1"},
&syntheticMeshState{ &syntheticMeshState{
@@ -223,6 +225,74 @@ func TestGatewayTransportForAssignmentUsesFabricWithoutBackendFallback(t *testin
} }
} }
// TestGatewayTransportForAssignmentUsesFabricSessionWhenEnabled checks that,
// with the fabric session switch on and an endpoint configured for the next
// hop, the selected transport is a FabricSessionPacketTransport and that a
// packet batch can be sent through it.
func TestGatewayTransportForAssignmentUsesFabricSessionWhenEnabled(t *testing.T) {
	// Stand up a peer that accepts fabric sessions.
	peer := mesh.Server{
		Local:                mesh.PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"},
		FabricSessionEnabled: true,
	}
	server := httptest.NewServer(peer.Handler())
	defer server.Close()

	route := mesh.SyntheticRoute{
		RouteID:           "route-exit-entry",
		ClusterID:         "cluster-1",
		SourceNodeID:      "exit-1",
		DestinationNodeID: "entry-1",
		Hops:              []string{"exit-1", "entry-1"},
		AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
		ExpiresAt:         time.Now().UTC().Add(time.Minute),
	}
	meshState := &syntheticMeshState{
		ProductionForwardTransport: noopProductionForwardTransport{},
		VPNFabricInbox:             vpnruntime.NewFabricPacketInbox(4),
		VPNFabricSessionPeers:      mesh.NewFabricSessionPeerManager(),
		PeerEndpoints:              map[string]string{"entry-1": server.URL},
		Routes:                     []mesh.SyntheticRoute{route},
	}
	cfg := config.Config{VPNFabricSessionTransportEnabled: true}
	identity := state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"}
	assignment := client.NodeVPNAssignment{VPNConnectionID: "vpn-1"}

	transport := fabricGatewayTransportForAssignment(context.Background(), cfg, identity, assignment, meshState, nil)

	sessionTransport, ok := transport.(*vpnruntime.FabricSessionPacketTransport)
	if !ok {
		t.Fatalf("transport = %T, want fabric session packet transport", transport)
	}
	batch := [][]byte{[]byte("packet")}
	if err := sessionTransport.SendGatewayPacketBatch(context.Background(), batch); err != nil {
		t.Fatalf("send fabric session packet: %v", err)
	}
}
func TestGatewayTransportForAssignmentFallsBackWhenFabricSessionUnavailable(t *testing.T) {
inbox := vpnruntime.NewFabricPacketInbox(4)
transport := fabricGatewayTransportForAssignment(
context.Background(),
config.Config{VPNFabricSessionTransportEnabled: true},
state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"},
client.NodeVPNAssignment{VPNConnectionID: "vpn-1"},
&syntheticMeshState{
ProductionForwardTransport: noopProductionForwardTransport{},
VPNFabricInbox: inbox,
VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(),
PeerEndpoints: map[string]string{},
Routes: []mesh.SyntheticRoute{{
RouteID: "route-exit-entry",
ClusterID: "cluster-1",
SourceNodeID: "exit-1",
DestinationNodeID: "entry-1",
Hops: []string{"exit-1", "entry-1"},
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
ExpiresAt: time.Now().UTC().Add(time.Minute),
}},
},
nil,
)
if _, ok := transport.(*vpnruntime.FabricPacketTransport); !ok {
t.Fatalf("transport = %T, want fallback fabric packet transport", transport)
}
}
func TestLocalGatewayTransportForAssignmentUsesLocalInboxWithoutBackendFallback(t *testing.T) { func TestLocalGatewayTransportForAssignmentUsesLocalInboxWithoutBackendFallback(t *testing.T) {
transport := localGatewayTransportForAssignment( transport := localGatewayTransportForAssignment(
state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"}, state.Identity{ClusterID: "cluster-1", NodeID: "exit-1"},
@@ -70,7 +70,52 @@ func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Con
if direction == "" { if direction == "" {
direction = FabricDirectionClientToGateway direction = FabricDirectionClientToGateway
} }
if packets, err := t.Inbox.Receive(ctx, t.VPNConnectionID, direction, 5*time.Millisecond); err != nil || len(packets) > 0 {
return packets, err
}
if t.Receiver == nil {
return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout) return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout)
}
if timeout <= 0 {
timeout = 25 * time.Second
}
timer := time.NewTimer(timeout)
defer timer.Stop()
frames := t.Receiver.Frames()
errorsCh := t.Receiver.Errors()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
return nil, nil
case err, ok := <-errorsCh:
if !ok {
errorsCh = nil
continue
}
if err != nil {
return nil, err
}
case frame, ok := <-frames:
if !ok {
return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, 5*time.Millisecond)
}
if frame.Type != fabricproto.FrameData || (t.StreamID != 0 && frame.StreamID != t.StreamID) {
continue
}
payload, err := DecodeFabricVPNPacketDataFrame(frame)
if err != nil {
return nil, err
}
if payload.VPNConnectionID == t.VPNConnectionID && payload.Direction == direction {
return cleanPacketBatch(payload.Packets), nil
}
if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil {
return nil, err
}
}
}
} }
func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) error { func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) error {
@@ -265,6 +265,40 @@ func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T)
} }
} }
func TestFabricSessionPacketTransportReceiveReadsPumpFrames(t *testing.T) {
inbox := NewFabricPacketInbox(4)
receiver := memoryFabricSessionReceiver{
frames: make(chan fabricproto.Frame, 1),
errors: make(chan error, 1),
}
transport := &FabricSessionPacketTransport{
Receiver: receiver,
Inbox: inbox,
StreamID: 711,
VPNConnectionID: "vpn-1",
ReceiveDirection: FabricDirectionClientToGateway,
}
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
StreamID: 711,
Sequence: 1,
VPNConnectionID: "vpn-1",
Direction: FabricDirectionClientToGateway,
Packets: [][]byte{[]byte("request")},
})
if err != nil {
t.Fatalf("new fabric vpn frame: %v", err)
}
receiver.frames <- frame
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second)
if err != nil {
t.Fatalf("receive gateway packet: %v", err)
}
if len(packets) != 1 || string(packets[0]) != "request" {
t.Fatalf("packets = %#v", packets)
}
}
func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) { func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) {
inbox := NewFabricPacketInbox(4) inbox := NewFabricPacketInbox(4)
receiver := memoryFabricSessionReceiver{ receiver := memoryFabricSessionReceiver{
@@ -193,6 +193,8 @@ func (g *Gateway) Snapshot() map[string]any {
func (g *Gateway) transportName() string { func (g *Gateway) transportName() string {
switch g.Transport.(type) { switch g.Transport.(type) {
case *FabricSessionPacketTransport:
return "fabric_session"
case *FabricPacketTransport: case *FabricPacketTransport:
return "fabric_mesh" return "fabric_mesh"
case *LocalPacketTransport: case *LocalPacketTransport:
@@ -284,6 +284,9 @@ Node config now carries a separate gated
`RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED` switch and heartbeat report for the `RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED` switch and heartbeat report for the
binary VPN packet transport, keeping endpoint exposure and VPN dataplane binary VPN packet transport, keeping endpoint exposure and VPN dataplane
rollout independently controllable. rollout independently controllable.
When the VPN fabric-session switch is enabled, node-agent now attempts to use a
long-lived peer session for gateway packet transport and falls back to the
existing HTTP production envelope path when the peer session is unavailable.
Deliverables: Deliverables: