Add persistent fabric session client
This commit is contained in:
@@ -20,10 +20,17 @@ type Client struct {
|
||||
}
|
||||
|
||||
type FabricSessionDialOptions struct {
|
||||
Token string
|
||||
Header http.Header
|
||||
Dialer *websocket.Dialer
|
||||
Timeout time.Duration
|
||||
Token string
|
||||
Header http.Header
|
||||
Dialer *websocket.Dialer
|
||||
Timeout time.Duration
|
||||
MaxPayload int
|
||||
}
|
||||
|
||||
type FabricSessionClient struct {
|
||||
conn *websocket.Conn
|
||||
timeout time.Duration
|
||||
maxPayload int
|
||||
}
|
||||
|
||||
func NewClient(baseURL string) Client {
|
||||
@@ -142,35 +149,89 @@ func (c Client) DialFabricSession(ctx context.Context, opts FabricSessionDialOpt
|
||||
return dialer.DialContext(ctx, target, header)
|
||||
}
|
||||
|
||||
func (c Client) SendFabricSessionFrame(ctx context.Context, opts FabricSessionDialOptions, frame fabricproto.Frame) (fabricproto.Frame, error) {
|
||||
func (c Client) OpenFabricSession(ctx context.Context, opts FabricSessionDialOptions) (*FabricSessionClient, *http.Response, error) {
|
||||
conn, resp, err := c.DialFabricSession(ctx, opts)
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
return fabricproto.Frame{}, fmt.Errorf("fabric session websocket rejected with status %d: %w", resp.StatusCode, err)
|
||||
return nil, resp, fmt.Errorf("fabric session websocket rejected with status %d: %w", resp.StatusCode, err)
|
||||
}
|
||||
return fabricproto.Frame{}, err
|
||||
return nil, resp, err
|
||||
}
|
||||
defer conn.Close()
|
||||
payload, err := fabricproto.MarshalFrame(frame)
|
||||
maxPayload := opts.MaxPayload
|
||||
if maxPayload <= 0 {
|
||||
maxPayload = fabricproto.DefaultMaxPayload
|
||||
}
|
||||
return &FabricSessionClient{
|
||||
conn: conn,
|
||||
timeout: opts.Timeout,
|
||||
maxPayload: maxPayload,
|
||||
}, resp, nil
|
||||
}
|
||||
|
||||
func (c Client) SendFabricSessionFrame(ctx context.Context, opts FabricSessionDialOptions, frame fabricproto.Frame) (fabricproto.Frame, error) {
|
||||
session, _, err := c.OpenFabricSession(ctx, opts)
|
||||
if err != nil {
|
||||
return fabricproto.Frame{}, err
|
||||
}
|
||||
if err := conn.WriteMessage(websocket.BinaryMessage, payload); err != nil {
|
||||
return fabricproto.Frame{}, err
|
||||
defer session.Close()
|
||||
return session.RoundTrip(ctx, frame)
|
||||
}
|
||||
|
||||
func (c *FabricSessionClient) Close() error {
|
||||
if c == nil || c.conn == nil {
|
||||
return nil
|
||||
}
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
_ = conn.SetReadDeadline(deadline)
|
||||
} else if opts.Timeout > 0 {
|
||||
_ = conn.SetReadDeadline(time.Now().Add(opts.Timeout))
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *FabricSessionClient) WriteFrame(ctx context.Context, frame fabricproto.Frame) error {
|
||||
if c == nil || c.conn == nil {
|
||||
return fmt.Errorf("fabric session client is closed")
|
||||
}
|
||||
messageType, responsePayload, err := conn.ReadMessage()
|
||||
payload, err := fabricproto.MarshalFrame(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.applyWriteDeadline(ctx)
|
||||
return c.conn.WriteMessage(websocket.BinaryMessage, payload)
|
||||
}
|
||||
|
||||
func (c *FabricSessionClient) ReadFrame(ctx context.Context) (fabricproto.Frame, error) {
|
||||
if c == nil || c.conn == nil {
|
||||
return fabricproto.Frame{}, fmt.Errorf("fabric session client is closed")
|
||||
}
|
||||
c.applyReadDeadline(ctx)
|
||||
messageType, responsePayload, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return fabricproto.Frame{}, err
|
||||
}
|
||||
if messageType != websocket.BinaryMessage {
|
||||
return fabricproto.Frame{}, fmt.Errorf("fabric session websocket returned non-binary message type %d", messageType)
|
||||
}
|
||||
return fabricproto.UnmarshalFrame(responsePayload, fabricproto.DefaultMaxPayload)
|
||||
return fabricproto.UnmarshalFrame(responsePayload, c.maxPayload)
|
||||
}
|
||||
|
||||
func (c *FabricSessionClient) RoundTrip(ctx context.Context, frame fabricproto.Frame) (fabricproto.Frame, error) {
|
||||
if err := c.WriteFrame(ctx, frame); err != nil {
|
||||
return fabricproto.Frame{}, err
|
||||
}
|
||||
return c.ReadFrame(ctx)
|
||||
}
|
||||
|
||||
func (c *FabricSessionClient) applyReadDeadline(ctx context.Context) {
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
_ = c.conn.SetReadDeadline(deadline)
|
||||
} else if c.timeout > 0 {
|
||||
_ = c.conn.SetReadDeadline(time.Now().Add(c.timeout))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *FabricSessionClient) applyWriteDeadline(ctx context.Context) {
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
_ = c.conn.SetWriteDeadline(deadline)
|
||||
} else if c.timeout > 0 {
|
||||
_ = c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
|
||||
}
|
||||
}
|
||||
|
||||
func (c Client) fabricSessionWebSocketURL() (string, error) {
|
||||
|
||||
@@ -35,6 +35,104 @@ func TestClientFabricSessionFrameRoundTrip(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientFabricSessionPersistentRoundTrips(t *testing.T) {
|
||||
server := httptest.NewServer(Server{
|
||||
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
|
||||
FabricSessionEnabled: true,
|
||||
}.Handler())
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient(server.URL)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
session, _, err := client.OpenFabricSession(ctx, FabricSessionDialOptions{
|
||||
Token: "rap_fsn_persistent",
|
||||
Timeout: time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("open fabric session: %v", err)
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
first, err := session.RoundTrip(ctx, fabricproto.Frame{
|
||||
Type: fabricproto.FramePing,
|
||||
Sequence: 1,
|
||||
Payload: []byte("first"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("first round trip: %v", err)
|
||||
}
|
||||
second, err := session.RoundTrip(ctx, fabricproto.Frame{
|
||||
Type: fabricproto.FramePing,
|
||||
Sequence: 2,
|
||||
Payload: []byte("second"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("second round trip: %v", err)
|
||||
}
|
||||
if first.Type != fabricproto.FramePong || first.Sequence != 1 || string(first.Payload) != "first" {
|
||||
t.Fatalf("first response = %+v, want pong seq 1", first)
|
||||
}
|
||||
if second.Type != fabricproto.FramePong || second.Sequence != 2 || string(second.Payload) != "second" {
|
||||
t.Fatalf("second response = %+v, want pong seq 2", second)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientFabricSessionPersistentDataAcks(t *testing.T) {
|
||||
server := httptest.NewServer(Server{
|
||||
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
|
||||
FabricSessionEnabled: true,
|
||||
}.Handler())
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient(server.URL)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
session, _, err := client.OpenFabricSession(ctx, FabricSessionDialOptions{
|
||||
Token: "rap_fsn_dataacks",
|
||||
Timeout: time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("open fabric session: %v", err)
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
if err := session.WriteFrame(ctx, fabricproto.Frame{
|
||||
Type: fabricproto.FrameOpenStream,
|
||||
StreamID: 77,
|
||||
TrafficClass: fabricproto.TrafficClassInteractive,
|
||||
}); err != nil {
|
||||
t.Fatalf("open stream frame: %v", err)
|
||||
}
|
||||
|
||||
first, err := session.RoundTrip(ctx, fabricproto.Frame{
|
||||
Type: fabricproto.FrameData,
|
||||
StreamID: 77,
|
||||
Sequence: 10,
|
||||
TrafficClass: fabricproto.TrafficClassInteractive,
|
||||
Payload: []byte("first payload"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("first data round trip: %v", err)
|
||||
}
|
||||
second, err := session.RoundTrip(ctx, fabricproto.Frame{
|
||||
Type: fabricproto.FrameData,
|
||||
StreamID: 77,
|
||||
Sequence: 11,
|
||||
TrafficClass: fabricproto.TrafficClassInteractive,
|
||||
Payload: []byte("second payload"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("second data round trip: %v", err)
|
||||
}
|
||||
if first.Type != fabricproto.FrameAck || first.StreamID != 77 || first.Sequence != 10 {
|
||||
t.Fatalf("first ack = %+v, want stream 77 seq 10", first)
|
||||
}
|
||||
if second.Type != fabricproto.FrameAck || second.StreamID != 77 || second.Sequence != 11 {
|
||||
t.Fatalf("second ack = %+v, want stream 77 seq 11", second)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientFabricSessionReportsRejectedStatus(t *testing.T) {
|
||||
server := httptest.NewServer(Server{
|
||||
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
|
||||
|
||||
Reference in New Issue
Block a user