Add fabric session websocket client smoke API
This commit is contained in:
@@ -6,7 +6,12 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
@@ -14,6 +19,13 @@ type Client struct {
|
||||
HTTPClient *http.Client
|
||||
}
|
||||
|
||||
type FabricSessionDialOptions struct {
|
||||
Token string
|
||||
Header http.Header
|
||||
Dialer *websocket.Dialer
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
func NewClient(baseURL string) Client {
|
||||
return Client{
|
||||
BaseURL: baseURL,
|
||||
@@ -109,3 +121,88 @@ func (c Client) SendProduction(ctx context.Context, envelope ProductionEnvelope)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c Client) DialFabricSession(ctx context.Context, opts FabricSessionDialOptions) (*websocket.Conn, *http.Response, error) {
|
||||
target, err := c.fabricSessionWebSocketURL()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
header := cloneHeader(opts.Header)
|
||||
if strings.TrimSpace(opts.Token) != "" {
|
||||
header.Set("X-RAP-Fabric-Session-Token", strings.TrimSpace(opts.Token))
|
||||
}
|
||||
dialer := opts.Dialer
|
||||
if dialer == nil {
|
||||
base := *websocket.DefaultDialer
|
||||
if opts.Timeout > 0 {
|
||||
base.HandshakeTimeout = opts.Timeout
|
||||
}
|
||||
dialer = &base
|
||||
}
|
||||
return dialer.DialContext(ctx, target, header)
|
||||
}
|
||||
|
||||
func (c Client) SendFabricSessionFrame(ctx context.Context, opts FabricSessionDialOptions, frame fabricproto.Frame) (fabricproto.Frame, error) {
|
||||
conn, resp, err := c.DialFabricSession(ctx, opts)
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
return fabricproto.Frame{}, fmt.Errorf("fabric session websocket rejected with status %d: %w", resp.StatusCode, err)
|
||||
}
|
||||
return fabricproto.Frame{}, err
|
||||
}
|
||||
defer conn.Close()
|
||||
payload, err := fabricproto.MarshalFrame(frame)
|
||||
if err != nil {
|
||||
return fabricproto.Frame{}, err
|
||||
}
|
||||
if err := conn.WriteMessage(websocket.BinaryMessage, payload); err != nil {
|
||||
return fabricproto.Frame{}, err
|
||||
}
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
_ = conn.SetReadDeadline(deadline)
|
||||
} else if opts.Timeout > 0 {
|
||||
_ = conn.SetReadDeadline(time.Now().Add(opts.Timeout))
|
||||
}
|
||||
messageType, responsePayload, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return fabricproto.Frame{}, err
|
||||
}
|
||||
if messageType != websocket.BinaryMessage {
|
||||
return fabricproto.Frame{}, fmt.Errorf("fabric session websocket returned non-binary message type %d", messageType)
|
||||
}
|
||||
return fabricproto.UnmarshalFrame(responsePayload, fabricproto.DefaultMaxPayload)
|
||||
}
|
||||
|
||||
func (c Client) fabricSessionWebSocketURL() (string, error) {
|
||||
base := strings.TrimSpace(c.BaseURL)
|
||||
if base == "" {
|
||||
return "", fmt.Errorf("mesh base url is required")
|
||||
}
|
||||
parsed, err := url.Parse(base)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
switch parsed.Scheme {
|
||||
case "http":
|
||||
parsed.Scheme = "ws"
|
||||
case "https":
|
||||
parsed.Scheme = "wss"
|
||||
case "ws", "wss":
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported mesh base url scheme %q", parsed.Scheme)
|
||||
}
|
||||
parsed.Path = strings.TrimRight(parsed.Path, "/") + "/mesh/v1/fabric/session/ws"
|
||||
parsed.RawQuery = ""
|
||||
parsed.Fragment = ""
|
||||
return parsed.String(), nil
|
||||
}
|
||||
|
||||
func cloneHeader(header http.Header) http.Header {
|
||||
out := http.Header{}
|
||||
for key, values := range header {
|
||||
for _, value := range values {
|
||||
out.Add(key, value)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
package mesh
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
||||
)
|
||||
|
||||
func TestClientFabricSessionFrameRoundTrip(t *testing.T) {
|
||||
server := httptest.NewServer(Server{
|
||||
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
|
||||
FabricSessionEnabled: true,
|
||||
}.Handler())
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient(server.URL)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
response, err := client.SendFabricSessionFrame(ctx, FabricSessionDialOptions{
|
||||
Token: "rap_fsn_clienttest",
|
||||
Timeout: time.Second,
|
||||
}, fabricproto.Frame{
|
||||
Type: fabricproto.FramePing,
|
||||
Sequence: 12,
|
||||
Payload: []byte("probe"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("send fabric session frame: %v", err)
|
||||
}
|
||||
if response.Type != fabricproto.FramePong || response.Sequence != 12 || string(response.Payload) != "probe" {
|
||||
t.Fatalf("response = %+v, want pong seq 12", response)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientFabricSessionReportsRejectedStatus(t *testing.T) {
|
||||
server := httptest.NewServer(Server{
|
||||
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
|
||||
FabricSessionEnabled: true,
|
||||
}.Handler())
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient(server.URL)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
_, err := client.SendFabricSessionFrame(ctx, FabricSessionDialOptions{}, fabricproto.Frame{Type: fabricproto.FramePing})
|
||||
if err == nil {
|
||||
t.Fatal("send fabric session without token unexpectedly succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientFabricSessionWebSocketURL(t *testing.T) {
|
||||
cases := []struct {
|
||||
base string
|
||||
want string
|
||||
}{
|
||||
{base: "http://node.example", want: "ws://node.example/mesh/v1/fabric/session/ws"},
|
||||
{base: "https://node.example/base/", want: "wss://node.example/base/mesh/v1/fabric/session/ws"},
|
||||
{base: "ws://node.example", want: "ws://node.example/mesh/v1/fabric/session/ws"},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
client := NewClient(tc.base)
|
||||
got, err := client.fabricSessionWebSocketURL()
|
||||
if err != nil {
|
||||
t.Fatalf("fabricSessionWebSocketURL(%q): %v", tc.base, err)
|
||||
}
|
||||
if got != tc.want {
|
||||
t.Fatalf("fabricSessionWebSocketURL(%q) = %q, want %q", tc.base, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -258,7 +258,7 @@ Deliverables:
|
||||
|
||||
Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop,
|
||||
WebSocket frame adapter in `agents/rap-node-agent/internal/fabricproto`, and a
|
||||
gated/authenticated mesh smoke endpoint at `/mesh/v1/fabric/session/ws`.
|
||||
gated/authenticated mesh smoke endpoint/client at `/mesh/v1/fabric/session/ws`.
|
||||
|
||||
Deliverables:
|
||||
|
||||
|
||||
Reference in New Issue
Block a user