diff --git a/agents/rap-node-agent/internal/fabricproto/session.go b/agents/rap-node-agent/internal/fabricproto/session.go new file mode 100644 index 0000000..4364915 --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/session.go @@ -0,0 +1,342 @@ +package fabricproto + +import ( + "errors" + "sort" + "sync" +) + +const ( + DefaultInitialStreamCredit = 32 +) + +var ( + ErrSessionClosed = errors.New("fabric session is closed") + ErrStreamExists = errors.New("fabric stream already exists") + ErrStreamNotFound = errors.New("fabric stream not found") + ErrStreamClosed = errors.New("fabric stream is closed") + ErrStreamQueueFull = errors.New("fabric stream queue is full") + ErrStreamCreditExhausted = errors.New("fabric stream credit is exhausted") +) + +type StreamState string + +const ( + StreamStateOpen StreamState = "open" + StreamStateClosed StreamState = "closed" + StreamStateReset StreamState = "reset" +) + +type SessionConfig struct { + InitialStreamCredit int + ClassQueueCapacity map[TrafficClass]int +} + +type Session struct { + mu sync.Mutex + closed bool + streams map[uint64]*stream + metrics SessionMetrics + cfg SessionConfig +} + +type stream struct { + id uint64 + trafficClass TrafficClass + state StreamState + nextSequence uint64 + credit int + queue []Frame + metrics StreamMetrics +} + +type SessionMetrics struct { + StreamsOpened uint64 + StreamsClosed uint64 + StreamsReset uint64 + FramesEnqueued uint64 + FramesDequeued uint64 + FramesAcked uint64 + FramesDropped uint64 + CreditExhausted uint64 + QueueFull uint64 + QueueDepths map[uint64]int + Streams map[uint64]StreamMetrics +} + +type StreamMetrics struct { + StreamID uint64 + TrafficClass TrafficClass + State StreamState + NextSequence uint64 + Credit int + QueueDepth int + Enqueued uint64 + Dequeued uint64 + Acked uint64 + Dropped uint64 + CreditBlocked uint64 + QueueBlocked uint64 +} + +func NewSession(cfg SessionConfig) *Session { + return &Session{ + streams: map[uint64]*stream{}, + metrics: SessionMetrics{ + QueueDepths: map[uint64]int{}, + Streams: map[uint64]StreamMetrics{}, + }, + cfg: normalizeSessionConfig(cfg), + } +} + +func (s *Session) OpenStream(streamID uint64, trafficClass TrafficClass) error { + if streamID == 0 { + return ErrInvalidStreamID + } + if !KnownTrafficClass(trafficClass) { + return ErrUnknownTraffic + } + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return ErrSessionClosed + } + if _, ok := s.streams[streamID]; ok { + return ErrStreamExists + } + s.streams[streamID] = &stream{ + id: streamID, + trafficClass: trafficClass, + state: StreamStateOpen, + credit: s.cfg.InitialStreamCredit, + } + s.metrics.StreamsOpened++ + return nil +} + +func (s *Session) EnqueueData(streamID uint64, payload []byte) (Frame, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return Frame{}, ErrSessionClosed + } + st, err := s.openStreamLocked(streamID) + if err != nil { + return Frame{}, err + } + if st.credit <= 0 { + st.metrics.CreditBlocked++ + s.metrics.CreditExhausted++ + s.metrics.FramesDropped++ + return Frame{}, ErrStreamCreditExhausted + } + if len(st.queue) >= s.queueCapacity(st.trafficClass) { + st.metrics.QueueBlocked++ + st.metrics.Dropped++ + s.metrics.QueueFull++ + s.metrics.FramesDropped++ + return Frame{}, ErrStreamQueueFull + } + st.nextSequence++ + st.credit-- + frame := Frame{ + Type: FrameData, + TrafficClass: st.trafficClass, + StreamID: streamID, + Sequence: st.nextSequence, + Payload: append([]byte(nil), payload...), + } + st.queue = append(st.queue, frame) + st.metrics.Enqueued++ + s.metrics.FramesEnqueued++ + return frame, nil +} + +func (s *Session) DequeueNext() (Frame, bool) { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return Frame{}, false + } + for _, trafficClass := range priorityOrder() { + streams := s.streamsByClassLocked(trafficClass) + for _, st := range streams { + if len(st.queue) == 0 || st.state != StreamStateOpen { + continue + } + frame := st.queue[0] + st.queue = st.queue[1:] + st.metrics.Dequeued++ + s.metrics.FramesDequeued++ + return frame, true + } + } + return Frame{}, false +} + +func (s *Session) Ack(streamID uint64, sequence uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.streams[streamID] + if !ok { + return ErrStreamNotFound + } + if sequence > st.metrics.Acked { + delta := sequence - st.metrics.Acked + st.metrics.Acked = sequence + s.metrics.FramesAcked += delta + } + return nil +} + +func (s *Session) AddCredit(streamID uint64, frames int) error { + if frames <= 0 { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.streams[streamID] + if !ok { + return ErrStreamNotFound + } + if st.state != StreamStateOpen { + return ErrStreamClosed + } + st.credit += frames + return nil +} + +func (s *Session) CloseStream(streamID uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.streams[streamID] + if !ok { + return ErrStreamNotFound + } + if st.state == StreamStateClosed { + return nil + } + st.queue = nil + st.state = StreamStateClosed + s.metrics.StreamsClosed++ + return nil +} + +func (s *Session) ResetStream(streamID uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.streams[streamID] + if !ok { + return ErrStreamNotFound + } + if st.state == StreamStateReset { + return nil + } + dropped := len(st.queue) + st.queue = nil + st.state = StreamStateReset + st.metrics.Dropped += uint64(dropped) + s.metrics.FramesDropped += uint64(dropped) + s.metrics.StreamsReset++ + return nil +} + +func (s *Session) Close() { + s.mu.Lock() + defer s.mu.Unlock() + s.closed = true + for _, st := range s.streams { + st.queue = nil + if st.state == StreamStateOpen { + st.state = StreamStateClosed + } + } +} + +func (s *Session) Snapshot() SessionMetrics { + s.mu.Lock() + defer s.mu.Unlock() + out := s.metrics + out.QueueDepths = map[uint64]int{} + out.Streams = map[uint64]StreamMetrics{} + for streamID, st := range s.streams { + metrics := st.metrics + metrics.StreamID = streamID + metrics.TrafficClass = st.trafficClass + metrics.State = st.state + metrics.NextSequence = st.nextSequence + metrics.Credit = st.credit + metrics.QueueDepth = len(st.queue) + out.QueueDepths[streamID] = len(st.queue) + out.Streams[streamID] = metrics + } + return out +} + +func (s *Session) openStreamLocked(streamID uint64) (*stream, error) { + st, ok := s.streams[streamID] + if !ok { + return nil, ErrStreamNotFound + } + if st.state != StreamStateOpen { + return nil, ErrStreamClosed + } + return st, nil +} + +func (s *Session) queueCapacity(trafficClass TrafficClass) int { + if capacity := s.cfg.ClassQueueCapacity[trafficClass]; capacity > 0 { + return capacity + } + return defaultClassQueueCapacity(trafficClass) +} + +func (s *Session) streamsByClassLocked(trafficClass TrafficClass) []*stream { + var out []*stream + for _, st := range s.streams { + if st.trafficClass == trafficClass { + out = append(out, st) + } + } + sort.Slice(out, func(i, j int) bool { + return out[i].id < out[j].id + }) + return out +} + +func normalizeSessionConfig(cfg SessionConfig) SessionConfig { + if cfg.InitialStreamCredit <= 0 { + cfg.InitialStreamCredit = DefaultInitialStreamCredit + } + if cfg.ClassQueueCapacity == nil { + cfg.ClassQueueCapacity = map[TrafficClass]int{} + } + return cfg +} + +func priorityOrder() []TrafficClass { + return []TrafficClass{ + TrafficClassControl, + TrafficClassDNS, + TrafficClassInteractive, + TrafficClassReliable, + TrafficClassBulk, + TrafficClassDroppable, + } +} + +func defaultClassQueueCapacity(trafficClass TrafficClass) int { + switch trafficClass { + case TrafficClassControl, TrafficClassDNS, TrafficClassInteractive: + return 128 + case TrafficClassReliable: + return 64 + case TrafficClassBulk: + return 16 + case TrafficClassDroppable: + return 8 + default: + return 32 + } +} diff --git a/agents/rap-node-agent/internal/fabricproto/session_test.go b/agents/rap-node-agent/internal/fabricproto/session_test.go new file mode 100644 index 0000000..f28f93b --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/session_test.go @@ -0,0 +1,169 @@ +package fabricproto + +import ( + "errors" + "testing" +) + +func TestSessionDequeuesByTrafficPriority(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 10, TrafficClassBulk) + mustOpenStream(t, session, 20, TrafficClassInteractive) + mustOpenStream(t, session, 30, TrafficClassControl) + + mustEnqueue(t, session, 10, "bulk") + mustEnqueue(t, session, 20, "interactive") + mustEnqueue(t, session, 30, "control") + + frame, ok := session.DequeueNext() + if !ok || frame.StreamID != 30 { + t.Fatalf("first frame = %+v ok=%v, want control stream 30", frame, ok) + } + frame, ok = session.DequeueNext() + if !ok || frame.StreamID != 20 { + t.Fatalf("second frame = %+v ok=%v, want interactive stream 20", frame, ok) + } + frame, ok = session.DequeueNext() + if !ok || frame.StreamID != 10 { + t.Fatalf("third frame = %+v ok=%v, want bulk stream 10", frame, ok) + } +} + +func TestSessionBulkQueueFullDoesNotBlockControlOrInteractive(t *testing.T) { + session := NewSession(SessionConfig{ + ClassQueueCapacity: map[TrafficClass]int{ + TrafficClassBulk: 1, + TrafficClassControl: 4, + TrafficClassInteractive: 4, + }, + }) + mustOpenStream(t, session, 1, TrafficClassBulk) + mustOpenStream(t, session, 2, TrafficClassControl) + mustOpenStream(t, session, 3, TrafficClassInteractive) + + mustEnqueue(t, session, 1, "bulk-1") + if _, err := session.EnqueueData(1, []byte("bulk-2")); !errors.Is(err, ErrStreamQueueFull) { + t.Fatalf("bulk enqueue error = %v, want %v", err, ErrStreamQueueFull) + } + mustEnqueue(t, session, 2, "control") + mustEnqueue(t, session, 3, "interactive") + + first, ok := session.DequeueNext() + if !ok || first.StreamID != 2 { + t.Fatalf("first frame = %+v ok=%v, want control", first, ok) + } + second, ok := session.DequeueNext() + if !ok || second.StreamID != 3 { + t.Fatalf("second frame = %+v ok=%v, want interactive", second, ok) + } + + snapshot := session.Snapshot() + if snapshot.QueueFull != 1 || snapshot.FramesDropped != 1 { + t.Fatalf("snapshot queue full/dropped = %d/%d, want 1/1", snapshot.QueueFull, snapshot.FramesDropped) + } + if snapshot.Streams[2].Enqueued != 1 || snapshot.Streams[3].Enqueued != 1 { + t.Fatalf("control/interactive metrics = %+v / %+v", snapshot.Streams[2], snapshot.Streams[3]) + } +} + +func TestSessionCreditBlocksOnlyOneStream(t *testing.T) { + session := NewSession(SessionConfig{InitialStreamCredit: 1}) + mustOpenStream(t, session, 1, TrafficClassReliable) + mustOpenStream(t, session, 2, TrafficClassReliable) + + mustEnqueue(t, session, 1, "one") + if _, err := session.EnqueueData(1, []byte("two")); !errors.Is(err, ErrStreamCreditExhausted) { + t.Fatalf("credit error = %v, want %v", err, ErrStreamCreditExhausted) + } + mustEnqueue(t, session, 2, "other") + + snapshot := session.Snapshot() + if snapshot.CreditExhausted != 1 || snapshot.Streams[1].CreditBlocked != 1 { + t.Fatalf("credit metrics = %+v stream=%+v", snapshot, snapshot.Streams[1]) + } + if snapshot.Streams[2].Enqueued != 1 { + t.Fatalf("second stream metrics = %+v, want enqueued", snapshot.Streams[2]) + } +} + +func TestSessionAddCreditAllowsMoreFrames(t *testing.T) { + session := NewSession(SessionConfig{InitialStreamCredit: 1}) + mustOpenStream(t, session, 1, TrafficClassReliable) + mustEnqueue(t, session, 1, "one") + if _, err := session.EnqueueData(1, []byte("blocked")); !errors.Is(err, ErrStreamCreditExhausted) { + t.Fatalf("credit error = %v, want %v", err, ErrStreamCreditExhausted) + } + if err := session.AddCredit(1, 2); err != nil { + t.Fatalf("add credit: %v", err) + } + mustEnqueue(t, session, 1, "two") + + snapshot := session.Snapshot() + if snapshot.Streams[1].Credit != 1 || snapshot.Streams[1].Enqueued != 2 { + t.Fatalf("stream metrics = %+v, want credit 1 and enqueued 2", snapshot.Streams[1]) + } +} + +func TestSessionResetDropsOnlySelectedStream(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 1, TrafficClassBulk) + mustOpenStream(t, session, 2, TrafficClassControl) + mustEnqueue(t, session, 1, "bulk") + mustEnqueue(t, session, 2, "control") + + if err := session.ResetStream(1); err != nil { + t.Fatalf("reset stream: %v", err) + } + frame, ok := session.DequeueNext() + if !ok || frame.StreamID != 2 { + t.Fatalf("dequeued frame = %+v ok=%v, want control stream 2", frame, ok) + } + if _, ok := session.DequeueNext(); ok { + t.Fatal("unexpected frame after reset dropped bulk queue") + } + + snapshot := session.Snapshot() + if snapshot.StreamsReset != 1 || snapshot.Streams[1].State != StreamStateReset || snapshot.Streams[2].Dequeued != 1 { + t.Fatalf("snapshot = %+v", snapshot) + } +} + +func TestSessionAckUpdatesMetrics(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 1, TrafficClassReliable) + mustEnqueue(t, session, 1, "one") + mustEnqueue(t, session, 1, "two") + + if err := session.Ack(1, 2); err != nil { + t.Fatalf("ack: %v", err) + } + snapshot := session.Snapshot() + if snapshot.FramesAcked != 2 || snapshot.Streams[1].Acked != 2 { + t.Fatalf("ack metrics = %+v stream=%+v", snapshot, snapshot.Streams[1]) + } +} + +func TestSessionCloseRejectsNewData(t *testing.T) { + session := NewSession(SessionConfig{}) + mustOpenStream(t, session, 1, TrafficClassReliable) + session.Close() + if _, err := session.EnqueueData(1, []byte("after-close")); !errors.Is(err, ErrSessionClosed) { + t.Fatalf("enqueue after close err = %v, want %v", err, ErrSessionClosed) + } +} + +func mustOpenStream(t *testing.T, session *Session, streamID uint64, trafficClass TrafficClass) { + t.Helper() + if err := session.OpenStream(streamID, trafficClass); err != nil { + t.Fatalf("open stream %d: %v", streamID, err) + } +} + +func mustEnqueue(t *testing.T, session *Session, streamID uint64, payload string) Frame { + t.Helper() + frame, err := session.EnqueueData(streamID, []byte(payload)) + if err != nil { + t.Fatalf("enqueue stream %d: %v", streamID, err) + } + return frame +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 31fe29e..84c4219 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -244,6 +244,8 @@ Deliverables: ### Stage FNP-2: Persistent Session Runtime Skeleton +Status: started in `agents/rap-node-agent/internal/fabricproto`. + Deliverables: - implement in-memory session runtime with streams, sequence numbers, ACK,