Handle fabric session protocol frames

This commit is contained in:
2026-05-16 00:07:57 +03:00
parent c711bdc67c
commit 6ac41052e6
4 changed files with 311 additions and 1 deletions
@@ -56,6 +56,7 @@ type SessionMetrics struct {
StreamsReset uint64
FramesEnqueued uint64
FramesDequeued uint64
FramesReceived uint64
FramesAcked uint64
FramesDropped uint64
CreditExhausted uint64
@@ -73,6 +74,7 @@ type StreamMetrics struct {
QueueDepth int
Enqueued uint64
Dequeued uint64
Received uint64
Acked uint64
Dropped uint64
CreditBlocked uint64
@@ -0,0 +1,135 @@
package fabricproto
import "errors"
var (
ErrUnsupportedSessionFrame = errors.New("unsupported fabric session frame")
ErrTrafficClassMismatch = errors.New("fabric stream traffic class mismatch")
)
type SessionEventType string
const (
SessionEventNone SessionEventType = ""
SessionEventHello SessionEventType = "hello"
SessionEventAuth SessionEventType = "auth"
SessionEventReady SessionEventType = "session_ready"
SessionEventStreamOpened SessionEventType = "stream_opened"
SessionEventData SessionEventType = "data"
SessionEventAck SessionEventType = "ack"
SessionEventCredit SessionEventType = "credit"
SessionEventPing SessionEventType = "ping"
SessionEventPong SessionEventType = "pong"
SessionEventRouteUpdate SessionEventType = "route_update"
SessionEventPressure SessionEventType = "node_pressure"
SessionEventStreamClosed SessionEventType = "stream_closed"
SessionEventStreamReset SessionEventType = "stream_reset"
SessionEventGoAway SessionEventType = "goaway"
)
type SessionEvent struct {
Type SessionEventType
StreamID uint64
Sequence uint64
TrafficClass TrafficClass
Payload []byte
}
func (s *Session) HandleFrame(frame Frame) (SessionEvent, []Frame, error) {
if err := ValidateFrame(frame, DefaultMaxPayload); err != nil {
return SessionEvent{}, nil, err
}
switch frame.Type {
case FrameHello:
return sessionEvent(frame, SessionEventHello), nil, nil
case FrameAuth:
return sessionEvent(frame, SessionEventAuth), nil, nil
case FrameSessionReady:
return sessionEvent(frame, SessionEventReady), nil, nil
case FrameOpenStream:
if err := s.OpenStream(frame.StreamID, frame.TrafficClass); err != nil {
return SessionEvent{}, nil, err
}
return sessionEvent(frame, SessionEventStreamOpened), nil, nil
case FrameData:
event, err := s.handleDataFrame(frame)
if err != nil {
return SessionEvent{}, nil, err
}
return event, []Frame{{
Type: FrameAck,
TrafficClass: frame.TrafficClass,
StreamID: frame.StreamID,
Sequence: frame.Sequence,
}}, nil
case FrameAck:
if err := s.Ack(frame.StreamID, frame.Sequence); err != nil {
return SessionEvent{}, nil, err
}
return sessionEvent(frame, SessionEventAck), nil, nil
case FramePing:
return sessionEvent(frame, SessionEventPing), []Frame{{
Type: FramePong,
Sequence: frame.Sequence,
Payload: append([]byte(nil), frame.Payload...),
}}, nil
case FramePong:
return sessionEvent(frame, SessionEventPong), nil, nil
case FrameRouteUpdate:
return sessionEvent(frame, SessionEventRouteUpdate), nil, nil
case FrameStreamCredit:
if err := s.AddCredit(frame.StreamID, int(frame.Sequence)); err != nil {
return SessionEvent{}, nil, err
}
return sessionEvent(frame, SessionEventCredit), nil, nil
case FrameNodePressure:
return sessionEvent(frame, SessionEventPressure), nil, nil
case FrameCloseStream:
if err := s.CloseStream(frame.StreamID); err != nil {
return SessionEvent{}, nil, err
}
return sessionEvent(frame, SessionEventStreamClosed), nil, nil
case FrameResetStream:
if err := s.ResetStream(frame.StreamID); err != nil {
return SessionEvent{}, nil, err
}
return sessionEvent(frame, SessionEventStreamReset), nil, nil
case FrameGoAway:
s.Close()
return sessionEvent(frame, SessionEventGoAway), nil, nil
default:
return SessionEvent{}, nil, ErrUnsupportedSessionFrame
}
}
func (s *Session) handleDataFrame(frame Frame) (SessionEvent, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return SessionEvent{}, ErrSessionClosed
}
st, err := s.openStreamLocked(frame.StreamID)
if err != nil {
return SessionEvent{}, err
}
if frame.TrafficClass == 0 {
frame.TrafficClass = st.trafficClass
}
if frame.TrafficClass != st.trafficClass {
return SessionEvent{}, ErrTrafficClassMismatch
}
st.metrics.Received++
s.metrics.FramesReceived++
return sessionEvent(frame, SessionEventData), nil
}
func sessionEvent(frame Frame, eventType SessionEventType) SessionEvent {
return SessionEvent{
Type: eventType,
StreamID: frame.StreamID,
Sequence: frame.Sequence,
TrafficClass: frame.TrafficClass,
Payload: append([]byte(nil), frame.Payload...),
}
}
@@ -0,0 +1,172 @@
package fabricproto
import (
"errors"
"testing"
)
func TestHandleFrameOpensStreamAndReceivesData(t *testing.T) {
session := NewSession(SessionConfig{})
event, responses, err := session.HandleFrame(Frame{
Type: FrameOpenStream,
TrafficClass: TrafficClassInteractive,
StreamID: 7,
})
if err != nil {
t.Fatalf("handle open stream: %v", err)
}
if event.Type != SessionEventStreamOpened || event.StreamID != 7 || len(responses) != 0 {
t.Fatalf("open event/responses = %+v / %+v", event, responses)
}
event, responses, err = session.HandleFrame(Frame{
Type: FrameData,
TrafficClass: TrafficClassInteractive,
StreamID: 7,
Sequence: 11,
Payload: []byte("rdp-input"),
})
if err != nil {
t.Fatalf("handle data: %v", err)
}
if event.Type != SessionEventData || string(event.Payload) != "rdp-input" {
t.Fatalf("data event = %+v", event)
}
if len(responses) != 1 || responses[0].Type != FrameAck || responses[0].StreamID != 7 || responses[0].Sequence != 11 {
t.Fatalf("responses = %+v, want ack for stream 7 seq 11", responses)
}
snapshot := session.Snapshot()
if snapshot.FramesReceived != 1 || snapshot.Streams[7].Received != 1 {
t.Fatalf("received metrics = %+v stream=%+v", snapshot, snapshot.Streams[7])
}
}
func TestHandleFramePingReturnsPong(t *testing.T) {
session := NewSession(SessionConfig{})
event, responses, err := session.HandleFrame(Frame{
Type: FramePing,
Sequence: 99,
Payload: []byte("probe"),
})
if err != nil {
t.Fatalf("handle ping: %v", err)
}
if event.Type != SessionEventPing {
t.Fatalf("event = %+v, want ping", event)
}
if len(responses) != 1 || responses[0].Type != FramePong || responses[0].Sequence != 99 || string(responses[0].Payload) != "probe" {
t.Fatalf("responses = %+v, want pong echo", responses)
}
}
func TestHandleFrameCreditUnblocksOutgoingStream(t *testing.T) {
session := NewSession(SessionConfig{InitialStreamCredit: 1})
mustOpenStream(t, session, 1, TrafficClassReliable)
mustEnqueue(t, session, 1, "one")
if _, err := session.EnqueueData(1, []byte("blocked")); !errors.Is(err, ErrStreamCreditExhausted) {
t.Fatalf("credit error = %v, want %v", err, ErrStreamCreditExhausted)
}
event, responses, err := session.HandleFrame(Frame{
Type: FrameStreamCredit,
StreamID: 1,
Sequence: 2,
})
if err != nil {
t.Fatalf("handle credit: %v", err)
}
if event.Type != SessionEventCredit || len(responses) != 0 {
t.Fatalf("credit event/responses = %+v / %+v", event, responses)
}
mustEnqueue(t, session, 1, "two")
}
func TestHandleFrameAckUpdatesSession(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 1, TrafficClassReliable)
mustEnqueue(t, session, 1, "one")
mustEnqueue(t, session, 1, "two")
event, responses, err := session.HandleFrame(Frame{
Type: FrameAck,
StreamID: 1,
Sequence: 2,
})
if err != nil {
t.Fatalf("handle ack: %v", err)
}
if event.Type != SessionEventAck || len(responses) != 0 {
t.Fatalf("ack event/responses = %+v / %+v", event, responses)
}
snapshot := session.Snapshot()
if snapshot.Streams[1].Acked != 2 || snapshot.FramesAcked != 2 {
t.Fatalf("ack metrics = %+v stream=%+v", snapshot, snapshot.Streams[1])
}
}
func TestHandleFrameResetDoesNotAffectOtherStreams(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 1, TrafficClassBulk)
mustOpenStream(t, session, 2, TrafficClassControl)
mustEnqueue(t, session, 1, "bulk")
mustEnqueue(t, session, 2, "control")
event, _, err := session.HandleFrame(Frame{Type: FrameResetStream, StreamID: 1})
if err != nil {
t.Fatalf("handle reset: %v", err)
}
if event.Type != SessionEventStreamReset {
t.Fatalf("event = %+v, want reset", event)
}
frame, ok := session.DequeueNext()
if !ok || frame.StreamID != 2 {
t.Fatalf("dequeued = %+v ok=%v, want control stream 2", frame, ok)
}
}
func TestHandleFrameGoAwayClosesSession(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 1, TrafficClassReliable)
event, _, err := session.HandleFrame(Frame{Type: FrameGoAway})
if err != nil {
t.Fatalf("handle goaway: %v", err)
}
if event.Type != SessionEventGoAway {
t.Fatalf("event = %+v, want goaway", event)
}
if _, err := session.EnqueueData(1, []byte("after-goaway")); !errors.Is(err, ErrSessionClosed) {
t.Fatalf("enqueue err = %v, want %v", err, ErrSessionClosed)
}
}
func TestHandleFrameRejectsDataForMissingStream(t *testing.T) {
session := NewSession(SessionConfig{})
_, _, err := session.HandleFrame(Frame{
Type: FrameData,
TrafficClass: TrafficClassReliable,
StreamID: 44,
Sequence: 1,
})
if !errors.Is(err, ErrStreamNotFound) {
t.Fatalf("error = %v, want %v", err, ErrStreamNotFound)
}
}
func TestHandleFrameRejectsTrafficClassMismatch(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 3, TrafficClassInteractive)
_, _, err := session.HandleFrame(Frame{
Type: FrameData,
TrafficClass: TrafficClassBulk,
StreamID: 3,
Sequence: 1,
})
if !errors.Is(err, ErrTrafficClassMismatch) {
t.Fatalf("error = %v, want %v", err, ErrTrafficClassMismatch)
}
}
@@ -244,12 +244,13 @@ Deliverables:
### Stage FNP-2: Persistent Session Runtime Skeleton
Status: started in `agents/rap-node-agent/internal/fabricproto`.
Status: in progress in `agents/rap-node-agent/internal/fabricproto`.
Deliverables:
- implement in-memory session runtime with streams, sequence numbers, ACK,
stream credit, reset, and close;
- handle protocol frames for open/data/ack/credit/reset/close/ping/goaway;
- prove that a blocked bulk stream does not block control/interactive streams;
- expose per-stream metrics.