From 4881b5e7028f46a4ab2ba57f28ded715886ae519 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 00:22:01 +0300 Subject: [PATCH] Add fabric session websocket client smoke API --- agents/rap-node-agent/internal/mesh/client.go | 97 +++++++++++++++++++ .../internal/mesh/client_test.go | 73 ++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 +- 3 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 agents/rap-node-agent/internal/mesh/client_test.go diff --git a/agents/rap-node-agent/internal/mesh/client.go b/agents/rap-node-agent/internal/mesh/client.go index b389c61..a0598ee 100644 --- a/agents/rap-node-agent/internal/mesh/client.go +++ b/agents/rap-node-agent/internal/mesh/client.go @@ -6,7 +6,12 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" + "strings" "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" + "github.com/gorilla/websocket" ) type Client struct { @@ -14,6 +19,13 @@ type Client struct { HTTPClient *http.Client } +type FabricSessionDialOptions struct { + Token string + Header http.Header + Dialer *websocket.Dialer + Timeout time.Duration +} + func NewClient(baseURL string) Client { return Client{ BaseURL: baseURL, @@ -109,3 +121,88 @@ func (c Client) SendProduction(ctx context.Context, envelope ProductionEnvelope) } return result, nil } + +func (c Client) DialFabricSession(ctx context.Context, opts FabricSessionDialOptions) (*websocket.Conn, *http.Response, error) { + target, err := c.fabricSessionWebSocketURL() + if err != nil { + return nil, nil, err + } + header := cloneHeader(opts.Header) + if strings.TrimSpace(opts.Token) != "" { + header.Set("X-RAP-Fabric-Session-Token", strings.TrimSpace(opts.Token)) + } + dialer := opts.Dialer + if dialer == nil { + base := *websocket.DefaultDialer + if opts.Timeout > 0 { + base.HandshakeTimeout = opts.Timeout + } + dialer = &base + } + return dialer.DialContext(ctx, target, header) +} + +func (c Client) SendFabricSessionFrame(ctx context.Context, opts FabricSessionDialOptions, frame fabricproto.Frame) (fabricproto.Frame, error) { + conn, resp, err := c.DialFabricSession(ctx, opts) + if err != nil { + if resp != nil { + return fabricproto.Frame{}, fmt.Errorf("fabric session websocket rejected with status %d: %w", resp.StatusCode, err) + } + return fabricproto.Frame{}, err + } + defer conn.Close() + payload, err := fabricproto.MarshalFrame(frame) + if err != nil { + return fabricproto.Frame{}, err + } + if err := conn.WriteMessage(websocket.BinaryMessage, payload); err != nil { + return fabricproto.Frame{}, err + } + if deadline, ok := ctx.Deadline(); ok { + _ = conn.SetReadDeadline(deadline) + } else if opts.Timeout > 0 { + _ = conn.SetReadDeadline(time.Now().Add(opts.Timeout)) + } + messageType, responsePayload, err := conn.ReadMessage() + if err != nil { + return fabricproto.Frame{}, err + } + if messageType != websocket.BinaryMessage { + return fabricproto.Frame{}, fmt.Errorf("fabric session websocket returned non-binary message type %d", messageType) + } + return fabricproto.UnmarshalFrame(responsePayload, fabricproto.DefaultMaxPayload) +} + +func (c Client) fabricSessionWebSocketURL() (string, error) { + base := strings.TrimSpace(c.BaseURL) + if base == "" { + return "", fmt.Errorf("mesh base url is required") + } + parsed, err := url.Parse(base) + if err != nil { + return "", err + } + switch parsed.Scheme { + case "http": + parsed.Scheme = "ws" + case "https": + parsed.Scheme = "wss" + case "ws", "wss": + default: + return "", fmt.Errorf("unsupported mesh base url scheme %q", parsed.Scheme) + } + parsed.Path = strings.TrimRight(parsed.Path, "/") + "/mesh/v1/fabric/session/ws" + parsed.RawQuery = "" + parsed.Fragment = "" + return parsed.String(), nil +} + +func cloneHeader(header http.Header) http.Header { + out := http.Header{} + for key, values := range header { + for _, value := range values { + out.Add(key, value) + } + } + return out +} diff --git a/agents/rap-node-agent/internal/mesh/client_test.go b/agents/rap-node-agent/internal/mesh/client_test.go new file mode 100644 index 0000000..ecf5784 --- /dev/null +++ b/agents/rap-node-agent/internal/mesh/client_test.go @@ -0,0 +1,73 @@ +package mesh + +import ( + "context" + "net/http/httptest" + "testing" + "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" +) + +func TestClientFabricSessionFrameRoundTrip(t *testing.T) { + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + }.Handler()) + defer server.Close() + + client := NewClient(server.URL) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + response, err := client.SendFabricSessionFrame(ctx, FabricSessionDialOptions{ + Token: "rap_fsn_clienttest", + Timeout: time.Second, + }, fabricproto.Frame{ + Type: fabricproto.FramePing, + Sequence: 12, + Payload: []byte("probe"), + }) + if err != nil { + t.Fatalf("send fabric session frame: %v", err) + } + if response.Type != fabricproto.FramePong || response.Sequence != 12 || string(response.Payload) != "probe" { + t.Fatalf("response = %+v, want pong seq 12", response) + } +} + +func TestClientFabricSessionReportsRejectedStatus(t *testing.T) { + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + }.Handler()) + defer server.Close() + + client := NewClient(server.URL) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _, err := client.SendFabricSessionFrame(ctx, FabricSessionDialOptions{}, fabricproto.Frame{Type: fabricproto.FramePing}) + if err == nil { + t.Fatal("send fabric session without token unexpectedly succeeded") + } +} + +func TestClientFabricSessionWebSocketURL(t *testing.T) { + cases := []struct { + base string + want string + }{ + {base: "http://node.example", want: "ws://node.example/mesh/v1/fabric/session/ws"}, + {base: "https://node.example/base/", want: "wss://node.example/base/mesh/v1/fabric/session/ws"}, + {base: "ws://node.example", want: "ws://node.example/mesh/v1/fabric/session/ws"}, + } + for _, tc := range cases { + client := NewClient(tc.base) + got, err := client.fabricSessionWebSocketURL() + if err != nil { + t.Fatalf("fabricSessionWebSocketURL(%q): %v", tc.base, err) + } + if got != tc.want { + t.Fatalf("fabricSessionWebSocketURL(%q) = %q, want %q", tc.base, got, tc.want) + } + } +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 0158477..7d9b5c7 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -258,7 +258,7 @@ Deliverables: Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop, WebSocket frame adapter in `agents/rap-node-agent/internal/fabricproto`, and a -gated/authenticated mesh smoke endpoint at `/mesh/v1/fabric/session/ws`. +gated/authenticated mesh smoke endpoint/client at `/mesh/v1/fabric/session/ws`. Deliverables: