diff --git a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go index 76a84aa..2b8ca32 100644 --- a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go +++ b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go @@ -22,19 +22,20 @@ type smokeNode struct { } type smokeReport struct { - Stage string `json:"stage"` - ProductionForwarding bool `json:"production_forwarding"` - ScopedConfigLoaded bool `json:"scoped_config_loaded"` - DirectProbeAccepted bool `json:"direct_probe_accepted"` - DirectPath []string `json:"direct_path"` - RelayProbeAccepted bool `json:"relay_probe_accepted"` - RelayPath []string `json:"relay_path"` - TestServiceAccepted bool `json:"test_service_accepted"` - TestServiceEchoPayload string `json:"test_service_echo_payload"` - FabricSessionAccepted bool `json:"fabric_session_accepted"` - FabricSessionLatencyMS int64 `json:"fabric_session_latency_ms"` - FabricSessionEndpoint string `json:"fabric_session_endpoint"` - PeerEndpoints map[string]any `json:"peer_endpoints"` + Stage string `json:"stage"` + ProductionForwarding bool `json:"production_forwarding"` + ScopedConfigLoaded bool `json:"scoped_config_loaded"` + DirectProbeAccepted bool `json:"direct_probe_accepted"` + DirectPath []string `json:"direct_path"` + RelayProbeAccepted bool `json:"relay_probe_accepted"` + RelayPath []string `json:"relay_path"` + TestServiceAccepted bool `json:"test_service_accepted"` + TestServiceEchoPayload string `json:"test_service_echo_payload"` + FabricSessionAccepted bool `json:"fabric_session_accepted"` + FabricSessionRoundTrips int `json:"fabric_session_round_trips"` + FabricSessionLatencyMS int64 `json:"fabric_session_latency_ms"` + FabricSessionEndpoint string `json:"fabric_session_endpoint"` + PeerEndpoints map[string]any `json:"peer_endpoints"` } func main() { @@ -99,32 +100,50 @@ func run(ctx context.Context) (smokeReport, error) { return smokeReport{}, fmt.Errorf("test service: %w", err) } fabricSessionStartedAt := time.Now() - fabricSessionResponse, err := mesh.NewClient(nodeB.URL).SendFabricSessionFrame(ctx, mesh.FabricSessionDialOptions{ + fabricSession, _, err := mesh.NewClient(nodeB.URL).OpenFabricSession(ctx, mesh.FabricSessionDialOptions{ Token: "rap_fsn_mesh_live_smoke", Timeout: 3 * time.Second, - }, fabricproto.Frame{ + }) + if err != nil { + return smokeReport{}, fmt.Errorf("fabric session open: %w", err) + } + defer fabricSession.Close() + firstFabricSessionResponse, err := fabricSession.RoundTrip(ctx, fabricproto.Frame{ Type: fabricproto.FramePing, Sequence: uint64(fabricSessionStartedAt.UnixNano()), Payload: []byte("mesh-live-smoke-fabric-session"), }) if err != nil { - return smokeReport{}, fmt.Errorf("fabric session smoke: %w", err) + return smokeReport{}, fmt.Errorf("fabric session first round trip: %w", err) + } + secondFabricSessionResponse, err := fabricSession.RoundTrip(ctx, fabricproto.Frame{ + Type: fabricproto.FramePing, + Sequence: uint64(fabricSessionStartedAt.UnixNano()) + 1, + Payload: []byte("mesh-live-smoke-fabric-session-2"), + }) + if err != nil { + return smokeReport{}, fmt.Errorf("fabric session second round trip: %w", err) } fabricSessionLatency := time.Since(fabricSessionStartedAt) + fabricSessionAccepted := firstFabricSessionResponse.Type == fabricproto.FramePong && + string(firstFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session" && + secondFabricSessionResponse.Type == fabricproto.FramePong && + string(secondFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session-2" return smokeReport{ - Stage: "C17F scoped synthetic config plus live HTTP transport", - ProductionForwarding: false, - ScopedConfigLoaded: nodeAConfig.ConfigVersion == "smoke-config-v1", - DirectProbeAccepted: directAck.MessageType == mesh.SyntheticMessageProbeAck, - DirectPath: decodeProbePath(directAck), - RelayProbeAccepted: relayAck.MessageType == mesh.SyntheticMessageProbeAck, - RelayPath: decodeProbePath(relayAck), - TestServiceAccepted: testService.Ack.MessageType == mesh.SyntheticMessageTestServiceAck, - TestServiceEchoPayload: testService.Response.EchoPayload, - FabricSessionAccepted: fabricSessionResponse.Type == fabricproto.FramePong && string(fabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session", - FabricSessionLatencyMS: fabricSessionLatency.Milliseconds(), - FabricSessionEndpoint: nodeB.URL + "/mesh/v1/fabric/session/ws", + Stage: "C17F scoped synthetic config plus live HTTP transport", + ProductionForwarding: false, + ScopedConfigLoaded: nodeAConfig.ConfigVersion == "smoke-config-v1", + DirectProbeAccepted: directAck.MessageType == mesh.SyntheticMessageProbeAck, + DirectPath: decodeProbePath(directAck), + RelayProbeAccepted: relayAck.MessageType == mesh.SyntheticMessageProbeAck, + RelayPath: decodeProbePath(relayAck), + TestServiceAccepted: testService.Ack.MessageType == mesh.SyntheticMessageTestServiceAck, + TestServiceEchoPayload: testService.Response.EchoPayload, + FabricSessionAccepted: fabricSessionAccepted, + FabricSessionRoundTrips: 2, + FabricSessionLatencyMS: fabricSessionLatency.Milliseconds(), + FabricSessionEndpoint: nodeB.URL + "/mesh/v1/fabric/session/ws", PeerEndpoints: map[string]any{ "node-a": nodeA.URL, "node-r": nodeR.URL, diff --git a/agents/rap-node-agent/internal/mesh/client.go b/agents/rap-node-agent/internal/mesh/client.go index a0598ee..fadbcc9 100644 --- a/agents/rap-node-agent/internal/mesh/client.go +++ b/agents/rap-node-agent/internal/mesh/client.go @@ -20,10 +20,17 @@ type Client struct { } type FabricSessionDialOptions struct { - Token string - Header http.Header - Dialer *websocket.Dialer - Timeout time.Duration + Token string + Header http.Header + Dialer *websocket.Dialer + Timeout time.Duration + MaxPayload int +} + +type FabricSessionClient struct { + conn *websocket.Conn + timeout time.Duration + maxPayload int } func NewClient(baseURL string) Client { @@ -142,35 +149,89 @@ func (c Client) DialFabricSession(ctx context.Context, opts FabricSessionDialOpt return dialer.DialContext(ctx, target, header) } -func (c Client) SendFabricSessionFrame(ctx context.Context, opts FabricSessionDialOptions, frame fabricproto.Frame) (fabricproto.Frame, error) { +func (c Client) OpenFabricSession(ctx context.Context, opts FabricSessionDialOptions) (*FabricSessionClient, *http.Response, error) { conn, resp, err := c.DialFabricSession(ctx, opts) if err != nil { if resp != nil { - return fabricproto.Frame{}, fmt.Errorf("fabric session websocket rejected with status %d: %w", resp.StatusCode, err) + return nil, resp, fmt.Errorf("fabric session websocket rejected with status %d: %w", resp.StatusCode, err) } - return fabricproto.Frame{}, err + return nil, resp, err } - defer conn.Close() - payload, err := fabricproto.MarshalFrame(frame) + maxPayload := opts.MaxPayload + if maxPayload <= 0 { + maxPayload = fabricproto.DefaultMaxPayload + } + return &FabricSessionClient{ + conn: conn, + timeout: opts.Timeout, + maxPayload: maxPayload, + }, resp, nil +} + +func (c Client) SendFabricSessionFrame(ctx context.Context, opts FabricSessionDialOptions, frame fabricproto.Frame) (fabricproto.Frame, error) { + session, _, err := c.OpenFabricSession(ctx, opts) if err != nil { return fabricproto.Frame{}, err } - if err := conn.WriteMessage(websocket.BinaryMessage, payload); err != nil { - return fabricproto.Frame{}, err + defer session.Close() + return session.RoundTrip(ctx, frame) +} + +func (c *FabricSessionClient) Close() error { + if c == nil || c.conn == nil { + return nil } - if deadline, ok := ctx.Deadline(); ok { - _ = conn.SetReadDeadline(deadline) - } else if opts.Timeout > 0 { - _ = conn.SetReadDeadline(time.Now().Add(opts.Timeout)) + return c.conn.Close() +} + +func (c *FabricSessionClient) WriteFrame(ctx context.Context, frame fabricproto.Frame) error { + if c == nil || c.conn == nil { + return fmt.Errorf("fabric session client is closed") } - messageType, responsePayload, err := conn.ReadMessage() + payload, err := fabricproto.MarshalFrame(frame) + if err != nil { + return err + } + c.applyWriteDeadline(ctx) + return c.conn.WriteMessage(websocket.BinaryMessage, payload) +} + +func (c *FabricSessionClient) ReadFrame(ctx context.Context) (fabricproto.Frame, error) { + if c == nil || c.conn == nil { + return fabricproto.Frame{}, fmt.Errorf("fabric session client is closed") + } + c.applyReadDeadline(ctx) + messageType, responsePayload, err := c.conn.ReadMessage() if err != nil { return fabricproto.Frame{}, err } if messageType != websocket.BinaryMessage { return fabricproto.Frame{}, fmt.Errorf("fabric session websocket returned non-binary message type %d", messageType) } - return fabricproto.UnmarshalFrame(responsePayload, fabricproto.DefaultMaxPayload) + return fabricproto.UnmarshalFrame(responsePayload, c.maxPayload) +} + +func (c *FabricSessionClient) RoundTrip(ctx context.Context, frame fabricproto.Frame) (fabricproto.Frame, error) { + if err := c.WriteFrame(ctx, frame); err != nil { + return fabricproto.Frame{}, err + } + return c.ReadFrame(ctx) +} + +func (c *FabricSessionClient) applyReadDeadline(ctx context.Context) { + if deadline, ok := ctx.Deadline(); ok { + _ = c.conn.SetReadDeadline(deadline) + } else if c.timeout > 0 { + _ = c.conn.SetReadDeadline(time.Now().Add(c.timeout)) + } +} + +func (c *FabricSessionClient) applyWriteDeadline(ctx context.Context) { + if deadline, ok := ctx.Deadline(); ok { + _ = c.conn.SetWriteDeadline(deadline) + } else if c.timeout > 0 { + _ = c.conn.SetWriteDeadline(time.Now().Add(c.timeout)) + } } func (c Client) fabricSessionWebSocketURL() (string, error) { diff --git a/agents/rap-node-agent/internal/mesh/client_test.go b/agents/rap-node-agent/internal/mesh/client_test.go index ecf5784..d7855d8 100644 --- a/agents/rap-node-agent/internal/mesh/client_test.go +++ b/agents/rap-node-agent/internal/mesh/client_test.go @@ -35,6 +35,104 @@ func TestClientFabricSessionFrameRoundTrip(t *testing.T) { } } +func TestClientFabricSessionPersistentRoundTrips(t *testing.T) { + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + }.Handler()) + defer server.Close() + + client := NewClient(server.URL) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + session, _, err := client.OpenFabricSession(ctx, FabricSessionDialOptions{ + Token: "rap_fsn_persistent", + Timeout: time.Second, + }) + if err != nil { + t.Fatalf("open fabric session: %v", err) + } + defer session.Close() + + first, err := session.RoundTrip(ctx, fabricproto.Frame{ + Type: fabricproto.FramePing, + Sequence: 1, + Payload: []byte("first"), + }) + if err != nil { + t.Fatalf("first round trip: %v", err) + } + second, err := session.RoundTrip(ctx, fabricproto.Frame{ + Type: fabricproto.FramePing, + Sequence: 2, + Payload: []byte("second"), + }) + if err != nil { + t.Fatalf("second round trip: %v", err) + } + if first.Type != fabricproto.FramePong || first.Sequence != 1 || string(first.Payload) != "first" { + t.Fatalf("first response = %+v, want pong seq 1", first) + } + if second.Type != fabricproto.FramePong || second.Sequence != 2 || string(second.Payload) != "second" { + t.Fatalf("second response = %+v, want pong seq 2", second) + } +} + +func TestClientFabricSessionPersistentDataAcks(t *testing.T) { + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + }.Handler()) + defer server.Close() + + client := NewClient(server.URL) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + session, _, err := client.OpenFabricSession(ctx, FabricSessionDialOptions{ + Token: "rap_fsn_dataacks", + Timeout: time.Second, + }) + if err != nil { + t.Fatalf("open fabric session: %v", err) + } + defer session.Close() + + if err := session.WriteFrame(ctx, fabricproto.Frame{ + Type: fabricproto.FrameOpenStream, + StreamID: 77, + TrafficClass: fabricproto.TrafficClassInteractive, + }); err != nil { + t.Fatalf("open stream frame: %v", err) + } + + first, err := session.RoundTrip(ctx, fabricproto.Frame{ + Type: fabricproto.FrameData, + StreamID: 77, + Sequence: 10, + TrafficClass: fabricproto.TrafficClassInteractive, + Payload: []byte("first payload"), + }) + if err != nil { + t.Fatalf("first data round trip: %v", err) + } + second, err := session.RoundTrip(ctx, fabricproto.Frame{ + Type: fabricproto.FrameData, + StreamID: 77, + Sequence: 11, + TrafficClass: fabricproto.TrafficClassInteractive, + Payload: []byte("second payload"), + }) + if err != nil { + t.Fatalf("second data round trip: %v", err) + } + if first.Type != fabricproto.FrameAck || first.StreamID != 77 || first.Sequence != 10 { + t.Fatalf("first ack = %+v, want stream 77 seq 10", first) + } + if second.Type != fabricproto.FrameAck || second.StreamID != 77 || second.Sequence != 11 { + t.Fatalf("second ack = %+v, want stream 77 seq 11", second) + } +} + func TestClientFabricSessionReportsRejectedStatus(t *testing.T) { server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 3d3d3cf..aa32e0a 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -266,7 +266,9 @@ Node-agent exposes the endpoint only when `RAP_MESH_FABRIC_SESSION_ENABLED` / `-mesh-fabric-session-enabled` is set, and reports the enabled endpoint in heartbeat metadata. `mesh-live-smoke` includes a fabric-session `PING`/`PONG` check alongside the -existing route and test-service probes. +existing route and test-service probes. Mesh client code now has a reusable +`FabricSessionClient` for multiple frame exchanges over one WebSocket session, +and live smoke verifies two `PING`/`PONG` round trips on the same connection. Deliverables: