diff --git a/agents/rap-node-agent/internal/fabricproto/session.go b/agents/rap-node-agent/internal/fabricproto/session.go index 4364915..b007864 100644 --- a/agents/rap-node-agent/internal/fabricproto/session.go +++ b/agents/rap-node-agent/internal/fabricproto/session.go @@ -56,6 +56,7 @@ type SessionMetrics struct { StreamsReset uint64 FramesEnqueued uint64 FramesDequeued uint64 + FramesReceived uint64 FramesAcked uint64 FramesDropped uint64 CreditExhausted uint64 @@ -73,6 +74,7 @@ type StreamMetrics struct { QueueDepth int Enqueued uint64 Dequeued uint64 + Received uint64 Acked uint64 Dropped uint64 CreditBlocked uint64 diff --git a/agents/rap-node-agent/internal/fabricproto/session_frames.go b/agents/rap-node-agent/internal/fabricproto/session_frames.go new file mode 100644 index 0000000..646f555 --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/session_frames.go @@ -0,0 +1,135 @@ +package fabricproto + +import "errors" + +var ( + ErrUnsupportedSessionFrame = errors.New("unsupported fabric session frame") + ErrTrafficClassMismatch = errors.New("fabric stream traffic class mismatch") +) + +type SessionEventType string + +const ( + SessionEventNone SessionEventType = "" + SessionEventHello SessionEventType = "hello" + SessionEventAuth SessionEventType = "auth" + SessionEventReady SessionEventType = "session_ready" + SessionEventStreamOpened SessionEventType = "stream_opened" + SessionEventData SessionEventType = "data" + SessionEventAck SessionEventType = "ack" + SessionEventCredit SessionEventType = "credit" + SessionEventPing SessionEventType = "ping" + SessionEventPong SessionEventType = "pong" + SessionEventRouteUpdate SessionEventType = "route_update" + SessionEventPressure SessionEventType = "node_pressure" + SessionEventStreamClosed SessionEventType = "stream_closed" + SessionEventStreamReset SessionEventType = "stream_reset" + SessionEventGoAway SessionEventType = "goaway" +) + +type SessionEvent struct { + Type SessionEventType + StreamID uint64 + Sequence uint64 + TrafficClass TrafficClass + Payload []byte +} + +func (s *Session) HandleFrame(frame Frame) (SessionEvent, []Frame, error) { + if err := ValidateFrame(frame, DefaultMaxPayload); err != nil { + return SessionEvent{}, nil, err + } + + switch frame.Type { + case FrameHello: + return sessionEvent(frame, SessionEventHello), nil, nil + case FrameAuth: + return sessionEvent(frame, SessionEventAuth), nil, nil + case FrameSessionReady: + return sessionEvent(frame, SessionEventReady), nil, nil + case FrameOpenStream: + if err := s.OpenStream(frame.StreamID, frame.TrafficClass); err != nil { + return SessionEvent{}, nil, err + } + return sessionEvent(frame, SessionEventStreamOpened), nil, nil + case FrameData: + event, err := s.handleDataFrame(frame) + if err != nil { + return SessionEvent{}, nil, err + } + return event, []Frame{{ + Type: FrameAck, + TrafficClass: frame.TrafficClass, + StreamID: frame.StreamID, + Sequence: frame.Sequence, + }}, nil + case FrameAck: + if err := s.Ack(frame.StreamID, frame.Sequence); err != nil { + return SessionEvent{}, nil, err + } + return sessionEvent(frame, SessionEventAck), nil, nil + case FramePing: + return sessionEvent(frame, SessionEventPing), []Frame{{ + Type: FramePong, + Sequence: frame.Sequence, + Payload: append([]byte(nil), frame.Payload...), + }}, nil + case FramePong: + return sessionEvent(frame, SessionEventPong), nil, nil + case FrameRouteUpdate: + return sessionEvent(frame, SessionEventRouteUpdate), nil, nil + case FrameStreamCredit: + if err := s.AddCredit(frame.StreamID, int(frame.Sequence)); err != nil { + return SessionEvent{}, nil, err + } + return sessionEvent(frame, SessionEventCredit), nil, nil + case FrameNodePressure: + return sessionEvent(frame, SessionEventPressure), nil, nil + case FrameCloseStream: + if err := s.CloseStream(frame.StreamID); err != nil { + return SessionEvent{}, nil, err + } + return sessionEvent(frame, SessionEventStreamClosed), nil, nil + case FrameResetStream: + if err := s.ResetStream(frame.StreamID); err != nil { + return SessionEvent{}, nil, err + } + return sessionEvent(frame, SessionEventStreamReset), nil, nil + case FrameGoAway: + s.Close() + return sessionEvent(frame, SessionEventGoAway), nil, nil + default: + return SessionEvent{}, nil, ErrUnsupportedSessionFrame + } +} + +func (s *Session) handleDataFrame(frame Frame) (SessionEvent, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return SessionEvent{}, ErrSessionClosed + } + st, err := s.openStreamLocked(frame.StreamID) + if err != nil { + return SessionEvent{}, err + } + if frame.TrafficClass == 0 { + frame.TrafficClass = st.trafficClass + } + if frame.TrafficClass != st.trafficClass { + return SessionEvent{}, ErrTrafficClassMismatch + } + st.metrics.Received++ + s.metrics.FramesReceived++ + return sessionEvent(frame, SessionEventData), nil +} + +func sessionEvent(frame Frame, eventType SessionEventType) SessionEvent { + return SessionEvent{ + Type: eventType, + StreamID: frame.StreamID, + Sequence: frame.Sequence, + TrafficClass: frame.TrafficClass, + Payload: append([]byte(nil), frame.Payload...), + } +} diff --git a/agents/rap-node-agent/internal/fabricproto/session_frames_test.go b/agents/rap-node-agent/internal/fabricproto/session_frames_test.go new file mode 100644 index 0000000..506afb3 --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/session_frames_test.go @@ -0,0 +1,172 @@ +package fabricproto + +import ( + "errors" + "testing" +) + +func TestHandleFrameOpensStreamAndReceivesData(t *testing.T) { + session := NewSession(SessionConfig{}) + + event, responses, err := session.HandleFrame(Frame{ + Type: FrameOpenStream, + TrafficClass: TrafficClassInteractive, + StreamID: 7, + }) + if err != nil { + t.Fatalf("handle open stream: %v", err) + } + if event.Type != SessionEventStreamOpened || event.StreamID != 7 || len(responses) != 0 { + t.Fatalf("open event/responses = %+v / %+v", event, responses) + } + + event, responses, err = session.HandleFrame(Frame{ + Type: FrameData, + TrafficClass: TrafficClassInteractive, + StreamID: 7, + Sequence: 11, + Payload: []byte("rdp-input"), + }) + if err != nil { + t.Fatalf("handle data: %v", err) + } + if event.Type != SessionEventData || string(event.Payload) != "rdp-input" { + t.Fatalf("data event = %+v", event) + } + if len(responses) != 1 || responses[0].Type != FrameAck || responses[0].StreamID != 7 || responses[0].Sequence != 11 { + t.Fatalf("responses = %+v, want ack for stream 7 seq 11", responses) + } + snapshot := session.Snapshot() + if snapshot.FramesReceived != 1 || snapshot.Streams[7].Received != 1 { + t.Fatalf("received metrics = %+v stream=%+v", snapshot, snapshot.Streams[7]) + } +} + +func TestHandleFramePingReturnsPong(t *testing.T) { + session := NewSession(SessionConfig{}) + + event, responses, err := session.HandleFrame(Frame{ + Type: FramePing, + Sequence: 99, + Payload: []byte("probe"), + }) + if err != nil { + t.Fatalf("handle ping: %v", err) + } + if event.Type != SessionEventPing { + t.Fatalf("event = %+v, want ping", event) + } + if len(responses) != 1 || responses[0].Type != FramePong || responses[0].Sequence != 99 || string(responses[0].Payload) != "probe" { + t.Fatalf("responses = %+v, want pong echo", responses) + } +} + +func TestHandleFrameCreditUnblocksOutgoingStream(t *testing.T) { + session := NewSession(SessionConfig{InitialStreamCredit: 1}) + mustOpenStream(t, session, 1, TrafficClassReliable) + mustEnqueue(t, session, 1, "one") + if _, err := session.EnqueueData(1, []byte("blocked")); !errors.Is(err, ErrStreamCreditExhausted) { + t.Fatalf("credit error = %v, want %v", err, ErrStreamCreditExhausted) + } + + event, responses, err := session.HandleFrame(Frame{ + Type: FrameStreamCredit, + StreamID: 1, + Sequence: 2, + }) + if err != nil { + t.Fatalf("handle credit: %v", err) + } + if event.Type != SessionEventCredit || len(responses) != 0 { + t.Fatalf("credit event/responses = %+v / %+v", event, responses) + } + mustEnqueue(t, session, 1, "two") +} + +func TestHandleFrameAckUpdatesSession(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 1, TrafficClassReliable) + mustEnqueue(t, session, 1, "one") + mustEnqueue(t, session, 1, "two") + + event, responses, err := session.HandleFrame(Frame{ + Type: FrameAck, + StreamID: 1, + Sequence: 2, + }) + if err != nil { + t.Fatalf("handle ack: %v", err) + } + if event.Type != SessionEventAck || len(responses) != 0 { + t.Fatalf("ack event/responses = %+v / %+v", event, responses) + } + snapshot := session.Snapshot() + if snapshot.Streams[1].Acked != 2 || snapshot.FramesAcked != 2 { + t.Fatalf("ack metrics = %+v stream=%+v", snapshot, snapshot.Streams[1]) + } +} + +func TestHandleFrameResetDoesNotAffectOtherStreams(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 1, TrafficClassBulk) + mustOpenStream(t, session, 2, TrafficClassControl) + mustEnqueue(t, session, 1, "bulk") + mustEnqueue(t, session, 2, "control") + + event, _, err := session.HandleFrame(Frame{Type: FrameResetStream, StreamID: 1}) + if err != nil { + t.Fatalf("handle reset: %v", err) + } + if event.Type != SessionEventStreamReset { + t.Fatalf("event = %+v, want reset", event) + } + frame, ok := session.DequeueNext() + if !ok || frame.StreamID != 2 { + t.Fatalf("dequeued = %+v ok=%v, want control stream 2", frame, ok) + } +} + +func TestHandleFrameGoAwayClosesSession(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 1, TrafficClassReliable) + + event, _, err := session.HandleFrame(Frame{Type: FrameGoAway}) + if err != nil { + t.Fatalf("handle goaway: %v", err) + } + if event.Type != SessionEventGoAway { + t.Fatalf("event = %+v, want goaway", event) + } + if _, err := session.EnqueueData(1, []byte("after-goaway")); !errors.Is(err, ErrSessionClosed) { + t.Fatalf("enqueue err = %v, want %v", err, ErrSessionClosed) + } +} + +func TestHandleFrameRejectsDataForMissingStream(t *testing.T) { + session := NewSession(SessionConfig{}) + + _, _, err := session.HandleFrame(Frame{ + Type: FrameData, + TrafficClass: TrafficClassReliable, + StreamID: 44, + Sequence: 1, + }) + if !errors.Is(err, ErrStreamNotFound) { + t.Fatalf("error = %v, want %v", err, ErrStreamNotFound) + } +} + +func TestHandleFrameRejectsTrafficClassMismatch(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 3, TrafficClassInteractive) + + _, _, err := session.HandleFrame(Frame{ + Type: FrameData, + TrafficClass: TrafficClassBulk, + StreamID: 3, + Sequence: 1, + }) + if !errors.Is(err, ErrTrafficClassMismatch) { + t.Fatalf("error = %v, want %v", err, ErrTrafficClassMismatch) + } +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 84c4219..2308f14 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -244,12 +244,13 @@ Deliverables: ### Stage FNP-2: Persistent Session Runtime Skeleton -Status: started in `agents/rap-node-agent/internal/fabricproto`. +Status: in progress in `agents/rap-node-agent/internal/fabricproto`. Deliverables: - implement in-memory session runtime with streams, sequence numbers, ACK, stream credit, reset, and close; +- handle protocol frames for open/data/ack/credit/reset/close/ping/goaway; - prove that a blocked bulk stream does not block control/interactive streams; - expose per-stream metrics.