package mesh

import (
	"bytes"
	"context"
	"crypto/ed25519"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority"
	"github.com/gorilla/websocket"
)

// TestMeshHealthAcceptsSameCluster sends a health message between two peers
// of the same cluster and expects an accepted ack attributed to node-b.
func TestMeshHealthAcceptsSameCluster(t *testing.T) {
	receiver := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{Local: receiver}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	sender := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
	client := NewClient(srv.URL)
	ack, err := client.SendHealth(context.Background(), NewHealthMessage(sender, receiver))
	if err != nil {
		t.Fatalf("send health: %v", err)
	}
	if ok := ack.Accepted && ack.By.NodeID == "node-b"; !ok {
		t.Fatalf("unexpected ack: %+v", ack)
	}
}

// TestMeshHealthRejectsClusterMismatch posts a health message whose sender
// claims a different cluster and expects HTTP 403.
func TestMeshHealthRejectsClusterMismatch(t *testing.T) {
	receiver := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{Local: receiver}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	foreign := PeerIdentity{ClusterID: "cluster-2", NodeID: "node-a"}
	body, err := json.Marshal(NewHealthMessage(foreign, receiver))
	if err != nil {
		t.Fatalf("marshal message: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/health", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post health: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusForbidden; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingDisabled posts to the forward endpoint of a server that
// has not enabled production forwarding and expects HTTP 501.
func TestMeshForwardingDisabled(t *testing.T) {
	node := Server{Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/octet-stream", bytes.NewReader([]byte("payload")))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusNotImplemented; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingGateEnabledStillHasNoProductionRuntime enables the
// forwarding gate without configuring a transport and expects the default
// valid envelope to be answered with HTTP 501.
func TestMeshForwardingGateEnabledStillHasNoProductionRuntime(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	body, err := json.Marshal(validProductionEnvelope(self))
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusNotImplemented; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingGateDeliversFabricControlAtDestination posts an envelope
// whose destination, current hop and next hop are all the local node. It
// expects HTTP 200, a result that is accepted and delivered but not forwarded,
// and both the accepted and delivered log events.
func TestMeshForwardingGateDeliversFabricControlAtDestination(t *testing.T) {
	dest := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
	var logged []ProductionForwardLogEntry
	node := Server{
		Local:                       dest,
		ProductionForwardingEnabled: true,
		ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
			logged = append(logged, entry)
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(dest)
	env.SourceNodeID = "node-a"
	env.DestinationNodeID = dest.NodeID
	env.CurrentHopNodeID = dest.NodeID
	env.NextHopNodeID = dest.NodeID
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusOK; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}

	var result ProductionForwardResult
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		t.Fatalf("decode result: %v", err)
	}
	if ok := result.Accepted && result.Delivered && !result.Forwarded && result.By.NodeID == dest.NodeID; !ok {
		t.Fatalf("unexpected result: %+v", result)
	}
	if !(hasProductionForwardEvent(logged, "production_forward_accepted") && hasProductionForwardEvent(logged, "production_forward_delivered")) {
		t.Fatalf("missing production forward events: %+v", logged)
	}
}

// TestMeshForwardingGateForwardsDirectFabricControlToNextHop wires node-b to
// node-c through the HTTP forward transport, posts an envelope to node-b and
// expects node-c's observer to see the same message at its own hop.
func TestMeshForwardingGateForwardsDirectFabricControlToNextHop(t *testing.T) {
	nodeC := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
	var seenAtDest ProductionEnvelopeObservation
	destNode := Server{
		Local:                       nodeC,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
			seenAtDest = observation
			return nil
		},
	}
	srvC := httptest.NewServer(destNode.Handler())
	defer srvC.Close()

	nodeB := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	relayNode := Server{
		Local:                       nodeB,
		ProductionForwardingEnabled: true,
		ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
			nodeC.NodeID: srvC.URL,
		}),
	}
	srvB := httptest.NewServer(relayNode.Handler())
	defer srvB.Close()

	env := validProductionEnvelope(nodeB)
	env.SourceNodeID = "node-a"
	env.DestinationNodeID = nodeC.NodeID
	env.CurrentHopNodeID = nodeB.NodeID
	env.NextHopNodeID = nodeC.NodeID
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srvB.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusOK; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}

	var result ProductionForwardResult
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		t.Fatalf("decode result: %v", err)
	}
	forwardedOK := result.Accepted && result.Forwarded && result.Delivered &&
		result.NextNodeID == nodeC.NodeID && result.By.NodeID == nodeB.NodeID
	if !forwardedOK {
		t.Fatalf("unexpected forward result: %+v", result)
	}
	if !(seenAtDest.CurrentHopNodeID == nodeC.NodeID && seenAtDest.MessageID == env.MessageID) {
		t.Fatalf("destination did not observe forwarded envelope: %+v", seenAtDest)
	}
}

// TestMeshForwardingGateForwardsMultiHopFabricControlByRoutePath relays an
// envelope node-b -> node-r -> node-c using the envelope's own RoutePath. It
// checks the final observation, the propagated visited-node list, and that
// both relays logged a forwarded event.
func TestMeshForwardingGateForwardsMultiHopFabricControlByRoutePath(t *testing.T) {
	nodeC := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
	var (
		seenAtDest ProductionEnvelopeObservation
		relayRLog  []ProductionForwardLogEntry
		relayBLog  []ProductionForwardLogEntry
	)
	destNode := Server{
		Local:                       nodeC,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
			seenAtDest = observation
			return nil
		},
	}
	srvC := httptest.NewServer(destNode.Handler())
	defer srvC.Close()

	nodeR := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-r"}
	relayR := Server{
		Local:                       nodeR,
		ProductionForwardingEnabled: true,
		ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
			nodeC.NodeID: srvC.URL,
		}),
		ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
			relayRLog = append(relayRLog, entry)
		},
	}
	srvR := httptest.NewServer(relayR.Handler())
	defer srvR.Close()

	nodeB := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	relayB := Server{
		Local:                       nodeB,
		ProductionForwardingEnabled: true,
		ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
			nodeR.NodeID: srvR.URL,
		}),
		ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
			relayBLog = append(relayBLog, entry)
		},
	}
	srvB := httptest.NewServer(relayB.Handler())
	defer srvB.Close()

	env := validProductionEnvelope(nodeB)
	env.SourceNodeID = "node-a"
	env.DestinationNodeID = nodeC.NodeID
	env.CurrentHopNodeID = nodeB.NodeID
	env.NextHopNodeID = nodeR.NodeID
	env.RoutePath = []string{"node-a", nodeB.NodeID, nodeR.NodeID, nodeC.NodeID}
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srvB.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusOK; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}

	var result ProductionForwardResult
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		t.Fatalf("decode result: %v", err)
	}
	relayedOK := result.Accepted && result.Forwarded && result.Delivered &&
		result.NextNodeID == nodeR.NodeID && result.By.NodeID == nodeB.NodeID
	if !relayedOK {
		t.Fatalf("unexpected multi-hop result: %+v", result)
	}
	if !(seenAtDest.CurrentHopNodeID == nodeC.NodeID && seenAtDest.NextHopNodeID == nodeC.NodeID) {
		t.Fatalf("destination did not observe final hop: %+v", seenAtDest)
	}
	if visited := seenAtDest.VisitedNodeIDs; !(len(visited) == 2 && visited[0] == nodeB.NodeID && visited[1] == nodeR.NodeID) {
		t.Fatalf("visited path not propagated: %+v", seenAtDest.VisitedNodeIDs)
	}
	if !(hasProductionForwardEvent(relayBLog, "production_forward_forwarded") && hasProductionForwardEvent(relayRLog, "production_forward_forwarded")) {
		t.Fatalf("missing relay forward events: nodeB=%+v nodeR=%+v", relayBLog, relayRLog)
	}
}

// TestMeshForwardingGateForwardsConfiguredProductionRoute configures the same
// four-hop route on all three servers and expects an envelope following that
// route to be observed at node-c with the route's ID.
func TestMeshForwardingGateForwardsConfiguredProductionRoute(t *testing.T) {
	nodeC := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
	route := configuredProductionRoute("route-1", []string{"node-a", "node-b", "node-r", nodeC.NodeID})
	var seenAtDest ProductionEnvelopeObservation
	destNode := Server{
		Local:                       nodeC,
		ProductionForwardingEnabled: true,
		ProductionRoutes:            []SyntheticRoute{route},
		ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
			seenAtDest = observation
			return nil
		},
	}
	srvC := httptest.NewServer(destNode.Handler())
	defer srvC.Close()

	nodeR := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-r"}
	relayR := Server{
		Local:                       nodeR,
		ProductionForwardingEnabled: true,
		ProductionRoutes:            []SyntheticRoute{route},
		ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
			nodeC.NodeID: srvC.URL,
		}),
	}
	srvR := httptest.NewServer(relayR.Handler())
	defer srvR.Close()

	nodeB := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	relayB := Server{
		Local:                       nodeB,
		ProductionForwardingEnabled: true,
		ProductionRoutes:            []SyntheticRoute{route},
		ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
			nodeR.NodeID: srvR.URL,
		}),
	}
	srvB := httptest.NewServer(relayB.Handler())
	defer srvB.Close()

	env := validProductionEnvelope(nodeB)
	env.SourceNodeID = "node-a"
	env.DestinationNodeID = nodeC.NodeID
	env.CurrentHopNodeID = nodeB.NodeID
	env.NextHopNodeID = nodeR.NodeID
	env.RoutePath = route.Hops
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srvB.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusOK; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
	if !(seenAtDest.RouteID == route.RouteID && seenAtDest.CurrentHopNodeID == nodeC.NodeID) {
		t.Fatalf("configured route was not delivered: %+v", seenAtDest)
	}
}

// TestMeshForwardingGateRejectsUnknownConfiguredProductionRoute configures
// only "route-other" and expects the default envelope (route-1) to get 404.
func TestMeshForwardingGateRejectsUnknownConfiguredProductionRoute(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionRoutes: []SyntheticRoute{
			configuredProductionRoute("route-other", []string{"node-a", self.NodeID, "node-c"}),
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	body, err := json.Marshal(validProductionEnvelope(self))
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusNotFound; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingGateRejectsConfiguredProductionRouteWrongNextHop sends an
// envelope on a configured route whose next hop skips node-r and expects 400.
func TestMeshForwardingGateRejectsConfiguredProductionRouteWrongNextHop(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	route := configuredProductionRoute("route-1", []string{"node-a", self.NodeID, "node-r", "node-c"})
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionRoutes:            []SyntheticRoute{route},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	env.SourceNodeID = "node-a"
	env.DestinationNodeID = "node-c"
	env.CurrentHopNodeID = self.NodeID
	env.NextHopNodeID = "node-c"
	env.RoutePath = route.Hops
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusBadRequest; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingGateRejectsRoutePathWrongNextHop sends an envelope whose
// next hop ("node-x") does not appear in its RoutePath and expects 400 plus a
// rejected log event.
func TestMeshForwardingGateRejectsRoutePathWrongNextHop(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	var logged []ProductionForwardLogEntry
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
			logged = append(logged, entry)
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	env.SourceNodeID = "node-a"
	env.DestinationNodeID = "node-c"
	env.CurrentHopNodeID = self.NodeID
	env.NextHopNodeID = "node-x"
	env.RoutePath = []string{"node-a", self.NodeID, "node-r", "node-c"}
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusBadRequest; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
	if !hasProductionForwardEvent(logged, "production_forward_rejected") {
		t.Fatalf("missing reject event: %+v", logged)
	}
}

// TestMeshForwardingGateRejectsRoutePathLoop sends an envelope whose RoutePath
// lists the local node twice and expects 403.
func TestMeshForwardingGateRejectsRoutePathLoop(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	env.SourceNodeID = "node-a"
	env.DestinationNodeID = "node-c"
	env.CurrentHopNodeID = self.NodeID
	env.NextHopNodeID = "node-r"
	env.RoutePath = []string{"node-a", self.NodeID, "node-r", self.NodeID, "node-c"}
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusForbidden; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingGateRejectsInvalidProductionEnvelope replaces the payload
// hash with "bad-hash" and expects 400.
func TestMeshForwardingGateRejectsInvalidProductionEnvelope(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	env.PayloadHash = "bad-hash"
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusBadRequest; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingGateRejectsOversizedProductionEnvelopePayload builds a
// payload longer than MaxProductionEnvelopePayloadBytes, with a matching
// length and hash, and expects 400 without the observer being invoked.
func TestMeshForwardingGateRejectsOversizedProductionEnvelopePayload(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	observerCalled := false
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
			observerCalled = true
			return nil
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	// A JSON string literal whose content alone already exceeds the limit.
	env.Payload = json.RawMessage(`"` + strings.Repeat("a", MaxProductionEnvelopePayloadBytes+1) + `"`)
	digest := sha256.Sum256(env.Payload)
	env.PayloadLength = len(env.Payload)
	env.PayloadHash = hex.EncodeToString(digest[:])
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusBadRequest; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
	if observerCalled {
		t.Fatal("observer called for oversized envelope")
	}
}

// TestMeshForwardingGateRejectsFutureCreatedAt sets CreatedAt one second past
// MaxProductionEnvelopeFutureSkew and expects 400 without observation.
func TestMeshForwardingGateRejectsFutureCreatedAt(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	observerCalled := false
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
			observerCalled = true
			return nil
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	env.CreatedAt = time.Now().UTC().Add(MaxProductionEnvelopeFutureSkew + time.Second)
	env.ExpiresAt = env.CreatedAt.Add(time.Minute)
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusBadRequest; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
	if observerCalled {
		t.Fatal("observer called for future-created envelope")
	}
}

// TestMeshForwardingGateObservesValidEnvelopeWithoutPayload expects a valid
// envelope to reach the observer with its message/route IDs and payload
// hash/length, while the request itself is still answered with 501.
func TestMeshForwardingGateObservesValidEnvelopeWithoutPayload(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	var seen ProductionEnvelopeObservation
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
			seen = observation
			return nil
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusNotImplemented; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
	if !(seen.MessageID == env.MessageID && seen.RouteID == env.RouteID) {
		t.Fatalf("unexpected observation: %+v", seen)
	}
	if !(seen.PayloadHash == env.PayloadHash && seen.PayloadLength == env.PayloadLength) {
		t.Fatalf("payload metadata missing from observation: %+v", seen)
	}
}

// TestMeshForwardingGateDoesNotObserveRejectedEnvelope sends an envelope with
// a foreign cluster ID and expects 403 without the observer being invoked.
func TestMeshForwardingGateDoesNotObserveRejectedEnvelope(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	observerCalled := false
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
			observerCalled = true
			return nil
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	env := validProductionEnvelope(self)
	env.ClusterID = "wrong-cluster"
	body, err := json.Marshal(env)
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusForbidden; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
	if observerCalled {
		t.Fatal("observer called for rejected envelope")
	}
}

// TestMeshForwardingGateFailsClosedWhenObservationFails expects 500 when the
// observer returns an error.
func TestMeshForwardingGateFailsClosedWhenObservationFails(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
			return errors.New("observer down")
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	body, err := json.Marshal(validProductionEnvelope(self))
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusInternalServerError; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestMeshForwardingGateFailsClosedWhenObservationPanics expects 500 when the
// observer panics.
func TestMeshForwardingGateFailsClosedWhenObservationPanics(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
			panic("observer panic")
		},
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	body, err := json.Marshal(validProductionEnvelope(self))
	if err != nil {
		t.Fatalf("marshal envelope: %v", err)
	}
	res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
	if err != nil {
		t.Fatalf("post forward: %v", err)
	}
	defer res.Body.Close()
	if got, want := res.StatusCode, http.StatusInternalServerError; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}

// TestObserveProductionEnvelopeAllowsNilObserver expects a nil observer to be
// accepted without error.
func TestObserveProductionEnvelopeAllowsNilObserver(t *testing.T) {
	err := observeProductionEnvelope(context.Background(), nil, ProductionEnvelopeObservation{})
	if err != nil {
		t.Fatalf("observeProductionEnvelope nil observer err = %v", err)
	}
}

// TestProductionEnvelopeObservationSinkKeepsBoundedMetadata pushes three
// envelopes through a sink of capacity two and expects only the two newest
// observations to remain, with metrics reporting one dropped entry.
func TestProductionEnvelopeObservationSinkKeepsBoundedMetadata(t *testing.T) {
	self := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
	sink := NewProductionEnvelopeObservationSink(2)
	node := Server{
		Local:                       self,
		ProductionForwardingEnabled: true,
		ProductionEnvelopeObserver:  sink.Observe,
	}
	srv := httptest.NewServer(node.Handler())
	defer srv.Close()

	for seq := 1; seq <= 3; seq++ {
		env := validProductionEnvelope(self)
		env.MessageID = "message-" + string(rune('0'+seq))
		body, err := json.Marshal(env)
		if err != nil {
			t.Fatalf("marshal envelope: %v", err)
		}
		res, err := http.Post(srv.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body))
		if err != nil {
			t.Fatalf("post forward: %v", err)
		}
		// Closed immediately rather than deferred: a defer would pile up per iteration.
		res.Body.Close()
		if got, want := res.StatusCode, http.StatusNotImplemented; got != want {
			t.Fatalf("status = %d, want %d", got, want)
		}
	}

	kept := sink.Snapshot()
	if len(kept) != 2 {
		t.Fatalf("observation count = %d, want 2", len(kept))
	}
	if !(kept[0].MessageID == "message-2" && kept[1].MessageID == "message-3") {
		t.Fatalf("unexpected bounded observations: %+v", kept)
	}
	if oldest := kept[0]; oldest.PayloadHash == "" || oldest.PayloadLength == 0 {
		t.Fatalf("payload metadata missing from bounded observation: %+v", kept[0])
	}
	metrics := sink.Metrics()
	wantMetrics := metrics.Capacity == 2 && metrics.CurrentDepth == 2 &&
		metrics.AcceptedTotal == 3 && metrics.DroppedOldest == 1
	if !wantMetrics {
		t.Fatalf("unexpected sink metrics: %+v", metrics)
	}
}

// TestProductionEnvelopeObservationSinkMetricsStartEmpty expects a fresh sink
// to report its capacity and zero for every counter.
func TestProductionEnvelopeObservationSinkMetricsStartEmpty(t *testing.T) {
	metrics := NewProductionEnvelopeObservationSink(3).Metrics()
	empty := metrics.Capacity == 3 && metrics.CurrentDepth == 0 &&
		metrics.AcceptedTotal == 0 && metrics.DroppedOldest == 0
	if !empty {
		t.Fatalf("unexpected empty metrics: %+v", metrics)
	}
}
func TestMeshForwardingGateRejectsServiceChannel(t *testing.T) { local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"} server := httptest.NewServer(Server{ Local: local, ProductionForwardingEnabled: true, }.Handler()) defer server.Close() envelope := validProductionEnvelope(local) envelope.ChannelClass = "render" payload, err := json.Marshal(envelope) if err != nil { t.Fatalf("marshal envelope: %v", err) } resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload)) if err != nil { t.Fatalf("post forward: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusForbidden { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden) } } func TestMeshForwardingRequiresPost(t *testing.T) { server := httptest.NewServer(Server{Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}}.Handler()) defer server.Close() resp, err := http.Get(server.URL + "/mesh/v1/forward") if err != nil { t.Fatalf("get forward: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusMethodNotAllowed { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusMethodNotAllowed) } } func validProductionEnvelope(local PeerIdentity) ProductionEnvelope { payload := json.RawMessage(`{"kind":"control"}`) sum := sha256.Sum256(payload) now := time.Now().UTC() return ProductionEnvelope{ FabricProtocolVersion: ProtocolVersion, MessageID: "message-1", RouteID: "route-1", ClusterID: local.ClusterID, SourceNodeID: "node-a", DestinationNodeID: "node-c", CurrentHopNodeID: local.NodeID, NextHopNodeID: "node-c", ChannelClass: ProductionChannelFabricControl, MessageType: ProductionMessageFabricControl, TTL: 4, HopCount: 1, CreatedAt: now, ExpiresAt: now.Add(time.Minute), PayloadLength: len(payload), PayloadHash: hex.EncodeToString(sum[:]), Payload: payload, } } func configuredProductionRoute(routeID string, hops []string) SyntheticRoute { return SyntheticRoute{ RouteID: routeID, ClusterID: "cluster-1", SourceNodeID: 
hops[0], DestinationNodeID: hops[len(hops)-1], Hops: append([]string{}, hops...), AllowedChannels: []string{ProductionChannelFabricControl}, ExpiresAt: time.Now().UTC().Add(time.Hour), MaxTTL: 8, MaxHops: 8, } } func TestProductionForwardDeliversVPNPacketBatchOnAuthorizedVPNChannel(t *testing.T) { local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"} payload := json.RawMessage(`{"vpn_connection_id":"vpn-1","packets":["AAAA"]}`) sum := sha256.Sum256(payload) now := time.Now().UTC() var delivered ProductionEnvelope server := httptest.NewServer(Server{ Local: local, ProductionForwardingEnabled: true, ProductionRoutes: []SyntheticRoute{{ RouteID: "route-vpn-1", ClusterID: "cluster-1", SourceNodeID: "node-a", DestinationNodeID: "node-c", Hops: []string{"node-a", "node-c"}, AllowedChannels: []string{ProductionChannelVPNPacket}, ExpiresAt: now.Add(time.Hour), MaxTTL: 8, MaxHops: 8, }}, ProductionEnvelopeDelivery: func(_ context.Context, envelope ProductionEnvelope) error { delivered = envelope return nil }, }.Handler()) defer server.Close() envelope := ProductionEnvelope{ FabricProtocolVersion: ProtocolVersion, MessageID: "vpn-message-1", RouteID: "route-vpn-1", ClusterID: "cluster-1", SourceNodeID: "node-a", DestinationNodeID: "node-c", CurrentHopNodeID: "node-c", NextHopNodeID: "node-c", RoutePath: []string{"node-a", "node-c"}, ChannelClass: ProductionChannelVPNPacket, MessageType: ProductionMessageVPNPacketBatch, TTL: 4, HopCount: 1, CreatedAt: now, ExpiresAt: now.Add(time.Minute), PayloadLength: len(payload), PayloadHash: hex.EncodeToString(sum[:]), Payload: payload, } body, err := json.Marshal(envelope) if err != nil { t.Fatalf("marshal envelope: %v", err) } resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body)) if err != nil { t.Fatalf("post forward: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK) } if delivered.MessageID != 
envelope.MessageID || string(delivered.Payload) != string(payload) { t.Fatalf("delivered envelope = %+v", delivered) } } func TestProductionForwardRejectsVPNPacketOnFabricControlRoute(t *testing.T) { local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"} envelope := validProductionEnvelope(local) envelope.RouteID = "route-vpn-blocked" envelope.RoutePath = []string{"node-a", "node-c"} envelope.ChannelClass = ProductionChannelVPNPacket envelope.MessageType = ProductionMessageVPNPacketBatch payload := json.RawMessage(`{"vpn_connection_id":"vpn-1","packets":["AAAA"]}`) sum := sha256.Sum256(payload) envelope.Payload = payload envelope.PayloadLength = len(payload) envelope.PayloadHash = hex.EncodeToString(sum[:]) server := httptest.NewServer(Server{ Local: local, ProductionForwardingEnabled: true, ProductionRoutes: []SyntheticRoute{configuredProductionRoute("route-vpn-blocked", []string{"node-a", "node-c"})}, }.Handler()) defer server.Close() body, err := json.Marshal(envelope) if err != nil { t.Fatalf("marshal envelope: %v", err) } resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(body)) if err != nil { t.Fatalf("post forward: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusForbidden { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden) } } func TestVPNPacketIngressFallsBackToBackendRelayWhenFabricPeerUnavailable(t *testing.T) { var backendBody []byte backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets" { t.Fatalf("backend path = %s", r.URL.Path) } if r.Header.Get("X-RAP-Entry-Node") != "entry-1" { t.Fatalf("entry header = %q", r.Header.Get("X-RAP-Entry-Node")) } var err error backendBody, err = io.ReadAll(r.Body) if err != nil { t.Fatalf("read backend body: %v", err) } w.WriteHeader(http.StatusAccepted) })) defer backend.Close() server := 
httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: failingVPNPacketIngress{sendErr: ErrForwardPeerUnavailable}, BackendProxyBaseURL: backend.URL + "/api/v1", }.Handler()) defer server.Close() resp, err := http.Post(server.URL+"/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets", "application/octet-stream", bytes.NewReader([]byte("packet"))) if err != nil { t.Fatalf("post vpn packet: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusAccepted { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusAccepted) } if string(backendBody) != "packet" { t.Fatalf("backend body = %q", string(backendBody)) } } func TestVPNPacketIngressFallsBackToBackendRelayWhenFabricInboxIsEmpty(t *testing.T) { backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/octet-stream") _, _ = w.Write([]byte("reply")) })) defer backend.Close() server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: failingVPNPacketIngress{}, BackendProxyBaseURL: backend.URL + "/api/v1", }.Handler()) defer server.Close() resp, err := http.Get(server.URL + "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets?timeout_ms=2") if err != nil { t.Fatalf("get vpn packet: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK) } body, err := io.ReadAll(resp.Body) if err != nil { t.Fatalf("read body: %v", err) } if string(body) != "reply" { t.Fatalf("body = %q", string(body)) } } func TestFabricServiceChannelVPNPacketIngressRequiresLeaseToken(t *testing.T) { ingress := &recordingVPNPacketIngress{} server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: ingress, }.Handler()) defer server.Close() resp, err := 
http.Post(server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", "application/octet-stream", bytes.NewReader([]byte("packet"))) if err != nil { t.Fatalf("post service channel packet: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusUnauthorized { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusUnauthorized) } ingress.mu.Lock() defer ingress.mu.Unlock() if len(ingress.sent) != 0 { t.Fatalf("unexpected sent packets = %#v", ingress.sent) } } func TestFabricServiceChannelVPNPacketIngressMovesBatchOverFabricRuntime(t *testing.T) { ingress := &recordingVPNPacketIngress{ receive: [][]byte{[]byte("reply-1"), []byte("reply-2")}, } server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: ingress, }.Handler()) defer server.Close() body := encodeVPNIngressPacketBatch([][]byte{[]byte("packet-1"), []byte("packet-2")}) req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets?batch=true", bytes.NewReader(body)) if err != nil { t.Fatalf("new request: %v", err) } req.Header.Set("Authorization", "Bearer rap_fsc_testtoken") req.Header.Set("X-RAP-Service-Class", FabricServiceClassVPNPackets) req.Header.Set("X-RAP-Channel-Class", ProductionChannelVPNPacket) req.Header.Set("X-RAP-Traffic-Class", "interactive") req.Header.Set("X-RAP-Fabric-Channel-ID", "channel-1") resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post service channel packet batch: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusAccepted { t.Fatalf("post status = %d, want %d", resp.StatusCode, http.StatusAccepted) } ingress.mu.Lock() if ingress.clusterID != "cluster-1" || ingress.vpnConnectionID != "vpn-1" { t.Fatalf("ingress ids = %s %s", ingress.clusterID, ingress.vpnConnectionID) } if len(ingress.sent) != 2 || string(ingress.sent[0]) != "packet-1" || 
string(ingress.sent[1]) != "packet-2" { t.Fatalf("sent packets = %#v", ingress.sent) } if ingress.trafficClass != "interactive" { t.Fatalf("traffic class = %q, want interactive", ingress.trafficClass) } ingress.mu.Unlock() req, err = http.NewRequest(http.MethodGet, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets?batch=true&timeout_ms=2", nil) if err != nil { t.Fatalf("new get request: %v", err) } req.Header.Set("X-RAP-Service-Channel-Token", "rap_fsc_testtoken") resp, err = http.DefaultClient.Do(req) if err != nil { t.Fatalf("get service channel packet batch: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Fatalf("get status = %d, want %d", resp.StatusCode, http.StatusOK) } payload, err := io.ReadAll(resp.Body) if err != nil { t.Fatalf("read get body: %v", err) } packets, err := decodeVPNIngressPacketBatch(payload) if err != nil { t.Fatalf("decode get batch: %v", err) } if len(packets) != 2 || string(packets[0]) != "reply-1" || string(packets[1]) != "reply-2" { t.Fatalf("reply packets = %#v", packets) } } func TestFabricServiceChannelVPNPacketIngressRequiresSignedLeaseWhenAuthorityPinned(t *testing.T) { publicKey, _, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate key: %v", err) } ingress := &recordingVPNPacketIngress{} server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: ingress, ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), }.Handler()) defer server.Close() req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", bytes.NewReader([]byte("packet"))) if err != nil { t.Fatalf("new request: %v", err) } req.Header.Set("X-RAP-Service-Channel-Token", "rap_fsc_unsigned") resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post service channel packet: %v", err) } defer 
resp.Body.Close() if resp.StatusCode != http.StatusForbidden { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden) } ingress.mu.Lock() defer ingress.mu.Unlock() if len(ingress.sent) != 0 { t.Fatalf("unexpected sent packets = %#v", ingress.sent) } } func TestFabricServiceChannelVPNPacketIngressUsesBackendIntrospectionWhenUnsigned(t *testing.T) { publicKey, _, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate key: %v", err) } var introspected bool backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/introspect" { t.Fatalf("introspection path = %s", r.URL.Path) } introspected = true w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{"fabric_service_channel_introspection":{"allowed":true,"status":"allowed","selected_entry_node_id":"entry-1","allowed_channels":["vpn_packet"],"preferred_route_id":"route-1","lease_status":"ready","primary_route":{"route_id":"route-1","status":"ready"},"data_plane":{"schema_version":"rap.fabric_service_channel_data_plane.v1","mode":"fabric_primary","working_data_transport":"fabric_service_channel","steady_state_transport":"fabric_route","backend_relay_policy":"degraded_fallback_only","service_neutral":true,"protocol_agnostic":true,"logical_flow_mode":"multi_flow_isolated","required_flow_isolation_classes":["control","vpn_packet"]},"expires_at":"2099-01-01T00:00:00Z"}}`)) })) defer backend.Close() ingress := &recordingVPNPacketIngress{} var accessEvents []FabricServiceChannelAccessLogEntry server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: ingress, BackendProxyBaseURL: backend.URL + "/api/v1", ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), FabricServiceChannelLogger: func(entry FabricServiceChannelAccessLogEntry) { accessEvents = append(accessEvents, entry) }, 
}.Handler()) defer server.Close() req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", bytes.NewReader([]byte("packet"))) if err != nil { t.Fatalf("new request: %v", err) } req.Header.Set("X-RAP-Service-Channel-Token", "rap_fsc_unsigned") resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post service channel packet: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusAccepted { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusAccepted) } if got := resp.Header.Get("X-RAP-Service-Channel-Accepted-By"); got != "introspection" { t.Fatalf("accepted-by header = %q, want introspection", got) } if !introspected { t.Fatal("backend introspection was not called") } if len(accessEvents) != 1 || accessEvents[0].AcceptedBy != "introspection" || accessEvents[0].PreferredRouteID != "route-1" || !accessEvents[0].DataPlaneValid || accessEvents[0].WorkingDataTransport != "fabric_service_channel" || accessEvents[0].SteadyStateTransport != "fabric_route" || accessEvents[0].BackendRelayPolicy != "degraded_fallback_only" { t.Fatalf("unexpected access events: %+v", accessEvents) } ingress.mu.Lock() defer ingress.mu.Unlock() if len(ingress.sent) != 1 || string(ingress.sent[0]) != "packet" { t.Fatalf("sent packets = %#v", ingress.sent) } } func TestFabricServiceChannelVPNPacketIngressVerifiesSignedLeaseAuthority(t *testing.T) { publicKey, privateKey, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate key: %v", err) } token := "rap_fsc_signedtest" payload := fabricServiceChannelLeaseAuthorityPayload{ SchemaVersion: "rap.fabric_service_channel_lease_authority.v1", ChannelID: "channel-1", ClusterID: "cluster-1", ResourceID: "vpn-1", ServiceClass: FabricServiceClassVPNPackets, SelectedEntryNodeID: "entry-1", SelectedExitNodeID: "exit-1", AllowedChannels: []string{ProductionChannelVPNPacket}, RouteGeneration: "rg-1", FencingEpoch: 7, 
TokenHash: fabricServiceChannelTokenHash(token), IssuedAt: time.Now().UTC().Add(-time.Minute), ExpiresAt: time.Now().UTC().Add(time.Minute), DataPlane: fabricServiceChannelDataPlaneContract{ SchemaVersion: "rap.fabric_service_channel_data_plane.v1", Mode: "fabric_primary", WorkingDataTransport: "fabric_service_channel", SteadyStateTransport: "fabric_route", BackendRelayPolicy: "degraded_fallback_only", ProductionForwardingRequired: true, ServiceNeutral: true, ProtocolAgnostic: true, LogicalFlowMode: "multi_flow_isolated", RequiredFlowIsolationClasses: []string{"control", ProductionChannelVPNPacket}, }, } payload.PrimaryRoute.RouteID = "route-signed" payload.PrimaryRoute.Status = "authorized" rawPayload, err := json.Marshal(payload) if err != nil { t.Fatalf("marshal payload: %v", err) } canonical, err := authority.CanonicalJSON(rawPayload) if err != nil { t.Fatalf("canonical payload: %v", err) } signature := authority.Signature{ SchemaVersion: authority.SignatureSchemaVersion, Algorithm: authority.AlgorithmEd25519, KeyFingerprint: authority.Fingerprint(publicKey), Signature: base64.StdEncoding.EncodeToString(ed25519.Sign(privateKey, canonical)), } rawSignature, err := json.Marshal(signature) if err != nil { t.Fatalf("marshal signature: %v", err) } ingress := &recordingVPNPacketIngress{} var accessEvents []FabricServiceChannelAccessLogEntry server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: ingress, ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), FabricServiceChannelLogger: func(entry FabricServiceChannelAccessLogEntry) { accessEvents = append(accessEvents, entry) }, }.Handler()) defer server.Close() req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", bytes.NewReader([]byte("packet"))) if err != nil { t.Fatalf("new request: %v", err) } 
req.Header.Set("X-RAP-Service-Channel-Token", token) req.Header.Set("X-RAP-Service-Channel-Authority-Payload", base64.RawURLEncoding.EncodeToString(rawPayload)) req.Header.Set("X-RAP-Service-Channel-Authority-Signature", base64.RawURLEncoding.EncodeToString(rawSignature)) resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post service channel packet: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusAccepted { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusAccepted) } if len(accessEvents) != 1 || accessEvents[0].AcceptedBy != "signed" || accessEvents[0].PreferredRouteID != "route-signed" || !accessEvents[0].DataPlaneValid || accessEvents[0].WorkingDataTransport != "fabric_service_channel" || accessEvents[0].SteadyStateTransport != "fabric_route" || accessEvents[0].BackendRelayPolicy != "degraded_fallback_only" { t.Fatalf("unexpected signed data-plane access events: %+v", accessEvents) } } func TestFabricServiceChannelRemoteWorkspaceIngressValidatesSignedLeaseAuthority(t *testing.T) { publicKey, privateKey, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate key: %v", err) } token := "rap_fsc_remoteworkspace" payload := fabricServiceChannelLeaseAuthorityPayload{ SchemaVersion: "rap.fabric_service_channel_lease_authority.v1", ChannelID: "channel-rw", ClusterID: "cluster-1", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, SelectedEntryNodeID: "entry-1", SelectedExitNodeID: "exit-1", AllowedChannels: []string{FabricServiceChannelControl, FabricServiceChannelInteractive, FabricServiceChannelReliable, FabricServiceChannelDroppable}, RouteGeneration: "rg-rw", FencingEpoch: 11, TokenHash: fabricServiceChannelTokenHash(token), IssuedAt: time.Now().UTC().Add(-time.Minute), ExpiresAt: time.Now().UTC().Add(time.Minute), DataPlane: fabricServiceChannelDataPlaneContract{ SchemaVersion: "rap.fabric_service_channel_data_plane.v1", Mode: "fabric_primary", WorkingDataTransport: 
"fabric_service_channel", SteadyStateTransport: "fabric_route", BackendRelayPolicy: "degraded_fallback_only", ProductionForwardingRequired: true, ServiceNeutral: true, ProtocolAgnostic: true, LogicalFlowMode: "multi_flow_isolated", RequiredFlowIsolationClasses: []string{FabricServiceChannelControl, FabricServiceChannelInteractive, FabricServiceChannelReliable, FabricServiceChannelDroppable}, }, } payload.PrimaryRoute.RouteID = "route-rw" payload.PrimaryRoute.Status = "authorized" rawPayload, err := json.Marshal(payload) if err != nil { t.Fatalf("marshal payload: %v", err) } canonical, err := authority.CanonicalJSON(rawPayload) if err != nil { t.Fatalf("canonical payload: %v", err) } signature := authority.Signature{ SchemaVersion: authority.SignatureSchemaVersion, Algorithm: authority.AlgorithmEd25519, KeyFingerprint: authority.Fingerprint(publicKey), Signature: base64.StdEncoding.EncodeToString(ed25519.Sign(privateKey, canonical)), } rawSignature, err := json.Marshal(signature) if err != nil { t.Fatalf("marshal signature: %v", err) } var accessEvents []FabricServiceChannelAccessLogEntry server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: &recordingVPNPacketIngress{}, ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), FabricServiceChannelLogger: func(entry FabricServiceChannelAccessLogEntry) { accessEvents = append(accessEvents, entry) }, }.Handler()) defer server.Close() req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-rw/remote-workspaces/workspace-1/streams/interactive", nil) if err != nil { t.Fatalf("new request: %v", err) } req.Header.Set("X-RAP-Service-Channel-Token", token) req.Header.Set("X-RAP-Service-Class", FabricServiceClassRemoteWorkspace) req.Header.Set("X-RAP-Channel-Class", FabricServiceChannelInteractive) req.Header.Set("X-RAP-Service-Channel-Authority-Payload", 
base64.RawURLEncoding.EncodeToString(rawPayload)) req.Header.Set("X-RAP-Service-Channel-Authority-Signature", base64.RawURLEncoding.EncodeToString(rawSignature)) resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post remote workspace probe: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusAccepted { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusAccepted) } var decoded map[string]any if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil { t.Fatalf("decode response: %v", err) } if decoded["service_class"] != FabricServiceClassRemoteWorkspace || decoded["channel_class"] != FabricServiceChannelInteractive || decoded["payload_flow"] != "not_implemented" { t.Fatalf("unexpected response: %+v", decoded) } if len(accessEvents) != 1 || accessEvents[0].AcceptedBy != "signed" || accessEvents[0].ServiceClass != FabricServiceClassRemoteWorkspace || accessEvents[0].ChannelClass != FabricServiceChannelInteractive || accessEvents[0].PreferredRouteID != "route-rw" || !accessEvents[0].DataPlaneValid || accessEvents[0].WorkingDataTransport != "fabric_service_channel" || accessEvents[0].SteadyStateTransport != "fabric_route" { t.Fatalf("unexpected remote workspace access events: %+v", accessEvents) } } func TestFabricServiceChannelRemoteWorkspaceIngressAcceptsFrameBatchProbeOnly(t *testing.T) { publicKey, privateKey, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate key: %v", err) } token := "rap_fsc_remoteworkspace_frames" payload := fabricServiceChannelLeaseAuthorityPayload{ SchemaVersion: "rap.fabric_service_channel_lease_authority.v1", ChannelID: "channel-rw", ClusterID: "cluster-1", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, SelectedEntryNodeID: "entry-1", AllowedChannels: []string{FabricServiceChannelInteractive, FabricServiceChannelDroppable}, TokenHash: fabricServiceChannelTokenHash(token), ExpiresAt: time.Now().UTC().Add(time.Minute), DataPlane: 
fabricServiceChannelDataPlaneContract{ SchemaVersion: "rap.fabric_service_channel_data_plane.v1", Mode: "fabric_primary", WorkingDataTransport: "fabric_service_channel", SteadyStateTransport: "fabric_route", BackendRelayPolicy: "degraded_fallback_only", ServiceNeutral: true, ProtocolAgnostic: true, LogicalFlowMode: "multi_flow_isolated", RequiredFlowIsolationClasses: []string{FabricServiceChannelInteractive, FabricServiceChannelDroppable}, }, } payload.PrimaryRoute.RouteID = "route-rw" payload.PrimaryRoute.Status = "authorized" rawPayload, err := json.Marshal(payload) if err != nil { t.Fatalf("marshal payload: %v", err) } canonical, err := authority.CanonicalJSON(rawPayload) if err != nil { t.Fatalf("canonical payload: %v", err) } signature := authority.Signature{ SchemaVersion: authority.SignatureSchemaVersion, Algorithm: authority.AlgorithmEd25519, KeyFingerprint: authority.Fingerprint(publicKey), Signature: base64.StdEncoding.EncodeToString(ed25519.Sign(privateKey, canonical)), } rawSignature, err := json.Marshal(signature) if err != nil { t.Fatalf("marshal signature: %v", err) } server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: &recordingVPNPacketIngress{}, RemoteWorkspaceFrameSink: NewRemoteWorkspaceFrameProbeSink(), ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), }.Handler()) defer server.Close() frameBatch := map[string]any{ "schema_version": "rap.remote_workspace_frame_batch.v1", "probe_only": true, "service_class": FabricServiceClassRemoteWorkspace, "channel_class": FabricServiceChannelInteractive, "adapter_contract_id": "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", "frames": []map[string]any{ {"channel": "input", "direction": "client_to_adapter", "payload_encoding": "none", "payload_length": 0, "droppable": true}, {"channel": "display", "direction": "adapter_to_client", "payload_encoding": "none", "payload_length": 0, "droppable": true}, }, } 
rawFrameBatch, err := json.Marshal(frameBatch) if err != nil { t.Fatalf("marshal frame batch: %v", err) } req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-rw/remote-workspaces/workspace-1/streams/interactive", bytes.NewReader(rawFrameBatch)) if err != nil { t.Fatalf("new request: %v", err) } req.Header.Set("X-RAP-Service-Channel-Token", token) req.Header.Set("X-RAP-Service-Class", FabricServiceClassRemoteWorkspace) req.Header.Set("X-RAP-Channel-Class", FabricServiceChannelInteractive) req.Header.Set("X-RAP-Service-Channel-Authority-Payload", base64.RawURLEncoding.EncodeToString(rawPayload)) req.Header.Set("X-RAP-Service-Channel-Authority-Signature", base64.RawURLEncoding.EncodeToString(rawSignature)) resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post remote workspace frame probe: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusAccepted { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusAccepted) } var decoded map[string]any if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil { t.Fatalf("decode response: %v", err) } if decoded["payload_flow"] != "delivered_probe_only" || decoded["frame_batch_schema"] != "rap.remote_workspace_frame_batch.v1" || int(decoded["frame_count"].(float64)) != 2 { t.Fatalf("unexpected response: %+v", decoded) } adapterSessionID, ok := decoded["adapter_session_id"].(string) if !ok || !strings.HasPrefix(adapterSessionID, "rap-rw-adapter-session-") { t.Fatalf("adapter_session_id = %#v", decoded["adapter_session_id"]) } delivery, ok := decoded["adapter_delivery"].(map[string]any) if !ok || delivery["sink"] != "node_agent_rdp_worker_contract_probe" || delivery["schema_version"] != "rap.remote_workspace_frame_batch_delivery.v1" { t.Fatalf("unexpected adapter delivery: %+v", decoded["adapter_delivery"]) } if delivery["adapter_session_id"] != adapterSessionID || delivery["adapter_runtime_id"] != 
"node_agent_rdp_worker_contract_probe" || delivery["session_state"] != "probe_bound" { t.Fatalf("unexpected adapter session delivery fields: %+v", delivery) } if int(delivery["accepted_frames"].(float64)) != 2 || int(delivery["acked_frames"].(float64)) != 2 || int(delivery["queue_depth"].(float64)) != 0 { t.Fatalf("unexpected queue delivery fields: %+v", delivery) } } func TestRemoteWorkspaceFrameBatchProbeRejectsGuardrailViolations(t *testing.T) { valid := map[string]any{ "schema_version": "rap.remote_workspace_frame_batch.v1", "probe_only": true, "service_class": FabricServiceClassRemoteWorkspace, "channel_class": FabricServiceChannelInteractive, "adapter_contract_id": "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", "frames": []map[string]any{ {"channel": "input", "direction": "client_to_adapter", "payload_encoding": "none", "payload_length": 0, "droppable": true}, }, } tests := []struct { name string mutate func(map[string]any) wantErr string }{ { name: "production payload forwarding disabled", mutate: func(item map[string]any) { item["probe_only"] = false }, wantErr: "remote workspace payload forwarding is not implemented", }, { name: "unknown logical channel", mutate: func(item map[string]any) { item["frames"] = []map[string]any{{"channel": "unknown", "direction": "client_to_adapter"}} }, wantErr: "unsupported remote workspace adapter frame channel", }, { name: "wrong direction", mutate: func(item map[string]any) { item["frames"] = []map[string]any{{"channel": "display", "direction": "client_to_adapter"}} }, wantErr: "unsupported remote workspace adapter frame direction", }, { name: "service mismatch", mutate: func(item map[string]any) { item["service_class"] = FabricServiceClassVPNPackets }, wantErr: "remote workspace frame batch service class mismatch", }, { name: "channel mismatch", mutate: func(item map[string]any) { item["channel_class"] = FabricServiceChannelReliable }, wantErr: "remote workspace frame batch channel class mismatch", }, { 
name: "unsupported encoding", mutate: func(item map[string]any) { item["frames"] = []map[string]any{{"channel": "input", "direction": "client_to_adapter", "payload_encoding": "raw"}} }, wantErr: "unsupported remote workspace frame payload encoding", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { item := map[string]any{} for key, value := range valid { item[key] = value } tt.mutate(item) raw, err := json.Marshal(item) if err != nil { t.Fatalf("marshal frame batch: %v", err) } _, err = validateRemoteWorkspaceFrameBatchProbe(raw, FabricServiceChannelInteractive) if err == nil || !strings.Contains(err.Error(), tt.wantErr) { t.Fatalf("err = %v, want contains %q", err, tt.wantErr) } }) } } func TestRemoteWorkspaceFrameProbeSinkAppliesBoundedQueuePolicy(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() droppable := RemoteWorkspaceFrameBatchDelivery{ ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: FabricServiceChannelInteractive, AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: "rap-rw-adapter-session-test", } for i := 0; i < DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity+3; i++ { droppable.Frames = append(droppable.Frames, RemoteWorkspaceFrameProbeRecord{ Channel: "display", Direction: "adapter_to_client", Droppable: true, }) } receipt, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), droppable) if err != nil { t.Fatalf("accept droppable overflow: %v", err) } if !receipt.Accepted || receipt.AcceptedFrames != DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity || receipt.DroppedFrames != 3 || receipt.AckedFrames != receipt.AcceptedFrames || receipt.QueueDepth != 0 { t.Fatalf("droppable overflow receipt = %+v", receipt) } report := sink.Report(time.Unix(10, 0).UTC()) if report["queue_capacity"] != DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity || report["queue_depth"] != 0 || report["total_accepted_frames"] != 
int64(DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity) || report["total_dropped_frames"] != int64(3) || report["total_acked_frames"] != int64(DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity) || report["active_session_count"] != 1 || report["session_created_total"] != int64(1) || report["session_bound_total"] != int64(1) || report["current_session_lifecycle_state"] != "probe_bound" || report["current_session_delivery_count"] != int64(1) || report["current_session_dropped_frames"] != int64(3) { t.Fatalf("droppable overflow report = %+v", report) } reliable := RemoteWorkspaceFrameBatchDelivery{ ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: FabricServiceChannelInteractive, AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: "rap-rw-adapter-session-test", } for i := 0; i < DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity+1; i++ { reliable.Frames = append(reliable.Frames, RemoteWorkspaceFrameProbeRecord{ Channel: "input", Direction: "client_to_adapter", Droppable: false, }) } if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), reliable); err == nil || !strings.Contains(err.Error(), "backpressure") { t.Fatalf("reliable overflow err = %v, want backpressure", err) } report = sink.Report(time.Unix(11, 0).UTC()) if report["backpressure_count"] != int64(1) || report["last_rejected_frame_count"] != DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity+1 || report["last_rejected_queue_capacity"] != DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity || report["last_rejected_queue_depth"] != DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity || report["last_rejected_adapter_session_id"] != "rap-rw-adapter-session-test" || report["last_rejected_channel_class"] != FabricServiceChannelInteractive || report["last_rejected_adapter_contract_id"] != "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1" || report["session_backpressure_total"] != int64(1) || 
report["current_session_lifecycle_state"] != "backpressure" || report["current_session_backpressure_count"] != int64(1) { t.Fatalf("backpressure report = %+v", report) } report = sink.Report(time.Now().UTC().Add(DefaultRemoteWorkspaceFrameProbeSinkSessionTTL + time.Second)) if report["active_session_count"] != 0 || report["session_expired_total"] != int64(1) || report["session_closed_total"] != int64(1) { t.Fatalf("expired lifecycle report = %+v", report) } } func TestRemoteWorkspaceAdapterSessionControlEndpointClosesSession(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() delivery := RemoteWorkspaceFrameBatchDelivery{ ClusterID: "cluster-1", ChannelID: "channel-rw", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: FabricServiceChannelInteractive, AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: "rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa", Frames: []RemoteWorkspaceFrameProbeRecord{{ Channel: "display", Direction: "adapter_to_client", Droppable: true, }}, } if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil { t.Fatalf("accept frame batch: %v", err) } server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler()) defer server.Close() body := bytes.NewReader([]byte(`{"action":"close","reason":"unit test close"}`)) controlURL := server.URL + "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa/control" resp, err := http.Post(controlURL, "application/json", body) if err != nil { t.Fatalf("post control: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { raw, _ := io.ReadAll(resp.Body) t.Fatalf("status = %d body=%s", resp.StatusCode, string(raw)) } var result RemoteWorkspaceAdapterSessionControlResult if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { t.Fatalf("decode control result: %v", err) } if !result.Accepted || 
result.Action != "close" || result.AdapterSessionID != "rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa" || result.PreviousState != "probe_bound" || result.SessionState != "closed" || result.ActiveSessions != 0 { t.Fatalf("control result = %+v", result) } report := sink.Report(time.Now().UTC()) if report["active_session_count"] != 0 || report["session_control_total"] != int64(1) || report["session_closed_total"] != int64(1) || report["last_controlled_adapter_session_id"] != "rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa" || report["last_session_control_action"] != "close" || report["last_session_control_state"] != "closed" { t.Fatalf("control report = %+v", report) } resp, err = http.Post(controlURL, "application/json", bytes.NewReader([]byte(`{"action":"close","reason":"repeat close"}`))) if err != nil { t.Fatalf("post repeat control: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { raw, _ := io.ReadAll(resp.Body) t.Fatalf("repeat status = %d body=%s", resp.StatusCode, string(raw)) } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { t.Fatalf("decode repeat control result: %v", err) } if result.PreviousState != "closed" || result.SessionState != "closed" { t.Fatalf("repeat control result = %+v", result) } report = sink.Report(time.Now().UTC()) if report["session_control_total"] != int64(2) || report["session_closed_total"] != int64(1) { t.Fatalf("repeat control report = %+v", report) } } func TestRemoteWorkspaceAdapterSessionControlRejectsInvalidRequests(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler()) defer server.Close() tests := []struct { name string path string body string statusCode int want string }{ { name: "unknown action", path: "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa/control", body: `{"action":"launch"}`, statusCode: http.StatusBadRequest, want: "unsupported remote 
workspace adapter session control action", }, { name: "invalid id", path: "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-nothex/control", body: `{"action":"close"}`, statusCode: http.StatusBadRequest, want: "invalid remote workspace adapter session id", }, { name: "unknown session", path: "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-bbbbbbbbbbbbbbbbbbbbbbbb/control", body: `{"action":"close"}`, statusCode: http.StatusBadRequest, want: "remote workspace adapter session not found", }, { name: "bad json", path: "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa/control", body: `{`, statusCode: http.StatusBadRequest, want: "invalid remote workspace adapter session control payload", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { resp, err := http.Post(server.URL+tt.path, "application/json", strings.NewReader(tt.body)) if err != nil { t.Fatalf("post control: %v", err) } defer resp.Body.Close() raw, _ := io.ReadAll(resp.Body) if resp.StatusCode != tt.statusCode || !strings.Contains(string(raw), tt.want) { t.Fatalf("status=%d body=%s, want %d containing %q", resp.StatusCode, string(raw), tt.statusCode, tt.want) } }) } } func TestRemoteWorkspaceAdapterSessionSnapshotEndpointListsActiveAndTerminalSessions(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() delivery := RemoteWorkspaceFrameBatchDelivery{ ClusterID: "cluster-1", ChannelID: "channel-rw", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: FabricServiceChannelInteractive, AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: "rap-rw-adapter-session-cccccccccccccccccccccccc", Frames: []RemoteWorkspaceFrameProbeRecord{{ Channel: "display", Direction: "adapter_to_client", Droppable: true, }}, } if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil { t.Fatalf("accept frame batch: 
%v", err) } server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler()) defer server.Close() resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions?include_terminal=true") if err != nil { t.Fatalf("get snapshot: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { raw, _ := io.ReadAll(resp.Body) t.Fatalf("status = %d body=%s", resp.StatusCode, string(raw)) } var snapshot RemoteWorkspaceAdapterSessionSnapshot if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil { t.Fatalf("decode snapshot: %v", err) } if snapshot.SchemaVersion != "rap.remote_workspace_adapter_session_snapshot.v1" || snapshot.ActiveSessionCount != 1 || len(snapshot.Sessions) != 1 || snapshot.Sessions[0].AdapterSessionID != "rap-rw-adapter-session-cccccccccccccccccccccccc" || snapshot.Sessions[0].SessionState != "probe_bound" { t.Fatalf("active snapshot = %+v", snapshot) } controlBody := bytes.NewReader([]byte(`{"action":"close","reason":"snapshot terminal"}`)) controlResp, err := http.Post(server.URL+"/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-cccccccccccccccccccccccc/control", "application/json", controlBody) if err != nil { t.Fatalf("post control: %v", err) } controlResp.Body.Close() if controlResp.StatusCode != http.StatusOK { t.Fatalf("control status = %d", controlResp.StatusCode) } resp, err = http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions?include_terminal=true") if err != nil { t.Fatalf("get terminal snapshot: %v", err) } defer resp.Body.Close() if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil { t.Fatalf("decode terminal snapshot: %v", err) } if snapshot.ActiveSessionCount != 0 || snapshot.TerminalSessionCount != 1 || len(snapshot.TerminalSessions) != 1 || snapshot.TerminalSessions[0].AdapterSessionID != "rap-rw-adapter-session-cccccccccccccccccccccccc" || snapshot.TerminalSessions[0].SessionState != "closed" { t.Fatalf("terminal snapshot = %+v", snapshot) } 
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointReadsAndDrainsEvents checks
// that a plain mailbox read lists the queued events without consuming them,
// and that a read with drain=true empties the mailbox and is reflected in the
// sink report.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointReadsAndDrainsEvents(t *testing.T) {
	probeSink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-dddddddddddddddddddddddd"
	displayFrame := RemoteWorkspaceFrameProbeRecord{
		Channel:   "display",
		Direction: "adapter_to_client",
		Droppable: true,
	}
	displayBatch := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames:            []RemoteWorkspaceFrameProbeRecord{displayFrame},
	}
	if _, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), displayBatch); acceptErr != nil {
		t.Fatalf("accept frame batch: %v", acceptErr)
	}

	// Build a batch of non-droppable frames that is one frame larger than
	// DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity; the test requires the
	// sink to answer with a backpressure error.
	overflowBatch := displayBatch
	overflowBatch.Frames = nil
	for remaining := DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity + 1; remaining > 0; remaining-- {
		overflowBatch.Frames = append(overflowBatch.Frames, RemoteWorkspaceFrameProbeRecord{
			Channel:   "input",
			Direction: "client_to_adapter",
			Droppable: false,
		})
	}
	_, overflowErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), overflowBatch)
	if overflowErr == nil || !strings.Contains(overflowErr.Error(), "backpressure") {
		t.Fatalf("reliable overflow err = %v, want backpressure", overflowErr)
	}

	srv := httptest.NewServer(Server{RemoteWorkspaceFrameSink: probeSink}.Handler())
	defer srv.Close()
	sessionURL := srv.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID

	// Non-draining read: both events must still be queued afterwards.
	readResp, err := http.Get(sessionURL + "/mailbox?limit=10")
	if err != nil {
		t.Fatalf("get mailbox: %v", err)
	}
	defer readResp.Body.Close()
	var snap RemoteWorkspaceAdapterMailboxSnapshot
	if err := json.NewDecoder(readResp.Body).Decode(&snap); err != nil {
		t.Fatalf("decode mailbox: %v", err)
	}
	readOK := snap.SchemaVersion == "rap.remote_workspace_adapter_mailbox_snapshot.v1" &&
		snap.MailboxDepth == 2 &&
		snap.DepthAfter == 2 &&
		len(snap.Events) == 2 &&
		snap.Events[0].Event == "frame_batch_probe_delivered" &&
		snap.Events[1].Event == "backpressure"
	if !readOK {
		t.Fatalf("mailbox = %+v", snap)
	}

	// Draining read: decoded into the snapshot value populated above.
	drainResp, err := http.Get(sessionURL + "/mailbox?drain=true&limit=10")
	if err != nil {
		t.Fatalf("drain mailbox: %v", err)
	}
	defer drainResp.Body.Close()
	if err := json.NewDecoder(drainResp.Body).Decode(&snap); err != nil {
		t.Fatalf("decode drained mailbox: %v", err)
	}
	drainOK := snap.Drained && snap.MailboxDepth == 2 && snap.DepthAfter == 0 && snap.DrainedTotal == 2
	if !drainOK {
		t.Fatalf("drained mailbox = %+v", snap)
	}

	report := probeSink.Report(time.Now().UTC())
	reportOK := report["current_session_mailbox_depth"] == 0 &&
		report["current_session_mailbox_enqueued_total"] == int64(2) &&
		report["current_session_mailbox_drained_total"] == int64(2)
	if !reportOK {
		t.Fatalf("mailbox report = %+v", report)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointRejectsInvalidRequests sends
// malformed mailbox requests (bad session id, unknown session, bad limit),
// expects the listed status and error text for each, and then verifies the
// sink report still shows one queued event and no drains.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointRejectsInvalidRequests(t *testing.T) {
	probeSink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-eeeeeeeeeeeeeeeeeeeeeeee"
	batch := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{
			{Channel: "display", Direction: "adapter_to_client", Droppable: true},
		},
	}
	if _, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), batch); acceptErr != nil {
		t.Fatalf("accept frame batch: %v", acceptErr)
	}
	srv := httptest.NewServer(Server{RemoteWorkspaceFrameSink: probeSink}.Handler())
	defer srv.Close()

	type rejection struct {
		name       string
		path       string
		statusCode int
		want       string
	}
	cases := []rejection{
		{
			name:       "invalid id",
			path:       "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-nothex/mailbox",
			statusCode: http.StatusBadRequest,
			want:       "invalid remote workspace adapter session id",
		},
		{
			name:       "unknown session",
			path:       "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-ffffffffffffffffffffffff/mailbox",
			statusCode: http.StatusBadRequest,
			want:       "remote workspace adapter session not found",
		},
		{
			name:       "bad limit",
			path:       "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?limit=bad",
			statusCode: http.StatusBadRequest,
			want:       "invalid remote workspace adapter session mailbox limit",
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			res, err := http.Get(srv.URL + tc.path)
			if err != nil {
				t.Fatalf("get mailbox: %v", err)
			}
			defer res.Body.Close()
			// A read error leaves body short, which the substring check
			// below then reports as a mismatch.
			body, _ := io.ReadAll(res.Body)
			if res.StatusCode == tc.statusCode && strings.Contains(string(body), tc.want) {
				return
			}
			t.Fatalf("status=%d body=%s, want %d containing %q", res.StatusCode, string(body), tc.statusCode, tc.want)
		})
	}

	report := probeSink.Report(time.Now().UTC())
	untouched := report["current_session_mailbox_depth"] == 1 && report["mailbox_drained_total"] == int64(0)
	if !untouched {
		t.Fatalf("invalid mailbox requests mutated report = %+v", report)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointDrainsWithLimit queues three
// events, drains with limit=1, and checks that exactly one event is removed
// while the remaining two are still readable.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointDrainsWithLimit(t *testing.T) {
	probeSink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-111111111111111111111111"
	batch := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{
			{Channel: "display", Direction: "adapter_to_client", Droppable: true},
		},
	}
	for attempt := 0; attempt < 3; attempt++ {
		_, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), batch)
		if acceptErr != nil {
			t.Fatalf("accept frame batch %d: %v", attempt, acceptErr)
		}
	}
	srv := httptest.NewServer(Server{RemoteWorkspaceFrameSink: probeSink}.Handler())
	defer srv.Close()
	sessionURL := srv.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID

	drainResp, err := http.Get(sessionURL + "/mailbox?drain=true&limit=1")
	if err != nil {
		t.Fatalf("drain mailbox: %v", err)
	}
	defer drainResp.Body.Close()
	var snap RemoteWorkspaceAdapterMailboxSnapshot
	if err := json.NewDecoder(drainResp.Body).Decode(&snap); err != nil {
		t.Fatalf("decode drained mailbox: %v", err)
	}
	partialOK := snap.Drained &&
		snap.MailboxDepth == 3 &&
		snap.DepthAfter == 2 &&
		len(snap.Events) == 1 &&
		snap.DrainedTotal == 1
	if !partialOK {
		t.Fatalf("partial drained mailbox = %+v", snap)
	}

	// Follow-up read, decoded into the same snapshot value.
	readResp, err := http.Get(sessionURL + "/mailbox?limit=10")
	if err != nil {
		t.Fatalf("read mailbox: %v", err)
	}
	defer readResp.Body.Close()
	if err := json.NewDecoder(readResp.Body).Decode(&snap); err != nil {
		t.Fatalf("decode mailbox: %v", err)
	}
	remainingOK := snap.MailboxDepth == 2 &&
		snap.DepthAfter == 2 &&
		len(snap.Events) == 2 &&
		snap.DrainedTotal == 1
	if !remainingOK {
		t.Fatalf("remaining mailbox = %+v", snap)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxDropsOldestWhenFull enqueues two
// more events than DefaultRemoteWorkspaceAdapterMailboxCapacity and checks the
// retained window and drop counters.
func TestRemoteWorkspaceAdapterSessionMailboxDropsOldestWhenFull(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-eeeeeeeeeeeeeeeeeeeeeeee"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	for i := 0; i < DefaultRemoteWorkspaceAdapterMailboxCapacity+2; i++ {
		delivery.ResourceID = fmt.Sprintf("workspace-%d", i)
		if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
			t.Fatalf("accept frame batch %d: %v", i, err)
		}
	}
	mailbox, err := sink.ReadAdapterSessionMailbox(sessionID, false, 50, 0, time.Now().UTC())
	if err != nil {
t.Fatalf("read mailbox: %v", err)
	}
	if mailbox.MailboxCapacity != DefaultRemoteWorkspaceAdapterMailboxCapacity || mailbox.MailboxDepth != DefaultRemoteWorkspaceAdapterMailboxCapacity || mailbox.DepthAfter != DefaultRemoteWorkspaceAdapterMailboxCapacity || mailbox.EnqueuedTotal != int64(DefaultRemoteWorkspaceAdapterMailboxCapacity+2) || mailbox.DroppedTotal != 2 || len(mailbox.Events) != DefaultRemoteWorkspaceAdapterMailboxCapacity {
		t.Fatalf("mailbox overflow snapshot = %+v", mailbox)
	}
	if mailbox.Events[0].Sequence != 3 || mailbox.Events[len(mailbox.Events)-1].Sequence != int64(DefaultRemoteWorkspaceAdapterMailboxCapacity+2) {
		t.Fatalf("mailbox event window = first %d last %d", mailbox.Events[0].Sequence, mailbox.Events[len(mailbox.Events)-1].Sequence)
	}
	report := sink.Report(time.Now().UTC())
	if report["mailbox_dropped_total"] != int64(2) || report["current_session_mailbox_dropped_total"] != int64(2) {
		t.Fatalf("mailbox drop report = %+v", report)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxGuardrailsAndClosedSession checks
// that the mailbox endpoint answers 400 for a malformed session id, for
// limit=0, and for a session that was closed through the control endpoint.
func TestRemoteWorkspaceAdapterSessionMailboxGuardrailsAndClosedSession(t *testing.T) {
	probeSink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-ffffffffffffffffffffffff"
	batch := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{
			{Channel: "display", Direction: "adapter_to_client", Droppable: true},
		},
	}
	if _, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), batch); acceptErr != nil {
		t.Fatalf("accept frame batch: %v", acceptErr)
	}
	srv := httptest.NewServer(Server{RemoteWorkspaceFrameSink: probeSink}.Handler())
	defer srv.Close()
	sessionURL := srv.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID

	// Malformed session id in the path.
	malformedResp, err := http.Get(srv.URL + "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-nothex/mailbox")
	if err != nil {
		t.Fatalf("get invalid mailbox: %v", err)
	}
	malformedResp.Body.Close()
	if got := malformedResp.StatusCode; got != http.StatusBadRequest {
		t.Fatalf("invalid mailbox status = %d, want 400", got)
	}

	// limit=0 is rejected.
	zeroLimitResp, err := http.Get(sessionURL + "/mailbox?limit=0")
	if err != nil {
		t.Fatalf("get invalid limit mailbox: %v", err)
	}
	zeroLimitResp.Body.Close()
	if got := zeroLimitResp.StatusCode; got != http.StatusBadRequest {
		t.Fatalf("invalid limit mailbox status = %d, want 400", got)
	}

	// Close the session through the control endpoint.
	closeRequest := bytes.NewReader([]byte(`{"action":"close","reason":"mailbox closed"}`))
	closeResp, err := http.Post(sessionURL+"/control", "application/json", closeRequest)
	if err != nil {
		t.Fatalf("post control: %v", err)
	}
	closeResp.Body.Close()
	if got := closeResp.StatusCode; got != http.StatusOK {
		t.Fatalf("control status = %d", got)
	}

	// Reading the mailbox of the closed session must now be refused.
	closedResp, err := http.Get(sessionURL + "/mailbox")
	if err != nil {
		t.Fatalf("get closed mailbox: %v", err)
	}
	defer closedResp.Body.Close()
	if got := closedResp.StatusCode; got != http.StatusBadRequest {
		t.Fatalf("closed mailbox status = %d, want 400", got)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointLongPollsUntilEvent drains
// the mailbox, starts a wait_ms=250 read in the background, delivers a batch
// after a short sleep and expects the pending read to return that event.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointLongPollsUntilEvent(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-222222222222222222222222"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept frame batch: %v", err)
	}
	if _,
err := sink.ReadAdapterSessionMailbox(sessionID, true, 10, 0, time.Now().UTC()); err != nil {
		t.Fatalf("drain initial mailbox: %v", err)
	}
	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()
	result := make(chan RemoteWorkspaceAdapterMailboxSnapshot, 1)
	errs := make(chan error, 1)
	go func() {
		resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?wait_ms=250&limit=10")
		if err != nil {
			errs <- err
			return
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			raw, _ := io.ReadAll(resp.Body)
			errs <- fmt.Errorf("status=%d body=%s", resp.StatusCode, string(raw))
			return
		}
		var mailbox RemoteWorkspaceAdapterMailboxSnapshot
		if err := json.NewDecoder(resp.Body).Decode(&mailbox); err != nil {
			errs <- err
			return
		}
		result <- mailbox
	}()
	time.Sleep(50 * time.Millisecond)
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept delayed frame batch: %v", err)
	}
	select {
	case err := <-errs:
		t.Fatalf("long poll failed: %v", err)
	case mailbox := <-result:
		if !mailbox.Waited || mailbox.WaitTimeout || mailbox.WaitMs != 250 || mailbox.Empty || mailbox.MailboxDepth != 1 || len(mailbox.Events) != 1 || mailbox.Events[0].Event != "frame_batch_probe_delivered" {
			t.Fatalf("long-poll mailbox = %+v", mailbox)
		}
		report := sink.Report(time.Now().UTC())
		if report["mailbox_read_total"] != int64(1) || report["mailbox_wait_total"] != int64(1) || report["mailbox_wait_timeout_total"] != int64(0) || report["mailbox_empty_read_total"] != int64(0) || report["current_session_mailbox_wait_total"] != int64(1) || report["current_session_last_mailbox_wait_ms"] != 250 {
			t.Fatalf("long-poll report = %+v", report)
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for long-poll mailbox")
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointReturnsEmptyAfterWaitTimeout
// drains the mailbox, issues a wait_ms=20 read with nothing pending and expects
// an empty, timed-out snapshot plus matching report counters. It finishes by
// checking that a non-numeric wait_ms is rejected with 400.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointReturnsEmptyAfterWaitTimeout(t *testing.T) {
	probeSink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-333333333333333333333333"
	batch := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{
			{Channel: "display", Direction: "adapter_to_client", Droppable: true},
		},
	}
	if _, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), batch); acceptErr != nil {
		t.Fatalf("accept frame batch: %v", acceptErr)
	}
	// Empty the mailbox directly on the sink so the HTTP read has nothing to return.
	if _, drainErr := probeSink.ReadAdapterSessionMailbox(sessionID, true, 10, 0, time.Now().UTC()); drainErr != nil {
		t.Fatalf("drain initial mailbox: %v", drainErr)
	}
	srv := httptest.NewServer(Server{RemoteWorkspaceFrameSink: probeSink}.Handler())
	defer srv.Close()
	sessionURL := srv.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID

	waitResp, err := http.Get(sessionURL + "/mailbox?wait_ms=20&limit=10")
	if err != nil {
		t.Fatalf("get mailbox: %v", err)
	}
	defer waitResp.Body.Close()
	var snap RemoteWorkspaceAdapterMailboxSnapshot
	if err := json.NewDecoder(waitResp.Body).Decode(&snap); err != nil {
		t.Fatalf("decode mailbox: %v", err)
	}
	timedOutEmpty := snap.Empty &&
		snap.Waited &&
		snap.WaitTimeout &&
		snap.WaitMs == 20 &&
		snap.MailboxDepth == 0 &&
		snap.DepthAfter == 0 &&
		len(snap.Events) == 0
	if !timedOutEmpty {
		t.Fatalf("empty timeout mailbox = %+v", snap)
	}

	report := probeSink.Report(time.Now().UTC())
	sinkTotalsOK := report["mailbox_read_total"] == int64(1) &&
		report["mailbox_wait_total"] == int64(1) &&
		report["mailbox_wait_timeout_total"] == int64(1) &&
		report["mailbox_empty_read_total"] == int64(1)
	sessionTotalsOK := report["current_session_mailbox_read_total"] == int64(1) &&
		report["current_session_mailbox_wait_timeout_total"] == int64(1) &&
		report["current_session_mailbox_empty_read_total"] == int64(1)
	sessionFlagsOK := report["current_session_last_mailbox_wait_timeout"] == true &&
		report["current_session_last_mailbox_empty"] == true
	if !(sinkTotalsOK && sessionTotalsOK && sessionFlagsOK) {
		t.Fatalf("empty timeout report = %+v", report)
	}

	badWaitResp, err := http.Get(sessionURL + "/mailbox?wait_ms=bad")
	if err != nil {
		t.Fatalf("get bad wait mailbox: %v", err)
	}
	defer badWaitResp.Body.Close()
	// A read error leaves body short, which the substring check then flags.
	body, _ := io.ReadAll(badWaitResp.Body)
	rejected := badWaitResp.StatusCode == http.StatusBadRequest &&
		strings.Contains(string(body), "invalid remote workspace adapter session mailbox wait")
	if !rejected {
		t.Fatalf("bad wait status=%d body=%s", badWaitResp.StatusCode, string(body))
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointFiltersAfterSequence queues
// three events and reads with after_sequence=1, expecting only sequences 2 and
// 3 back while the mailbox depth stays at 3.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointFiltersAfterSequence(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	for i := 0; i < 3; i++ {
		delivery.ResourceID = fmt.Sprintf("workspace-%d", i)
		if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
			t.Fatalf("accept frame batch %d: %v", i, err)
		}
	}
	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()
	resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?after_sequence=1&limit=10")
	if err != nil {
		t.Fatalf("get filtered mailbox: %v", err)
	}
	defer resp.Body.Close()
	var mailbox RemoteWorkspaceAdapterMailboxSnapshot
	if err := json.NewDecoder(resp.Body).Decode(&mailbox); err != nil {
		t.Fatalf("decode filtered mailbox: %v", err)
	}
	if mailbox.AfterSequence != 1 || mailbox.SkippedCount != 1 || mailbox.ReturnedCount != 2 ||
mailbox.MailboxDepth != 3 || mailbox.DepthAfter != 3 || mailbox.Empty || len(mailbox.Events) != 2 || mailbox.Events[0].Sequence != 2 || mailbox.Events[1].Sequence != 3 {
		t.Fatalf("filtered mailbox = %+v", mailbox)
	}
	report := sink.Report(time.Now().UTC())
	if report["mailbox_read_total"] != int64(1) || report["current_session_mailbox_depth"] != 3 {
		t.Fatalf("filtered mailbox report = %+v", report)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointLongPollsAfterSequence
// starts a background read with after_sequence=1 and wait_ms=250 while only
// sequence 1 exists, then delivers a second batch and expects the pending read
// to return exactly that new event (sequence 2) without timing out.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointLongPollsAfterSequence(t *testing.T) {
	probeSink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-bbbbbbbbbbbbbbbbbbbbbbbb"
	batch := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{
			{Channel: "display", Direction: "adapter_to_client", Droppable: true},
		},
	}
	if _, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), batch); acceptErr != nil {
		t.Fatalf("accept initial frame batch: %v", acceptErr)
	}
	srv := httptest.NewServer(Server{RemoteWorkspaceFrameSink: probeSink}.Handler())
	defer srv.Close()

	// Both channels are buffered so the poller never blocks if the test has
	// already failed or timed out.
	snapshots := make(chan RemoteWorkspaceAdapterMailboxSnapshot, 1)
	failures := make(chan error, 1)

	// longPoll performs the waiting read and decodes its snapshot.
	longPoll := func() (RemoteWorkspaceAdapterMailboxSnapshot, error) {
		var snap RemoteWorkspaceAdapterMailboxSnapshot
		res, err := http.Get(srv.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?after_sequence=1&wait_ms=250&limit=10")
		if err != nil {
			return snap, err
		}
		defer res.Body.Close()
		if res.StatusCode != http.StatusOK {
			body, _ := io.ReadAll(res.Body)
			return snap, fmt.Errorf("status=%d body=%s", res.StatusCode, string(body))
		}
		err = json.NewDecoder(res.Body).Decode(&snap)
		return snap, err
	}
	go func() {
		snap, err := longPoll()
		if err != nil {
			failures <- err
			return
		}
		snapshots <- snap
	}()

	// Give the poller time to start waiting before the delayed delivery.
	time.Sleep(50 * time.Millisecond)
	batch.ResourceID = "workspace-delayed"
	if _, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), batch); acceptErr != nil {
		t.Fatalf("accept delayed frame batch: %v", acceptErr)
	}

	select {
	case pollErr := <-failures:
		t.Fatalf("after-sequence long poll failed: %v", pollErr)
	case snap := <-snapshots:
		delivered := snap.AfterSequence == 1 &&
			snap.Waited &&
			!snap.WaitTimeout &&
			snap.ReturnedCount == 1 &&
			len(snap.Events) == 1 &&
			snap.Events[0].Sequence == 2 &&
			snap.MailboxDepth == 2 &&
			snap.DepthAfter == 2
		if !delivered {
			t.Fatalf("after-sequence long-poll mailbox = %+v", snap)
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for after-sequence long-poll mailbox")
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointRejectsInvalidAfterSequence
// checks that a non-numeric after_sequence and after_sequence combined with
// drain are both rejected with 400 and leave the sink report unchanged.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointRejectsInvalidAfterSequence(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-cccccccccccccccccccccccc"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept frame batch: %v", err)
	}
	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()
	tests := []struct {
		name string
		path string
		want string
	}{
		{
			name: "bad after sequence",
			path: "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?after_sequence=bad",
			want: "invalid remote workspace adapter session mailbox after sequence",
		},
		{
			name: "drain after sequence",
			path: "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?after_sequence=1&drain=true",
			want: "remote workspace adapter session mailbox after sequence 
cannot drain",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			resp, err := http.Get(server.URL + tt.path)
			if err != nil {
				t.Fatalf("get mailbox: %v", err)
			}
			defer resp.Body.Close()
			raw, _ := io.ReadAll(resp.Body)
			if resp.StatusCode != http.StatusBadRequest || !strings.Contains(string(raw), tt.want) {
				t.Fatalf("status=%d body=%s, want 400 containing %q", resp.StatusCode, string(raw), tt.want)
			}
		})
	}
	report := sink.Report(time.Now().UTC())
	if report["mailbox_read_total"] != int64(0) || report["current_session_mailbox_depth"] != 1 {
		t.Fatalf("invalid after-sequence requests mutated report = %+v", report)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointResumesFromConsumerCursor
// queues three events and reads them as consumer "rdp-worker-probe" three
// times: a plain limit=2 read, a resume_from=checkpoint read, and a
// resume_from=ack read that also acknowledges sequence 2. It then verifies the
// resume telemetry in the sink report and its adapter_runtime_readiness entry.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointResumesFromConsumerCursor(t *testing.T) {
	probeSink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-dddddddddddddddddddddddd"
	batch := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{
			{Channel: "display", Direction: "adapter_to_client", Droppable: true},
		},
	}
	for n := 0; n < 3; n++ {
		batch.ResourceID = fmt.Sprintf("workspace-%d", n)
		_, acceptErr := probeSink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), batch)
		if acceptErr != nil {
			t.Fatalf("accept frame batch %d: %v", n, acceptErr)
		}
	}
	srv := httptest.NewServer(Server{RemoteWorkspaceFrameSink: probeSink}.Handler())
	defer srv.Close()
	sessionURL := srv.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID

	// First read: limit=2 is expected to leave the checkpoint at sequence 2.
	firstResp, err := http.Get(sessionURL + "/mailbox?consumer_id=rdp-worker-probe&limit=2")
	if err != nil {
		t.Fatalf("get consumer mailbox: %v", err)
	}
	defer firstResp.Body.Close()
	var first RemoteWorkspaceAdapterMailboxSnapshot
	if err := json.NewDecoder(firstResp.Body).Decode(&first); err != nil {
		t.Fatalf("decode consumer mailbox: %v", err)
	}
	if first.ConsumerCheckpointSequence != 2 {
		t.Fatalf("checkpoint mailbox = %+v", first)
	}

	// Second read: resume from the checkpoint, decoded into a fresh snapshot.
	checkpointResp, err := http.Get(sessionURL + "/mailbox?consumer_id=rdp-worker-probe&resume_from=checkpoint&limit=10")
	if err != nil {
		t.Fatalf("resume checkpoint mailbox: %v", err)
	}
	defer checkpointResp.Body.Close()
	var fromCheckpoint RemoteWorkspaceAdapterMailboxSnapshot
	if err := json.NewDecoder(checkpointResp.Body).Decode(&fromCheckpoint); err != nil {
		t.Fatalf("decode resume checkpoint mailbox: %v", err)
	}
	checkpointOK := fromCheckpoint.ResumeFrom == "checkpoint" &&
		fromCheckpoint.ResumeSequence == 2 &&
		fromCheckpoint.AfterSequence == 2 &&
		fromCheckpoint.SkippedCount == 2 &&
		fromCheckpoint.ReturnedCount == 1 &&
		len(fromCheckpoint.Events) == 1 &&
		fromCheckpoint.Events[0].Sequence == 3 &&
		fromCheckpoint.ConsumerCheckpointSequence == 3
	if !checkpointOK {
		t.Fatalf("resume checkpoint mailbox = %+v", fromCheckpoint)
	}

	// Third read: resume from the ack cursor while acknowledging sequence 2.
	// The expected resume sequence is 0, so all three events come back.
	ackResp, err := http.Get(sessionURL + "/mailbox?consumer_id=rdp-worker-probe&ack_sequence=2&resume_from=ack&limit=10")
	if err != nil {
		t.Fatalf("resume ack mailbox: %v", err)
	}
	defer ackResp.Body.Close()
	var fromAck RemoteWorkspaceAdapterMailboxSnapshot
	if err := json.NewDecoder(ackResp.Body).Decode(&fromAck); err != nil {
		t.Fatalf("decode resume ack mailbox: %v", err)
	}
	ackOK := fromAck.ResumeFrom == "ack" &&
		fromAck.ResumeSequence == 0 &&
		fromAck.AfterSequence == 0 &&
		fromAck.ReturnedCount == 3 &&
		fromAck.ConsumerAckSequence == 2
	if !ackOK {
		t.Fatalf("resume ack mailbox = %+v", fromAck)
	}

	report := probeSink.Report(time.Now().UTC())
	sinkTotalsOK := report["mailbox_resume_read_total"] == int64(2) &&
		report["mailbox_after_sequence_read_total"] == int64(1) &&
		report["mailbox_returned_total"] == int64(6) &&
		report["mailbox_skipped_total"] == int64(2)
	sinkLastOK := report["last_mailbox_resume_from"] == "ack" &&
		report["last_mailbox_resume_sequence"] == int64(0) &&
		report["last_mailbox_resume_consumer_id"] == "rdp-worker-probe" &&
		report["last_mailbox_after_sequence"] == int64(0) &&
		report["last_mailbox_skipped_count"] == 0 &&
		report["last_mailbox_returned_count"] == 3
	sessionTotalsOK := report["current_session_mailbox_resume_read_total"] == int64(2) &&
		report["current_session_mailbox_after_sequence_read_total"] == int64(1) &&
		report["current_session_mailbox_returned_total"] == int64(6) &&
		report["current_session_mailbox_skipped_total"] == int64(2)
	sessionLastOK := report["current_session_last_mailbox_resume_from"] == "ack" &&
		report["current_session_last_mailbox_resume_sequence"] == int64(0) &&
		report["current_session_last_mailbox_resume_consumer_id"] == "rdp-worker-probe" &&
		report["current_session_last_mailbox_after_sequence"] == int64(0) &&
		report["current_session_last_mailbox_skipped_count"] == 0 &&
		report["current_session_last_mailbox_returned_count"] == 3
	if !(sinkTotalsOK && sinkLastOK && sessionTotalsOK && sessionLastOK) {
		t.Fatalf("invalid resume telemetry report = %+v", report)
	}

	readiness, ok := report["adapter_runtime_readiness"].(map[string]any)
	if !ok {
		t.Fatalf("adapter runtime readiness missing from report = %+v", report)
	}
	expectedReadiness := []struct {
		key  string
		want any
	}{
		{"schema_version", "rap.remote_workspace_adapter_runtime_readiness.v1"},
		{"status", "cursor_ready"},
		{"diagnostic_state", "adapter_cursor_ready"},
		{"ready", true},
		{"adapter_session_id", sessionID},
		{"session_state", "probe_bound"},
		{"mailbox_depth", 3},
		{"consumer_count", 1},
		{"last_consumer_id", "rdp-worker-probe"},
		{"last_consumer_checkpoint_sequence", int64(3)},
		{"last_consumer_ack_sequence", int64(2)},
		{"last_consumer_lag_count", 1},
		{"last_resume_from", "ack"},
		{"last_resume_sequence", int64(0)},
		{"last_resume_consumer_id", "rdp-worker-probe"},
		{"last_returned_count", 3},
		{"last_skipped_count", 0},
	}
	for _, field := range expectedReadiness {
		if readiness[field.key] != field.want {
			t.Fatalf("invalid adapter runtime readiness = %+v", readiness)
		}
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxEndpointRejectsInvalidResumeFrom
// checks that malformed or conflicting resume_from requests are rejected with
// 400 and do not change the sink report.
func TestRemoteWorkspaceAdapterSessionMailboxEndpointRejectsInvalidResumeFrom(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-eeeeeeeeeeeeeeeeeeeeeeee"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept frame batch: %v", err)
	}
	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()
	tests := []struct {
		name  string
		query string
		want  string
	}{
		{
			name:  "invalid resume cursor",
			query: "consumer_id=rdp-worker-probe&resume_from=bad",
			want:  "invalid remote workspace adapter mailbox resume cursor",
		},
		{
			name:  "resume without consumer",
			query: "resume_from=ack",
			want:  "remote workspace adapter mailbox consumer required for resume",
		},
		{
			name:  "resume with after",
			query: "consumer_id=rdp-worker-probe&resume_from=ack&after_sequence=1",
			want:  "remote workspace adapter mailbox resume cannot combine with after sequence",
		},
		{
			name:  "resume with drain",
			query: "consumer_id=rdp-worker-probe&resume_from=ack&drain=true",
			want:  "remote workspace adapter mailbox resume cannot drain",
		},
		{
			name:  "resume with reset",
			query: "consumer_id=rdp-worker-probe&resume_from=ack&reset_consumer=true",
			want:  "remote workspace adapter mailbox resume cannot reset consumer",
		},
		{
			name:  "unknown consumer",
			query: "consumer_id=rdp-worker-probe&resume_from=ack",
			want:  "remote workspace adapter mailbox consumer not found",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?"
+ tt.query) if err != nil { t.Fatalf("get mailbox: %v", err) } defer resp.Body.Close() raw, _ := io.ReadAll(resp.Body) if resp.StatusCode != http.StatusBadRequest || !strings.Contains(string(raw), tt.want) { t.Fatalf("status=%d body=%s, want 400 containing %q", resp.StatusCode, string(raw), tt.want) } }) } report := sink.Report(time.Now().UTC()) if report["mailbox_read_total"] != int64(0) || report["mailbox_consumer_read_total"] != int64(0) || report["current_session_mailbox_depth"] != 1 { t.Fatalf("invalid resume requests mutated report = %+v", report) } } func TestRemoteWorkspaceAdapterSessionMailboxConsumerCheckpointAndAck(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() sessionID := "rap-rw-adapter-session-444444444444444444444444" delivery := RemoteWorkspaceFrameBatchDelivery{ ClusterID: "cluster-1", ChannelID: "channel-rw", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: FabricServiceChannelInteractive, AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: sessionID, Frames: []RemoteWorkspaceFrameProbeRecord{{ Channel: "display", Direction: "adapter_to_client", Droppable: true, }}, } for i := 0; i < 2; i++ { delivery.ResourceID = fmt.Sprintf("workspace-%d", i) if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil { t.Fatalf("accept frame batch %d: %v", i, err) } } server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler()) defer server.Close() resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?consumer_id=rdp-worker-probe&limit=10") if err != nil { t.Fatalf("get consumer mailbox: %v", err) } defer resp.Body.Close() var mailbox RemoteWorkspaceAdapterMailboxSnapshot if err := json.NewDecoder(resp.Body).Decode(&mailbox); err != nil { t.Fatalf("decode consumer mailbox: %v", err) } if mailbox.ConsumerID != "rdp-worker-probe" || mailbox.ConsumerReadTotal 
!= 1 || mailbox.ConsumerAckTotal != 0 || mailbox.ConsumerCheckpointSequence != mailbox.Events[len(mailbox.Events)-1].Sequence || mailbox.ConsumerAckSequence != 0 || mailbox.ConsumerLagCount != 2 || mailbox.ConsumerCount != 1 || mailbox.ConsumerCapacity != DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity || !mailbox.ConsumerCreated || mailbox.ConsumerCreatedAt == "" || mailbox.ConsumerLastReadAt == "" { t.Fatalf("consumer mailbox = %+v", mailbox) } resp, err = http.Get(fmt.Sprintf("%s/mesh/v1/remote-workspace/adapter-sessions/%s/mailbox?consumer_id=rdp-worker-probe&ack_sequence=%d&limit=10", server.URL, sessionID, mailbox.ConsumerCheckpointSequence)) if err != nil { t.Fatalf("ack consumer mailbox: %v", err) } defer resp.Body.Close() if err := json.NewDecoder(resp.Body).Decode(&mailbox); err != nil { t.Fatalf("decode acked consumer mailbox: %v", err) } if mailbox.ConsumerReadTotal != 2 || mailbox.ConsumerAckTotal != 1 || mailbox.ConsumerAckSequence != mailbox.ConsumerCheckpointSequence || mailbox.ConsumerLagCount != 0 { t.Fatalf("acked consumer mailbox = %+v", mailbox) } report := sink.Report(time.Now().UTC()) if report["mailbox_consumer_count"] != 1 || report["mailbox_consumer_read_total"] != int64(2) || report["mailbox_consumer_ack_total"] != int64(1) || report["last_mailbox_consumer_id"] != "rdp-worker-probe" || report["last_mailbox_consumer_checkpoint_sequence"] != mailbox.ConsumerCheckpointSequence || report["last_mailbox_consumer_ack_sequence"] != mailbox.ConsumerAckSequence || report["current_session_mailbox_consumer_count"] != 1 || report["current_session_mailbox_consumer_read_total"] != int64(2) || report["current_session_mailbox_consumer_ack_total"] != int64(1) { t.Fatalf("consumer report = %+v", report) } }

// TestRemoteWorkspaceAdapterSessionMailboxConsumerRejectsInvalidCursorInputs
// seeds one frame batch and then reads the session mailbox with malformed
// consumer cursor parameters. Every request must be answered with 400 and the
// expected message, and the sink report must afterwards still show zero
// mailbox reads and a mailbox depth of one.
func TestRemoteWorkspaceAdapterSessionMailboxConsumerRejectsInvalidCursorInputs(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-555555555555555555555555"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept frame batch: %v", err)
	}

	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()

	tests := []struct {
		name  string
		query string
		want  string
	}{
		{
			name:  "invalid consumer",
			query: "consumer_id=bad%2Fconsumer",
			want:  "invalid remote workspace adapter mailbox consumer",
		},
		{
			name:  "invalid ack",
			query: "consumer_id=rdp-worker-probe&ack_sequence=-1",
			want:  "invalid remote workspace adapter mailbox ack sequence",
		},
		{
			name:  "invalid reset",
			query: "consumer_id=rdp-worker-probe&reset_consumer=maybe",
			want:  "invalid remote workspace adapter mailbox consumer reset",
		},
		{
			name:  "reset without consumer",
			query: "reset_consumer=true",
			want:  "remote workspace adapter mailbox consumer required for reset",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?" + tt.query)
			if err != nil {
				t.Fatalf("get mailbox: %v", err)
			}
			defer resp.Body.Close()
			// A failed read must surface as its own failure rather than as a
			// misleading "body does not contain ..." mismatch.
			raw, err := io.ReadAll(resp.Body)
			if err != nil {
				t.Fatalf("read mailbox response: %v", err)
			}
			if resp.StatusCode != http.StatusBadRequest || !strings.Contains(string(raw), tt.want) {
				t.Fatalf("status=%d body=%s, want 400 containing %q", resp.StatusCode, string(raw), tt.want)
			}
		})
	}

	// Rejected requests must not have touched any mailbox counters.
	report := sink.Report(time.Now().UTC())
	if report["mailbox_read_total"] != int64(0) || report["mailbox_consumer_read_total"] != int64(0) || report["current_session_mailbox_depth"] != 1 {
		t.Fatalf("invalid consumer requests mutated report = %+v", report)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxConsumerResetCursor acks sequence 1
// for a consumer, then resets that consumer with reset_consumer=true and checks
// that the snapshot reports the reset with ack total and ack sequence back at
// zero, and that the sink report counts exactly one reset.
func TestRemoteWorkspaceAdapterSessionMailboxConsumerResetCursor(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() sessionID := "rap-rw-adapter-session-777777777777777777777777" delivery := RemoteWorkspaceFrameBatchDelivery{ ClusterID: "cluster-1", ChannelID: "channel-rw", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: FabricServiceChannelInteractive, AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: sessionID, Frames: []RemoteWorkspaceFrameProbeRecord{{ Channel: "display", Direction: "adapter_to_client", Droppable: true, }}, } if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil { t.Fatalf("accept frame batch: %v", err) } server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler()) defer server.Close() resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?consumer_id=rdp-worker-probe&ack_sequence=1") if err != nil { t.Fatalf("ack consumer mailbox: %v", err) } defer resp.Body.Close() var mailbox RemoteWorkspaceAdapterMailboxSnapshot if err := json.NewDecoder(resp.Body).Decode(&mailbox); err != nil { t.Fatalf("decode ack mailbox: %v", err) } if mailbox.ConsumerReadTotal != 1 || mailbox.ConsumerAckTotal != 1 || mailbox.ConsumerAckSequence != 1 { t.Fatalf("acked consumer mailbox = %+v", mailbox) }
resp, err = http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?consumer_id=rdp-worker-probe&reset_consumer=true") if err != nil { t.Fatalf("reset consumer mailbox: %v", err) } defer resp.Body.Close() if err := json.NewDecoder(resp.Body).Decode(&mailbox); err != nil { t.Fatalf("decode reset mailbox: %v", err) } if !mailbox.ConsumerReset || !mailbox.ConsumerCreated || mailbox.ConsumerReadTotal != 1 || mailbox.ConsumerAckTotal != 0 || mailbox.ConsumerAckSequence != 0 || mailbox.ConsumerResetTotal != 1 || mailbox.ConsumerCount != 1 { t.Fatalf("reset consumer mailbox = %+v", mailbox) } report := sink.Report(time.Now().UTC()) if report["mailbox_consumer_reset_total"] != int64(1) || report["current_session_mailbox_consumer_reset_total"] != int64(1) { t.Fatalf("reset consumer report = %+v", report) } }

// TestRemoteWorkspaceAdapterSessionMailboxConsumerStateIsBounded reads the
// mailbox with one more distinct consumer id than
// DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity and checks that the
// consumer count stays at capacity while exactly one eviction is reported,
// both in the sink report and in the last mailbox snapshot.
func TestRemoteWorkspaceAdapterSessionMailboxConsumerStateIsBounded(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-666666666666666666666666"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept frame batch: %v", err)
	}

	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()

	var mailbox RemoteWorkspaceAdapterMailboxSnapshot
	for i := 0; i < DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity+1; i++ {
		resp, err := http.Get(fmt.Sprintf("%s/mesh/v1/remote-workspace/adapter-sessions/%s/mailbox?consumer_id=consumer-%02d", server.URL, sessionID, i))
		if err != nil {
			t.Fatalf("get consumer mailbox %d: %v", i, err)
		}
		status := resp.StatusCode
		// Decode into a zero snapshot so that any field absent from this
		// response cannot inherit a value decoded on an earlier iteration.
		mailbox = RemoteWorkspaceAdapterMailboxSnapshot{}
		var decodeErr error
		if status == http.StatusOK {
			decodeErr = json.NewDecoder(resp.Body).Decode(&mailbox)
		}
		// Close once, before any Fatalf, instead of on every exit path.
		resp.Body.Close()
		if status != http.StatusOK {
			t.Fatalf("consumer mailbox %d status = %d", i, status)
		}
		if decodeErr != nil {
			t.Fatalf("decode consumer mailbox %d: %v", i, decodeErr)
		}
	}

	report := sink.Report(time.Now().UTC())
	if report["mailbox_consumer_count"] != DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity ||
		report["current_session_mailbox_consumer_count"] != DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity ||
		report["mailbox_consumer_evicted_total"] != int64(1) ||
		report["current_session_mailbox_consumer_evicted_total"] != int64(1) ||
		!mailbox.ConsumerEvicted ||
		mailbox.ConsumerEvictedTotal != 1 ||
		mailbox.ConsumerCount != DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity {
		// The condition inspects both values, so print both.
		t.Fatalf("bounded consumer report = %+v mailbox = %+v", report, mailbox)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxConsumerSnapshotIsReadOnly creates
// two consumers (consumer-b acking sequence 1, consumer-a only reading), then
// fetches the consumer snapshot with limit=1 and checks its contents and that
// the sink's read and ack counters are the same before and after the fetch.
func TestRemoteWorkspaceAdapterSessionMailboxConsumerSnapshotIsReadOnly(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-888888888888888888888888"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept frame batch: %v", err)
	}

	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()

	sessionURL := server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID

	// seed performs a mailbox read whose body is irrelevant. The status is
	// still verified so that a broken setup fails here rather than as a
	// confusing count mismatch further down.
	seed := func(label, query string) {
		t.Helper()
		resp, err := http.Get(sessionURL + "/mailbox?" + query)
		if err != nil {
			t.Fatalf("%s: %v", label, err)
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			t.Fatalf("%s: status = %d, want %d", label, resp.StatusCode, http.StatusOK)
		}
	}
	seed("ack consumer b", "consumer_id=consumer-b&ack_sequence=1")
	seed("read consumer a", "consumer_id=consumer-a")

	reportBefore := sink.Report(time.Now().UTC())

	resp, err := http.Get(sessionURL + "/mailbox/consumers?limit=1")
	if err != nil {
		t.Fatalf("get consumer snapshot: %v", err)
	}
	defer resp.Body.Close()
	var snapshot RemoteWorkspaceAdapterMailboxConsumerSnapshot
	if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil {
		t.Fatalf("decode consumer snapshot: %v", err)
	}
	// The length check short-circuits before Consumers[0] is indexed.
	if snapshot.SchemaVersion != "rap.remote_workspace_adapter_mailbox_consumer_snapshot.v1" ||
		snapshot.AdapterSessionID != sessionID ||
		snapshot.ConsumerCapacity != DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity ||
		snapshot.ConsumerCount != 2 ||
		snapshot.ConsumerReadTotal != 2 ||
		snapshot.ConsumerAckTotal != 1 ||
		len(snapshot.Consumers) != 1 ||
		snapshot.Consumers[0].ConsumerID != "consumer-a" ||
		snapshot.Consumers[0].CheckpointSequence != 1 ||
		snapshot.Consumers[0].LagCount != 1 {
		t.Fatalf("consumer snapshot = %+v", snapshot)
	}

	reportAfter := sink.Report(time.Now().UTC())
	if reportAfter["mailbox_read_total"] != reportBefore["mailbox_read_total"] ||
		reportAfter["mailbox_consumer_read_total"] != reportBefore["mailbox_consumer_read_total"] ||
		reportAfter["mailbox_consumer_ack_total"] != reportBefore["mailbox_consumer_ack_total"] {
		t.Fatalf("consumer snapshot mutated report before=%+v after=%+v", reportBefore, reportAfter)
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxConsumerSnapshotRejectsInvalidRequests
// checks that the consumer snapshot endpoint answers 400 with the expected
// message for a non-numeric limit, a malformed session id and an unknown
// session id.
func TestRemoteWorkspaceAdapterSessionMailboxConsumerSnapshotRejectsInvalidRequests(t *testing.T) {
	sink := NewRemoteWorkspaceFrameProbeSink()
	sessionID := "rap-rw-adapter-session-999999999999999999999999"
	delivery := RemoteWorkspaceFrameBatchDelivery{
		ClusterID:         "cluster-1",
		ChannelID:         "channel-rw",
		ResourceID:        "workspace-1",
		ServiceClass:      FabricServiceClassRemoteWorkspace,
		ChannelClass:      FabricServiceChannelInteractive,
		AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1",
		AdapterSessionID:  sessionID,
		Frames: []RemoteWorkspaceFrameProbeRecord{{
			Channel:   "display",
			Direction: "adapter_to_client",
			Droppable: true,
		}},
	}
	if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil {
		t.Fatalf("accept frame batch: %v", err)
	}

	server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler())
	defer server.Close()

	tests := []struct {
		name string
		path string
		want string
	}{
		{
			name: "bad limit",
			path: "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox/consumers?limit=bad",
			want: "invalid remote workspace adapter mailbox consumer snapshot limit",
		},
		{
			name: "invalid id",
			path: "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-nothex/mailbox/consumers",
			want: "invalid remote workspace adapter session id",
		},
		{
			name: "unknown session",
			path: "/mesh/v1/remote-workspace/adapter-sessions/rap-rw-adapter-session-aaaaaaaaaaaaaaaaaaaaaaaa/mailbox/consumers",
			want: "remote workspace adapter session not found",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			resp, err := http.Get(server.URL + tt.path)
			if err != nil {
				t.Fatalf("get consumer snapshot: %v", err)
			}
			defer resp.Body.Close()
			// Report a failed read directly instead of as a body mismatch.
			raw, err := io.ReadAll(resp.Body)
			if err != nil {
				t.Fatalf("read consumer snapshot response: %v", err)
			}
			if resp.StatusCode != http.StatusBadRequest || !strings.Contains(string(raw), tt.want) {
				t.Fatalf("status=%d body=%s, want 400 containing %q", resp.StatusCode, string(raw), tt.want)
			}
		})
	}
}

// TestRemoteWorkspaceAdapterSessionMailboxPreflightIsReadOnly accepts three
// frame batches, moves a consumer's checkpoint to 2 and its ack to 1, then
// queries the preflight endpoint resuming from the ack and from the checkpoint.
// It verifies the predicted counts and sequences and that the sink's read and
// ack counters are identical before and after the preflight calls.
func TestRemoteWorkspaceAdapterSessionMailboxPreflightIsReadOnly(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() sessionID := "rap-rw-adapter-session-abababababababababababab" delivery := RemoteWorkspaceFrameBatchDelivery{ ClusterID: "cluster-1", ChannelID: "channel-rw", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: FabricServiceChannelInteractive, AdapterContractID:
"rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: sessionID, Frames: []RemoteWorkspaceFrameProbeRecord{{ Channel: "display", Direction: "adapter_to_client", Droppable: true, }}, } for i := 0; i < 3; i++ { delivery.ResourceID = fmt.Sprintf("workspace-preflight-%d", i) if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil { t.Fatalf("accept frame batch %d: %v", i, err) } } server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler()) defer server.Close() resp, err := http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?consumer_id=rdp-worker-probe&limit=2") if err != nil { t.Fatalf("seed checkpoint: %v", err) } resp.Body.Close() resp, err = http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox?consumer_id=rdp-worker-probe&ack_sequence=1&limit=1") if err != nil { t.Fatalf("seed ack: %v", err) } resp.Body.Close() reportBefore := sink.Report(time.Now().UTC()) resp, err = http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox/preflight?consumer_id=rdp-worker-probe&resume_from=ack&limit=1") if err != nil { t.Fatalf("get preflight ack: %v", err) } defer resp.Body.Close() var preflight RemoteWorkspaceAdapterMailboxPreflightSnapshot if err := json.NewDecoder(resp.Body).Decode(&preflight); err != nil { t.Fatalf("decode preflight ack: %v", err) } if preflight.SchemaVersion != "rap.remote_workspace_adapter_mailbox_preflight.v1" || preflight.AdapterSessionID != sessionID || !preflight.ReadOnly || preflight.ConsumerID != "rdp-worker-probe" || preflight.ResumeFrom != "ack" || preflight.ResumeSequence != 1 || preflight.AfterSequence != 1 || preflight.Limit != 1 || preflight.MailboxDepth != 3 || preflight.ConsumerCheckpointSequence != 2 || preflight.ConsumerAckSequence != 1 || preflight.ConsumerLagCount != 2 || preflight.ExpectedAvailableCount != 2 || 
preflight.ExpectedReturnedCount != 1 || preflight.ExpectedSkippedCount != 1 || preflight.FirstExpectedSequence != 2 || preflight.LastExpectedSequence != 2 { t.Fatalf("preflight ack = %+v", preflight) } resp, err = http.Get(server.URL + "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox/preflight?consumer_id=rdp-worker-probe&resume_from=checkpoint&limit=10") if err != nil { t.Fatalf("get preflight checkpoint: %v", err) } defer resp.Body.Close() preflight = RemoteWorkspaceAdapterMailboxPreflightSnapshot{} if err := json.NewDecoder(resp.Body).Decode(&preflight); err != nil { t.Fatalf("decode preflight checkpoint: %v", err) } if preflight.ResumeFrom != "checkpoint" || preflight.ResumeSequence != 2 || preflight.ExpectedAvailableCount != 1 || preflight.ExpectedReturnedCount != 1 || preflight.ExpectedSkippedCount != 2 || preflight.FirstExpectedSequence != 3 || preflight.LastExpectedSequence != 3 { t.Fatalf("preflight checkpoint = %+v", preflight) } reportAfter := sink.Report(time.Now().UTC()) if reportAfter["mailbox_read_total"] != reportBefore["mailbox_read_total"] || reportAfter["mailbox_consumer_read_total"] != reportBefore["mailbox_consumer_read_total"] || reportAfter["mailbox_consumer_ack_total"] != reportBefore["mailbox_consumer_ack_total"] || reportAfter["current_session_mailbox_consumer_read_total"] != reportBefore["current_session_mailbox_consumer_read_total"] || reportAfter["current_session_mailbox_consumer_ack_total"] != reportBefore["current_session_mailbox_consumer_ack_total"] { t.Fatalf("preflight mutated report before=%+v after=%+v", reportBefore, reportAfter) } } func TestRemoteWorkspaceAdapterSessionMailboxPreflightRejectsInvalidRequests(t *testing.T) { sink := NewRemoteWorkspaceFrameProbeSink() sessionID := "rap-rw-adapter-session-acacacacacacacacacacacac" delivery := RemoteWorkspaceFrameBatchDelivery{ ClusterID: "cluster-1", ChannelID: "channel-rw", ResourceID: "workspace-1", ServiceClass: FabricServiceClassRemoteWorkspace, 
ChannelClass: FabricServiceChannelInteractive, AdapterContractID: "rap.rdp_worker.remote_workspace_adapter_contract_probe.v1", AdapterSessionID: sessionID, Frames: []RemoteWorkspaceFrameProbeRecord{{ Channel: "display", Direction: "adapter_to_client", Droppable: true, }}, } if _, err := sink.AcceptRemoteWorkspaceFrameBatchProbe(context.Background(), delivery); err != nil { t.Fatalf("accept frame batch: %v", err) } server := httptest.NewServer(Server{RemoteWorkspaceFrameSink: sink}.Handler()) defer server.Close() tests := []struct { name string path string want string }{ { name: "missing consumer", path: "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox/preflight", want: "remote workspace adapter mailbox consumer required for preflight", }, { name: "invalid resume", path: "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox/preflight?consumer_id=consumer-a&resume_from=bogus", want: "invalid remote workspace adapter mailbox resume cursor", }, { name: "bad limit", path: "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox/preflight?consumer_id=consumer-a&limit=bad", want: "invalid remote workspace adapter mailbox preflight limit", }, { name: "unknown consumer", path: "/mesh/v1/remote-workspace/adapter-sessions/" + sessionID + "/mailbox/preflight?consumer_id=consumer-a", want: "remote workspace adapter mailbox consumer not found", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { resp, err := http.Get(server.URL + tt.path) if err != nil { t.Fatalf("get preflight: %v", err) } defer resp.Body.Close() raw, _ := io.ReadAll(resp.Body) if resp.StatusCode != http.StatusBadRequest || !strings.Contains(string(raw), tt.want) { t.Fatalf("status=%d body=%s, want 400 containing %q", resp.StatusCode, string(raw), tt.want) } }) } } func TestFabricServiceChannelVPNPacketIngressHonorsDisabledBackendRelayPolicy(t *testing.T) { publicKey, privateKey, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate 
key: %v", err) } var backendCalled bool backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { backendCalled = true w.WriteHeader(http.StatusAccepted) })) defer backend.Close() token := "rap_fsc_nobackend" payload := fabricServiceChannelLeaseAuthorityPayload{ SchemaVersion: "rap.fabric_service_channel_lease_authority.v1", ChannelID: "channel-1", ClusterID: "cluster-1", ResourceID: "vpn-1", ServiceClass: FabricServiceClassVPNPackets, SelectedEntryNodeID: "entry-1", SelectedExitNodeID: "exit-1", AllowedChannels: []string{ProductionChannelVPNPacket}, TokenHash: fabricServiceChannelTokenHash(token), ExpiresAt: time.Now().UTC().Add(time.Minute), DataPlane: fabricServiceChannelDataPlaneContract{ SchemaVersion: "rap.fabric_service_channel_data_plane.v1", Mode: "fabric_primary", WorkingDataTransport: "fabric_service_channel", SteadyStateTransport: "fabric_route", BackendRelayPolicy: "disabled", ProductionForwardingRequired: true, ServiceNeutral: true, ProtocolAgnostic: true, LogicalFlowMode: "multi_flow_isolated", RequiredFlowIsolationClasses: []string{"control", ProductionChannelVPNPacket}, }, } payload.PrimaryRoute.RouteID = "route-signed" payload.PrimaryRoute.Status = "authorized" rawPayload, err := json.Marshal(payload) if err != nil { t.Fatalf("marshal payload: %v", err) } canonical, err := authority.CanonicalJSON(rawPayload) if err != nil { t.Fatalf("canonical payload: %v", err) } signature := authority.Signature{ SchemaVersion: authority.SignatureSchemaVersion, Algorithm: authority.AlgorithmEd25519, KeyFingerprint: authority.Fingerprint(publicKey), Signature: base64.StdEncoding.EncodeToString(ed25519.Sign(privateKey, canonical)), } rawSignature, err := json.Marshal(signature) if err != nil { t.Fatalf("marshal signature: %v", err) } server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: failingVPNPacketIngress{sendErr: ErrRouteNotFound}, BackendProxyBaseURL: 
backend.URL + "/api/v1", ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), }.Handler()) defer server.Close() req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", bytes.NewReader([]byte("packet"))) if err != nil { t.Fatalf("new request: %v", err) } req.Header.Set("X-RAP-Service-Channel-Token", token) req.Header.Set("X-RAP-Service-Channel-Authority-Payload", base64.RawURLEncoding.EncodeToString(rawPayload)) req.Header.Set("X-RAP-Service-Channel-Authority-Signature", base64.RawURLEncoding.EncodeToString(rawSignature)) resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post service channel packet: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusServiceUnavailable { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusServiceUnavailable) } if backendCalled { t.Fatal("backend relay was called despite disabled data-plane policy") } } func TestFabricServiceChannelVPNPacketWebSocketHonorsDisabledBackendRelayPolicy(t *testing.T) { publicKey, privateKey, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate key: %v", err) } backendCalled := make(chan struct{}, 1) backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { select { case backendCalled <- struct{}{}: default: } w.WriteHeader(http.StatusAccepted) })) defer backend.Close() token := "rap_fsc_nobackend_ws" payload := fabricServiceChannelLeaseAuthorityPayload{ SchemaVersion: "rap.fabric_service_channel_lease_authority.v1", ChannelID: "channel-1", ClusterID: "cluster-1", ResourceID: "vpn-1", ServiceClass: FabricServiceClassVPNPackets, SelectedEntryNodeID: "entry-1", SelectedExitNodeID: "exit-1", AllowedChannels: []string{ProductionChannelVPNPacket}, TokenHash: fabricServiceChannelTokenHash(token), ExpiresAt: time.Now().UTC().Add(time.Minute), DataPlane: fabricServiceChannelDataPlaneContract{ SchemaVersion: 
"rap.fabric_service_channel_data_plane.v1", Mode: "fabric_primary", WorkingDataTransport: "fabric_service_channel", SteadyStateTransport: "fabric_route", BackendRelayPolicy: "disabled", ProductionForwardingRequired: true, ServiceNeutral: true, ProtocolAgnostic: true, LogicalFlowMode: "multi_flow_isolated", RequiredFlowIsolationClasses: []string{"control", ProductionChannelVPNPacket}, }, } payload.PrimaryRoute.RouteID = "route-signed" payload.PrimaryRoute.Status = "authorized" rawPayload, err := json.Marshal(payload) if err != nil { t.Fatalf("marshal payload: %v", err) } canonical, err := authority.CanonicalJSON(rawPayload) if err != nil { t.Fatalf("canonical payload: %v", err) } signature := authority.Signature{ SchemaVersion: authority.SignatureSchemaVersion, Algorithm: authority.AlgorithmEd25519, KeyFingerprint: authority.Fingerprint(publicKey), Signature: base64.StdEncoding.EncodeToString(ed25519.Sign(privateKey, canonical)), } rawSignature, err := json.Marshal(signature) if err != nil { t.Fatalf("marshal signature: %v", err) } violations := make(chan FabricServiceChannelAccessLogEntry, 2) server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: failingVPNPacketIngress{sendErr: ErrRouteNotFound}, BackendProxyBaseURL: backend.URL + "/api/v1", ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), FabricServiceChannelLogger: func(entry FabricServiceChannelAccessLogEntry) { if entry.Event == "fabric_service_channel_data_plane_violation" { violations <- entry } }, }.Handler()) defer server.Close() wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets/ws" headers := http.Header{} headers.Set("X-RAP-Service-Channel-Token", token) headers.Set("X-RAP-Service-Channel-Authority-Payload", base64.RawURLEncoding.EncodeToString(rawPayload)) headers.Set("X-RAP-Service-Channel-Authority-Signature", 
base64.RawURLEncoding.EncodeToString(rawSignature)) conn, _, err := websocket.DefaultDialer.Dial(wsURL, headers) if err != nil { t.Fatalf("dial websocket: %v", err) } defer conn.Close() if err := conn.WriteMessage(websocket.BinaryMessage, encodeVPNIngressPacketBatch([][]byte{[]byte("packet")})); err != nil { t.Fatalf("write packet batch: %v", err) } select { case entry := <-violations: if entry.ViolationStatus != "fabric_route_send_failed_backend_fallback_blocked" || entry.BackendRelayPolicy != "disabled" || entry.ChannelID != "channel-1" || entry.ResourceID != "vpn-1" { t.Fatalf("violation = %+v", entry) } case <-time.After(2 * time.Second): t.Fatal("blocked fallback violation was not logged") } select { case <-backendCalled: t.Fatal("backend relay was called despite disabled data-plane policy") default: } } func TestFabricServiceChannelVPNPacketIngressRejectsSignedLeaseForDifferentEntry(t *testing.T) { publicKey, privateKey, err := ed25519.GenerateKey(nil) if err != nil { t.Fatalf("generate key: %v", err) } token := "rap_fsc_signedtest" payload := fabricServiceChannelLeaseAuthorityPayload{ SchemaVersion: "rap.fabric_service_channel_lease_authority.v1", ChannelID: "channel-1", ClusterID: "cluster-1", ResourceID: "vpn-1", ServiceClass: FabricServiceClassVPNPackets, SelectedEntryNodeID: "other-entry", SelectedExitNodeID: "exit-1", AllowedChannels: []string{ProductionChannelVPNPacket}, TokenHash: fabricServiceChannelTokenHash(token), ExpiresAt: time.Now().UTC().Add(time.Minute), } rawPayload, err := json.Marshal(payload) if err != nil { t.Fatalf("marshal payload: %v", err) } canonical, err := authority.CanonicalJSON(rawPayload) if err != nil { t.Fatalf("canonical payload: %v", err) } signature := authority.Signature{ SchemaVersion: authority.SignatureSchemaVersion, Algorithm: authority.AlgorithmEd25519, KeyFingerprint: authority.Fingerprint(publicKey), Signature: base64.StdEncoding.EncodeToString(ed25519.Sign(privateKey, canonical)), } rawSignature, err := 
json.Marshal(signature) if err != nil { t.Fatalf("marshal signature: %v", err) } server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: &recordingVPNPacketIngress{}, ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey), }.Handler()) defer server.Close() req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", bytes.NewReader([]byte("packet"))) if err != nil { t.Fatalf("new request: %v", err) } req.Header.Set("X-RAP-Service-Channel-Token", token) req.Header.Set("X-RAP-Service-Channel-Authority-Payload", base64.RawURLEncoding.EncodeToString(rawPayload)) req.Header.Set("X-RAP-Service-Channel-Authority-Signature", base64.RawURLEncoding.EncodeToString(rawSignature)) resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("post service channel packet: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusForbidden { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden) } }

// TestFabricServiceChannelVPNPacketIngressFallsBackToBackendRelay posts a
// packet with only a service-channel token while the fabric ingress fails with
// ErrRouteNotFound, and checks that the packet reaches the backend relay's
// tunnel client packets path unchanged and that the relay's 202 is returned.
func TestFabricServiceChannelVPNPacketIngressFallsBackToBackendRelay(t *testing.T) {
	var backendPath string
	var backendBody []byte
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		backendPath = r.URL.Path
		body, err := io.ReadAll(r.Body)
		if err != nil {
			// This runs on the server's goroutine, where FailNow (and
			// therefore Fatalf) must not be called: record the failure
			// with Errorf and end the request explicitly.
			t.Errorf("read backend body: %v", err)
			http.Error(w, "read backend body", http.StatusInternalServerError)
			return
		}
		backendBody = body
		w.WriteHeader(http.StatusAccepted)
	}))
	defer backend.Close()

	server := httptest.NewServer(Server{
		Local:               PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"},
		VPNPacketIngress:    failingVPNPacketIngress{sendErr: ErrRouteNotFound},
		BackendProxyBaseURL: backend.URL + "/api/v1",
	}.Handler())
	defer server.Close()

	req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", bytes.NewReader([]byte("packet")))
	if err != nil {
		t.Fatalf("new request: %v", err)
	}
	req.Header.Set("X-RAP-Service-Channel-Token", "rap_fsc_testtoken")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatalf("post service channel packet: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusAccepted {
		t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusAccepted)
	}
	if backendPath != "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets" {
		t.Fatalf("backend path = %s", backendPath)
	}
	if string(backendBody) != "packet" {
		t.Fatalf("backend body = %q", string(backendBody))
	}
}

// TestFabricServiceChannelVPNPacketIngressUsesSignedDegradedFallback posts a
// packet under a signed lease whose Status is "degraded_fallback". The packet
// must be delivered to the backend relay with a 202 result, and the recording
// fabric ingress must not have received anything.
func TestFabricServiceChannelVPNPacketIngressUsesSignedDegradedFallback(t *testing.T) {
	publicKey, privateKey, err := ed25519.GenerateKey(nil)
	if err != nil {
		t.Fatalf("generate key: %v", err)
	}

	token := "rap_fsc_degradedtest"
	payload := fabricServiceChannelLeaseAuthorityPayload{
		SchemaVersion:       "rap.fabric_service_channel_lease_authority.v1",
		ChannelID:           "channel-1",
		ClusterID:           "cluster-1",
		ResourceID:          "vpn-1",
		ServiceClass:        FabricServiceClassVPNPackets,
		Status:              "degraded_fallback",
		SelectedEntryNodeID: "entry-1",
		SelectedExitNodeID:  "exit-1",
		AllowedChannels:     []string{ProductionChannelVPNPacket},
		TokenHash:           fabricServiceChannelTokenHash(token),
		ExpiresAt:           time.Now().UTC().Add(time.Minute),
	}
	payload.PrimaryRoute.Status = "missing_route_intent"
	rawPayload, err := json.Marshal(payload)
	if err != nil {
		t.Fatalf("marshal payload: %v", err)
	}
	canonical, err := authority.CanonicalJSON(rawPayload)
	if err != nil {
		t.Fatalf("canonical payload: %v", err)
	}
	signature := authority.Signature{
		SchemaVersion:  authority.SignatureSchemaVersion,
		Algorithm:      authority.AlgorithmEd25519,
		KeyFingerprint: authority.Fingerprint(publicKey),
		Signature:      base64.StdEncoding.EncodeToString(ed25519.Sign(privateKey, canonical)),
	}
	rawSignature, err := json.Marshal(signature)
	if err != nil {
		t.Fatalf("marshal signature: %v", err)
	}

	var backendBody []byte
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Handler goroutine: report with Errorf and answer with an error
		// status; Fatalf is only valid on the goroutine running the test.
		if r.URL.Path != "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets" {
			t.Errorf("backend path = %s", r.URL.Path)
			http.Error(w, "unexpected backend path", http.StatusNotFound)
			return
		}
		body, err := io.ReadAll(r.Body)
		if err != nil {
			t.Errorf("read backend body: %v", err)
			http.Error(w, "read backend body", http.StatusInternalServerError)
			return
		}
		backendBody = body
		w.WriteHeader(http.StatusAccepted)
	}))
	defer backend.Close()

	ingress := &recordingVPNPacketIngress{}
	server := httptest.NewServer(Server{
		Local:                     PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"},
		VPNPacketIngress:          ingress,
		BackendProxyBaseURL:       backend.URL + "/api/v1",
		ClusterAuthorityPublicKey: base64.StdEncoding.EncodeToString(publicKey),
	}.Handler())
	defer server.Close()

	req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets", bytes.NewReader([]byte("packet")))
	if err != nil {
		t.Fatalf("new request: %v", err)
	}
	req.Header.Set("X-RAP-Service-Channel-Token", token)
	req.Header.Set("X-RAP-Service-Channel-Authority-Payload", base64.RawURLEncoding.EncodeToString(rawPayload))
	req.Header.Set("X-RAP-Service-Channel-Authority-Signature", base64.RawURLEncoding.EncodeToString(rawSignature))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatalf("post service channel packet: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusAccepted {
		t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusAccepted)
	}
	if string(backendBody) != "packet" {
		t.Fatalf("backend body = %q", string(backendBody))
	}

	ingress.mu.Lock()
	defer ingress.mu.Unlock()
	if len(ingress.sent) != 0 {
		t.Fatalf("fabric ingress should not receive degraded fallback packets: %#v", ingress.sent)
	}
}

// TestVPNPacketIngressWebSocketMovesBatchesBothDirections writes a two-packet
// batch over the tunnel client WebSocket, expects the two queued reply packets
// back as one binary batch, and then polls the recording ingress until it
// shows both sent packets with the cluster and VPN connection ids from the
// URL.
func TestVPNPacketIngressWebSocketMovesBatchesBothDirections(t *testing.T) {
	ingress := &recordingVPNPacketIngress{
		receive: [][]byte{[]byte("reply-1"), []byte("reply-2")},
	}
	server := httptest.NewServer(Server{
		Local:            PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"},
		VPNPacketIngress: ingress,
	}.Handler())
	defer server.Close()

	wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets/ws"
	conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
	if err != nil {
		t.Fatalf("dial websocket: %v", err)
	}
	defer conn.Close()

	if err := conn.WriteMessage(websocket.BinaryMessage, encodeVPNIngressPacketBatch([][]byte{[]byte("packet-1"), []byte("packet-2")})); err != nil {
		t.Fatalf("write packet batch: %v", err)
	}
	if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil {
		t.Fatalf("set read deadline: %v", err)
	}
	messageType, payload, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("read packet batch: %v", err)
	}
	if messageType != websocket.BinaryMessage {
		t.Fatalf("message type = %d, want binary", messageType)
	}
	packets, err := decodeVPNIngressPacketBatch(payload)
	if err != nil {
		t.Fatalf("decode reply batch: %v", err)
	}
	if len(packets) != 2 || string(packets[0]) != "reply-1" || string(packets[1]) != "reply-2" {
		t.Fatalf("reply packets = %#v", packets)
	}

	// The server hands the batch to the ingress asynchronously with respect
	// to this goroutine, so poll a locked copy of its state until it shows
	// both packets or the deadline passes.
	deadline := time.Now().Add(2 * time.Second)
	for {
		ingress.mu.Lock()
		sent := append([][]byte(nil), ingress.sent...)
		clusterID := ingress.clusterID
		vpnConnectionID := ingress.vpnConnectionID
		ingress.mu.Unlock()

		if len(sent) == 2 {
			if clusterID != "cluster-1" || vpnConnectionID != "vpn-1" {
				t.Fatalf("ingress ids = %s %s", clusterID, vpnConnectionID)
			}
			if string(sent[0]) != "packet-1" || string(sent[1]) != "packet-2" {
				t.Fatalf("sent packets = %#v", sent)
			}
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("sent packets = %#v", sent)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

// TestFabricServiceChannelVPNPacketWebSocketPreservesTrafficClass dials the
// service-channel packet WebSocket with X-RAP-Traffic-Class set to
// "interactive", sends one packet, and polls the recording ingress until it
// reports that traffic class together with the single sent packet.
func TestFabricServiceChannelVPNPacketWebSocketPreservesTrafficClass(t *testing.T) { ingress := &recordingVPNPacketIngress{ receive: [][]byte{[]byte("reply")}, } server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: ingress, }.Handler()) defer server.Close() wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/api/v1/clusters/cluster-1/fabric/service-channels/channel-1/vpn-connections/vpn-1/packets/ws" headers := http.Header{} headers.Set("Authorization", "Bearer rap_fsc_testtoken") headers.Set("X-RAP-Service-Class", FabricServiceClassVPNPackets) headers.Set("X-RAP-Channel-Class", ProductionChannelVPNPacket) headers.Set("X-RAP-Traffic-Class", "interactive") headers.Set("X-RAP-Fabric-Channel-ID", "channel-1") conn, _, err := websocket.DefaultDialer.Dial(wsURL, headers) if err != nil { t.Fatalf("dial websocket: %v", err) } defer conn.Close() if err := conn.WriteMessage(websocket.BinaryMessage, encodeVPNIngressPacketBatch([][]byte{[]byte("packet")})); err != nil { t.Fatalf("write packet batch: %v", err) } if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { t.Fatalf("set read deadline: %v", err) } if _, _, err := conn.ReadMessage(); err != nil { t.Fatalf("read packet batch: %v", err) } deadline := time.Now().Add(2 * time.Second) for { ingress.mu.Lock() trafficClass := ingress.trafficClass sent := append([][]byte(nil), ingress.sent...)
ingress.mu.Unlock() if trafficClass == "interactive" && len(sent) == 1 && string(sent[0]) == "packet" { break } if time.Now().After(deadline) { t.Fatalf("traffic class = %q sent packets = %#v, want interactive packet", trafficClass, sent) } time.Sleep(10 * time.Millisecond) } } func TestVPNPacketIngressWebSocketFallsBackToBackendRelay(t *testing.T) { var backendBody []byte postSeen := make(chan struct{}, 1) backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: if r.URL.Path != "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets" || r.URL.Query().Get("batch") != "true" { t.Fatalf("backend post target = %s?%s", r.URL.Path, r.URL.RawQuery) } var err error backendBody, err = io.ReadAll(r.Body) if err != nil { t.Fatalf("read backend body: %v", err) } select { case postSeen <- struct{}{}: default: } w.WriteHeader(http.StatusAccepted) case http.MethodGet: if r.URL.Path != "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets" || r.URL.Query().Get("batch") != "true" { t.Fatalf("backend get target = %s?%s", r.URL.Path, r.URL.RawQuery) } w.Header().Set("Content-Type", "application/vnd.rap.vpn-packet-batch.v1") _, _ = w.Write(encodeVPNIngressPacketBatch([][]byte{[]byte("backend-reply")})) default: w.WriteHeader(http.StatusMethodNotAllowed) } })) defer backend.Close() server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, VPNPacketIngress: failingVPNPacketIngress{sendErr: ErrRouteNotFound, receiveErr: ErrRouteNotFound}, BackendProxyBaseURL: backend.URL + "/api/v1", }.Handler()) defer server.Close() wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/api/v1/clusters/cluster-1/vpn-connections/vpn-1/tunnel/client/packets/ws" conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) if err != nil { t.Fatalf("dial websocket: %v", err) } defer conn.Close() sentPayload := 
encodeVPNIngressPacketBatch([][]byte{[]byte("packet")}) if err := conn.WriteMessage(websocket.BinaryMessage, sentPayload); err != nil { t.Fatalf("write packet batch: %v", err) } if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { t.Fatalf("set read deadline: %v", err) } _, payload, err := conn.ReadMessage() if err != nil { t.Fatalf("read backend packet batch: %v", err) } packets, err := decodeVPNIngressPacketBatch(payload) if err != nil { t.Fatalf("decode backend batch: %v", err) } if len(packets) != 1 || string(packets[0]) != "backend-reply" { t.Fatalf("backend reply packets = %#v", packets) } select { case <-postSeen: case <-time.After(2 * time.Second): t.Fatal("backend POST was not observed") } if !bytes.Equal(backendBody, sentPayload) { t.Fatalf("backend body = %q want %q", string(backendBody), string(sentPayload)) } } func TestNewProductionVPNPacketBatchEnvelopeRoundTripsPayload(t *testing.T) { now := time.Now().UTC() envelope, err := NewProductionVPNPacketBatchEnvelope(ProductionVPNPacketEnvelopeInput{ MessageID: "vpn-message-builder", RouteID: "route-vpn-1", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", CurrentHopNodeID: "entry-1", NextHopNodeID: "exit-1", RoutePath: []string{"entry-1", "exit-1"}, VPNConnectionID: "vpn-1", Direction: "client_to_gateway", Packets: [][]byte{[]byte("packet-1"), []byte("packet-2")}, Now: now, }) if err != nil { t.Fatalf("new vpn packet envelope: %v", err) } if err := ValidateProductionEnvelope(PeerIdentity{ClusterID: "cluster-1", NodeID: "entry-1"}, envelope, now); err != nil { t.Fatalf("validate envelope: %v", err) } payload, err := DecodeProductionVPNPacketBatch(envelope) if err != nil { t.Fatalf("decode vpn packet batch: %v", err) } if payload.VPNConnectionID != "vpn-1" || payload.Direction != "client_to_gateway" || string(payload.Packets[1]) != "packet-2" { t.Fatalf("payload = %+v", payload) } } func TestNewProductionVPNPacketBatchEnvelopeRejectsEmptyPackets(t 
*testing.T) { _, err := NewProductionVPNPacketBatchEnvelope(ProductionVPNPacketEnvelopeInput{ MessageID: "vpn-message-builder", RouteID: "route-vpn-1", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", CurrentHopNodeID: "entry-1", NextHopNodeID: "exit-1", RoutePath: []string{"entry-1", "exit-1"}, VPNConnectionID: "vpn-1", Direction: "client_to_gateway", Packets: [][]byte{nil, []byte{}}, }) if err == nil { t.Fatal("expected empty packet batch to be rejected") } } func TestEncodeVPNIngressPacketBatchSkipsEmptyPackets(t *testing.T) { encoded := encodeVPNIngressPacketBatch([][]byte{nil, []byte("reply"), []byte{}}) decoded, err := decodeVPNIngressPacketBatch(encoded) if err != nil { t.Fatalf("decode batch: %v", err) } if len(decoded) != 1 || string(decoded[0]) != "reply" { t.Fatalf("decoded = %#v", decoded) } } type failingVPNPacketIngress struct { sendErr error receiveErr error } func (i failingVPNPacketIngress) SendClientPacketBatch(context.Context, string, string, [][]byte) error { return i.sendErr } func (i failingVPNPacketIngress) ReceiveClientPacketBatch(context.Context, string, string, time.Duration) ([][]byte, error) { return nil, i.receiveErr } type recordingVPNPacketIngress struct { mu sync.Mutex clusterID string vpnConnectionID string trafficClass string sent [][]byte receive [][]byte } func (i *recordingVPNPacketIngress) SendClientPacketBatch(_ context.Context, clusterID string, vpnConnectionID string, packets [][]byte) error { i.mu.Lock() defer i.mu.Unlock() i.clusterID = clusterID i.vpnConnectionID = vpnConnectionID i.sent = cleanVPNIngressPacketBatch(packets) return nil } func (i *recordingVPNPacketIngress) SendClientPacketBatchWithTrafficClass(_ context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte) error { i.mu.Lock() defer i.mu.Unlock() i.clusterID = clusterID i.vpnConnectionID = vpnConnectionID i.trafficClass = trafficClass i.sent = cleanVPNIngressPacketBatch(packets) return nil } 
func (i *recordingVPNPacketIngress) ReceiveClientPacketBatch(_ context.Context, clusterID string, vpnConnectionID string, _ time.Duration) ([][]byte, error) { i.mu.Lock() defer i.mu.Unlock() i.clusterID = clusterID i.vpnConnectionID = vpnConnectionID packets := i.receive i.receive = nil return packets, nil } func hasProductionForwardEvent(events []ProductionForwardLogEntry, event string) bool { for _, item := range events { if item.Event == event { return true } } return false } func TestSyntheticEndpointDisabledByDefault(t *testing.T) { server := httptest.NewServer(Server{Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}}.Handler()) defer server.Close() resp, err := http.Post(server.URL+"/mesh/v1/synthetic/probe", "application/json", bytes.NewReader([]byte(`{}`))) if err != nil { t.Fatalf("post synthetic probe: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusServiceUnavailable { t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusServiceUnavailable) } }