Add fabric session stream runtime skeleton

This commit is contained in:
2026-05-16 00:02:05 +03:00
parent 2cedc2e2f3
commit c711bdc67c
3 changed files with 513 additions and 0 deletions
@@ -0,0 +1,342 @@
package fabricproto
import (
"errors"
"sort"
"sync"
)
const (
DefaultInitialStreamCredit = 32
)
var (
ErrSessionClosed = errors.New("fabric session is closed")
ErrStreamExists = errors.New("fabric stream already exists")
ErrStreamNotFound = errors.New("fabric stream not found")
ErrStreamClosed = errors.New("fabric stream is closed")
ErrStreamQueueFull = errors.New("fabric stream queue is full")
ErrStreamCreditExhausted = errors.New("fabric stream credit is exhausted")
)
type StreamState string
const (
StreamStateOpen StreamState = "open"
StreamStateClosed StreamState = "closed"
StreamStateReset StreamState = "reset"
)
type SessionConfig struct {
InitialStreamCredit int
ClassQueueCapacity map[TrafficClass]int
}
type Session struct {
mu sync.Mutex
closed bool
streams map[uint64]*stream
metrics SessionMetrics
cfg SessionConfig
}
type stream struct {
id uint64
trafficClass TrafficClass
state StreamState
nextSequence uint64
credit int
queue []Frame
metrics StreamMetrics
}
type SessionMetrics struct {
StreamsOpened uint64
StreamsClosed uint64
StreamsReset uint64
FramesEnqueued uint64
FramesDequeued uint64
FramesAcked uint64
FramesDropped uint64
CreditExhausted uint64
QueueFull uint64
QueueDepths map[uint64]int
Streams map[uint64]StreamMetrics
}
type StreamMetrics struct {
StreamID uint64
TrafficClass TrafficClass
State StreamState
NextSequence uint64
Credit int
QueueDepth int
Enqueued uint64
Dequeued uint64
Acked uint64
Dropped uint64
CreditBlocked uint64
QueueBlocked uint64
}
func NewSession(cfg SessionConfig) *Session {
return &Session{
streams: map[uint64]*stream{},
metrics: SessionMetrics{
QueueDepths: map[uint64]int{},
Streams: map[uint64]StreamMetrics{},
},
cfg: normalizeSessionConfig(cfg),
}
}
func (s *Session) OpenStream(streamID uint64, trafficClass TrafficClass) error {
if streamID == 0 {
return ErrInvalidStreamID
}
if !KnownTrafficClass(trafficClass) {
return ErrUnknownTraffic
}
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return ErrSessionClosed
}
if _, ok := s.streams[streamID]; ok {
return ErrStreamExists
}
s.streams[streamID] = &stream{
id: streamID,
trafficClass: trafficClass,
state: StreamStateOpen,
credit: s.cfg.InitialStreamCredit,
}
s.metrics.StreamsOpened++
return nil
}
func (s *Session) EnqueueData(streamID uint64, payload []byte) (Frame, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return Frame{}, ErrSessionClosed
}
st, err := s.openStreamLocked(streamID)
if err != nil {
return Frame{}, err
}
if st.credit <= 0 {
st.metrics.CreditBlocked++
s.metrics.CreditExhausted++
s.metrics.FramesDropped++
return Frame{}, ErrStreamCreditExhausted
}
if len(st.queue) >= s.queueCapacity(st.trafficClass) {
st.metrics.QueueBlocked++
st.metrics.Dropped++
s.metrics.QueueFull++
s.metrics.FramesDropped++
return Frame{}, ErrStreamQueueFull
}
st.nextSequence++
st.credit--
frame := Frame{
Type: FrameData,
TrafficClass: st.trafficClass,
StreamID: streamID,
Sequence: st.nextSequence,
Payload: append([]byte(nil), payload...),
}
st.queue = append(st.queue, frame)
st.metrics.Enqueued++
s.metrics.FramesEnqueued++
return frame, nil
}
func (s *Session) DequeueNext() (Frame, bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return Frame{}, false
}
for _, trafficClass := range priorityOrder() {
streams := s.streamsByClassLocked(trafficClass)
for _, st := range streams {
if len(st.queue) == 0 || st.state != StreamStateOpen {
continue
}
frame := st.queue[0]
st.queue = st.queue[1:]
st.metrics.Dequeued++
s.metrics.FramesDequeued++
return frame, true
}
}
return Frame{}, false
}
func (s *Session) Ack(streamID uint64, sequence uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
st, ok := s.streams[streamID]
if !ok {
return ErrStreamNotFound
}
if sequence > st.metrics.Acked {
delta := sequence - st.metrics.Acked
st.metrics.Acked = sequence
s.metrics.FramesAcked += delta
}
return nil
}
func (s *Session) AddCredit(streamID uint64, frames int) error {
if frames <= 0 {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
st, ok := s.streams[streamID]
if !ok {
return ErrStreamNotFound
}
if st.state != StreamStateOpen {
return ErrStreamClosed
}
st.credit += frames
return nil
}
func (s *Session) CloseStream(streamID uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
st, ok := s.streams[streamID]
if !ok {
return ErrStreamNotFound
}
if st.state == StreamStateClosed {
return nil
}
st.queue = nil
st.state = StreamStateClosed
s.metrics.StreamsClosed++
return nil
}
func (s *Session) ResetStream(streamID uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
st, ok := s.streams[streamID]
if !ok {
return ErrStreamNotFound
}
if st.state == StreamStateReset {
return nil
}
dropped := len(st.queue)
st.queue = nil
st.state = StreamStateReset
st.metrics.Dropped += uint64(dropped)
s.metrics.FramesDropped += uint64(dropped)
s.metrics.StreamsReset++
return nil
}
func (s *Session) Close() {
s.mu.Lock()
defer s.mu.Unlock()
s.closed = true
for _, st := range s.streams {
st.queue = nil
if st.state == StreamStateOpen {
st.state = StreamStateClosed
}
}
}
func (s *Session) Snapshot() SessionMetrics {
s.mu.Lock()
defer s.mu.Unlock()
out := s.metrics
out.QueueDepths = map[uint64]int{}
out.Streams = map[uint64]StreamMetrics{}
for streamID, st := range s.streams {
metrics := st.metrics
metrics.StreamID = streamID
metrics.TrafficClass = st.trafficClass
metrics.State = st.state
metrics.NextSequence = st.nextSequence
metrics.Credit = st.credit
metrics.QueueDepth = len(st.queue)
out.QueueDepths[streamID] = len(st.queue)
out.Streams[streamID] = metrics
}
return out
}
func (s *Session) openStreamLocked(streamID uint64) (*stream, error) {
st, ok := s.streams[streamID]
if !ok {
return nil, ErrStreamNotFound
}
if st.state != StreamStateOpen {
return nil, ErrStreamClosed
}
return st, nil
}
func (s *Session) queueCapacity(trafficClass TrafficClass) int {
if capacity := s.cfg.ClassQueueCapacity[trafficClass]; capacity > 0 {
return capacity
}
return defaultClassQueueCapacity(trafficClass)
}
func (s *Session) streamsByClassLocked(trafficClass TrafficClass) []*stream {
var out []*stream
for _, st := range s.streams {
if st.trafficClass == trafficClass {
out = append(out, st)
}
}
sort.Slice(out, func(i, j int) bool {
return out[i].id < out[j].id
})
return out
}
func normalizeSessionConfig(cfg SessionConfig) SessionConfig {
if cfg.InitialStreamCredit <= 0 {
cfg.InitialStreamCredit = DefaultInitialStreamCredit
}
if cfg.ClassQueueCapacity == nil {
cfg.ClassQueueCapacity = map[TrafficClass]int{}
}
return cfg
}
func priorityOrder() []TrafficClass {
return []TrafficClass{
TrafficClassControl,
TrafficClassDNS,
TrafficClassInteractive,
TrafficClassReliable,
TrafficClassBulk,
TrafficClassDroppable,
}
}
func defaultClassQueueCapacity(trafficClass TrafficClass) int {
switch trafficClass {
case TrafficClassControl, TrafficClassDNS, TrafficClassInteractive:
return 128
case TrafficClassReliable:
return 64
case TrafficClassBulk:
return 16
case TrafficClassDroppable:
return 8
default:
return 32
}
}
@@ -0,0 +1,169 @@
package fabricproto
import (
"errors"
"testing"
)
func TestSessionDequeuesByTrafficPriority(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 10, TrafficClassBulk)
mustOpenStream(t, session, 20, TrafficClassInteractive)
mustOpenStream(t, session, 30, TrafficClassControl)
mustEnqueue(t, session, 10, "bulk")
mustEnqueue(t, session, 20, "interactive")
mustEnqueue(t, session, 30, "control")
frame, ok := session.DequeueNext()
if !ok || frame.StreamID != 30 {
t.Fatalf("first frame = %+v ok=%v, want control stream 30", frame, ok)
}
frame, ok = session.DequeueNext()
if !ok || frame.StreamID != 20 {
t.Fatalf("second frame = %+v ok=%v, want interactive stream 20", frame, ok)
}
frame, ok = session.DequeueNext()
if !ok || frame.StreamID != 10 {
t.Fatalf("third frame = %+v ok=%v, want bulk stream 10", frame, ok)
}
}
func TestSessionBulkQueueFullDoesNotBlockControlOrInteractive(t *testing.T) {
session := NewSession(SessionConfig{
ClassQueueCapacity: map[TrafficClass]int{
TrafficClassBulk: 1,
TrafficClassControl: 4,
TrafficClassInteractive: 4,
},
})
mustOpenStream(t, session, 1, TrafficClassBulk)
mustOpenStream(t, session, 2, TrafficClassControl)
mustOpenStream(t, session, 3, TrafficClassInteractive)
mustEnqueue(t, session, 1, "bulk-1")
if _, err := session.EnqueueData(1, []byte("bulk-2")); !errors.Is(err, ErrStreamQueueFull) {
t.Fatalf("bulk enqueue error = %v, want %v", err, ErrStreamQueueFull)
}
mustEnqueue(t, session, 2, "control")
mustEnqueue(t, session, 3, "interactive")
first, ok := session.DequeueNext()
if !ok || first.StreamID != 2 {
t.Fatalf("first frame = %+v ok=%v, want control", first, ok)
}
second, ok := session.DequeueNext()
if !ok || second.StreamID != 3 {
t.Fatalf("second frame = %+v ok=%v, want interactive", second, ok)
}
snapshot := session.Snapshot()
if snapshot.QueueFull != 1 || snapshot.FramesDropped != 1 {
t.Fatalf("snapshot queue full/dropped = %d/%d, want 1/1", snapshot.QueueFull, snapshot.FramesDropped)
}
if snapshot.Streams[2].Enqueued != 1 || snapshot.Streams[3].Enqueued != 1 {
t.Fatalf("control/interactive metrics = %+v / %+v", snapshot.Streams[2], snapshot.Streams[3])
}
}
func TestSessionCreditBlocksOnlyOneStream(t *testing.T) {
session := NewSession(SessionConfig{InitialStreamCredit: 1})
mustOpenStream(t, session, 1, TrafficClassReliable)
mustOpenStream(t, session, 2, TrafficClassReliable)
mustEnqueue(t, session, 1, "one")
if _, err := session.EnqueueData(1, []byte("two")); !errors.Is(err, ErrStreamCreditExhausted) {
t.Fatalf("credit error = %v, want %v", err, ErrStreamCreditExhausted)
}
mustEnqueue(t, session, 2, "other")
snapshot := session.Snapshot()
if snapshot.CreditExhausted != 1 || snapshot.Streams[1].CreditBlocked != 1 {
t.Fatalf("credit metrics = %+v stream=%+v", snapshot, snapshot.Streams[1])
}
if snapshot.Streams[2].Enqueued != 1 {
t.Fatalf("second stream metrics = %+v, want enqueued", snapshot.Streams[2])
}
}
func TestSessionAddCreditAllowsMoreFrames(t *testing.T) {
session := NewSession(SessionConfig{InitialStreamCredit: 1})
mustOpenStream(t, session, 1, TrafficClassReliable)
mustEnqueue(t, session, 1, "one")
if _, err := session.EnqueueData(1, []byte("blocked")); !errors.Is(err, ErrStreamCreditExhausted) {
t.Fatalf("credit error = %v, want %v", err, ErrStreamCreditExhausted)
}
if err := session.AddCredit(1, 2); err != nil {
t.Fatalf("add credit: %v", err)
}
mustEnqueue(t, session, 1, "two")
snapshot := session.Snapshot()
if snapshot.Streams[1].Credit != 1 || snapshot.Streams[1].Enqueued != 2 {
t.Fatalf("stream metrics = %+v, want credit 1 and enqueued 2", snapshot.Streams[1])
}
}
func TestSessionResetDropsOnlySelectedStream(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 1, TrafficClassBulk)
mustOpenStream(t, session, 2, TrafficClassControl)
mustEnqueue(t, session, 1, "bulk")
mustEnqueue(t, session, 2, "control")
if err := session.ResetStream(1); err != nil {
t.Fatalf("reset stream: %v", err)
}
frame, ok := session.DequeueNext()
if !ok || frame.StreamID != 2 {
t.Fatalf("dequeued frame = %+v ok=%v, want control stream 2", frame, ok)
}
if _, ok := session.DequeueNext(); ok {
t.Fatal("unexpected frame after reset dropped bulk queue")
}
snapshot := session.Snapshot()
if snapshot.StreamsReset != 1 || snapshot.Streams[1].State != StreamStateReset || snapshot.Streams[2].Dequeued != 1 {
t.Fatalf("snapshot = %+v", snapshot)
}
}
func TestSessionAckUpdatesMetrics(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 1, TrafficClassReliable)
mustEnqueue(t, session, 1, "one")
mustEnqueue(t, session, 1, "two")
if err := session.Ack(1, 2); err != nil {
t.Fatalf("ack: %v", err)
}
snapshot := session.Snapshot()
if snapshot.FramesAcked != 2 || snapshot.Streams[1].Acked != 2 {
t.Fatalf("ack metrics = %+v stream=%+v", snapshot, snapshot.Streams[1])
}
}
func TestSessionCloseRejectsNewData(t *testing.T) {
session := NewSession(SessionConfig{})
mustOpenStream(t, session, 1, TrafficClassReliable)
session.Close()
if _, err := session.EnqueueData(1, []byte("after-close")); !errors.Is(err, ErrSessionClosed) {
t.Fatalf("enqueue after close err = %v, want %v", err, ErrSessionClosed)
}
}
func mustOpenStream(t *testing.T, session *Session, streamID uint64, trafficClass TrafficClass) {
t.Helper()
if err := session.OpenStream(streamID, trafficClass); err != nil {
t.Fatalf("open stream %d: %v", streamID, err)
}
}
func mustEnqueue(t *testing.T, session *Session, streamID uint64, payload string) Frame {
t.Helper()
frame, err := session.EnqueueData(streamID, []byte(payload))
if err != nil {
t.Fatalf("enqueue stream %d: %v", streamID, err)
}
return frame
}
@@ -244,6 +244,8 @@ Deliverables:
### Stage FNP-2: Persistent Session Runtime Skeleton
Status: started in `agents/rap-node-agent/internal/fabricproto`.
Deliverables:
- implement in-memory session runtime with streams, sequence numbers, ACK,