345 lines
7.4 KiB
Go
345 lines
7.4 KiB
Go
package fabricproto
|
|
|
|
import (
|
|
"errors"
|
|
"sort"
|
|
"sync"
|
|
)
|
|
|
|
// DefaultInitialStreamCredit is the send credit, counted in frames, that a
// stream starts with when SessionConfig.InitialStreamCredit is zero or
// negative.
const DefaultInitialStreamCredit = 32
|
|
|
|
// Sentinel errors returned by Session methods. Compare with errors.Is.
var (
	// ErrSessionClosed is returned when an operation is attempted on a
	// session after Close has been called.
	ErrSessionClosed = errors.New("fabric session is closed")

	// ErrStreamExists is returned by OpenStream when the stream ID is
	// already registered on the session.
	ErrStreamExists = errors.New("fabric stream already exists")

	// ErrStreamNotFound is returned when the stream ID is not registered on
	// the session.
	ErrStreamNotFound = errors.New("fabric stream not found")

	// ErrStreamClosed is returned when the stream exists but is not in the
	// open state (it has been closed or reset).
	ErrStreamClosed = errors.New("fabric stream is closed")

	// ErrStreamQueueFull is returned by EnqueueData when the stream's queue
	// has reached the capacity of its traffic class.
	ErrStreamQueueFull = errors.New("fabric stream queue is full")

	// ErrStreamCreditExhausted is returned by EnqueueData when the stream
	// has no send credit left.
	ErrStreamCreditExhausted = errors.New("fabric stream credit is exhausted")
)
|
|
|
|
// StreamState names the lifecycle phase of a stream inside a Session.
type StreamState string

// Lifecycle phases a stream can be in.
const (
	// StreamStateOpen marks a stream that accepts new frames and credit.
	StreamStateOpen = StreamState("open")

	// StreamStateClosed marks a stream shut down through CloseStream or
	// Session.Close.
	StreamStateClosed = StreamState("closed")

	// StreamStateReset marks a stream aborted through ResetStream.
	StreamStateReset = StreamState("reset")
)
|
|
|
|
type SessionConfig struct {
|
|
InitialStreamCredit int
|
|
ClassQueueCapacity map[TrafficClass]int
|
|
}
|
|
|
|
type Session struct {
|
|
mu sync.Mutex
|
|
closed bool
|
|
streams map[uint64]*stream
|
|
metrics SessionMetrics
|
|
cfg SessionConfig
|
|
}
|
|
|
|
type stream struct {
|
|
id uint64
|
|
trafficClass TrafficClass
|
|
state StreamState
|
|
nextSequence uint64
|
|
credit int
|
|
queue []Frame
|
|
metrics StreamMetrics
|
|
}
|
|
|
|
type SessionMetrics struct {
|
|
StreamsOpened uint64
|
|
StreamsClosed uint64
|
|
StreamsReset uint64
|
|
FramesEnqueued uint64
|
|
FramesDequeued uint64
|
|
FramesReceived uint64
|
|
FramesAcked uint64
|
|
FramesDropped uint64
|
|
CreditExhausted uint64
|
|
QueueFull uint64
|
|
QueueDepths map[uint64]int
|
|
Streams map[uint64]StreamMetrics
|
|
}
|
|
|
|
type StreamMetrics struct {
|
|
StreamID uint64
|
|
TrafficClass TrafficClass
|
|
State StreamState
|
|
NextSequence uint64
|
|
Credit int
|
|
QueueDepth int
|
|
Enqueued uint64
|
|
Dequeued uint64
|
|
Received uint64
|
|
Acked uint64
|
|
Dropped uint64
|
|
CreditBlocked uint64
|
|
QueueBlocked uint64
|
|
}
|
|
|
|
func NewSession(cfg SessionConfig) *Session {
|
|
return &Session{
|
|
streams: map[uint64]*stream{},
|
|
metrics: SessionMetrics{
|
|
QueueDepths: map[uint64]int{},
|
|
Streams: map[uint64]StreamMetrics{},
|
|
},
|
|
cfg: normalizeSessionConfig(cfg),
|
|
}
|
|
}
|
|
|
|
func (s *Session) OpenStream(streamID uint64, trafficClass TrafficClass) error {
|
|
if streamID == 0 {
|
|
return ErrInvalidStreamID
|
|
}
|
|
if !KnownTrafficClass(trafficClass) {
|
|
return ErrUnknownTraffic
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return ErrSessionClosed
|
|
}
|
|
if _, ok := s.streams[streamID]; ok {
|
|
return ErrStreamExists
|
|
}
|
|
s.streams[streamID] = &stream{
|
|
id: streamID,
|
|
trafficClass: trafficClass,
|
|
state: StreamStateOpen,
|
|
credit: s.cfg.InitialStreamCredit,
|
|
}
|
|
s.metrics.StreamsOpened++
|
|
return nil
|
|
}
|
|
|
|
func (s *Session) EnqueueData(streamID uint64, payload []byte) (Frame, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return Frame{}, ErrSessionClosed
|
|
}
|
|
st, err := s.openStreamLocked(streamID)
|
|
if err != nil {
|
|
return Frame{}, err
|
|
}
|
|
if st.credit <= 0 {
|
|
st.metrics.CreditBlocked++
|
|
s.metrics.CreditExhausted++
|
|
s.metrics.FramesDropped++
|
|
return Frame{}, ErrStreamCreditExhausted
|
|
}
|
|
if len(st.queue) >= s.queueCapacity(st.trafficClass) {
|
|
st.metrics.QueueBlocked++
|
|
st.metrics.Dropped++
|
|
s.metrics.QueueFull++
|
|
s.metrics.FramesDropped++
|
|
return Frame{}, ErrStreamQueueFull
|
|
}
|
|
st.nextSequence++
|
|
st.credit--
|
|
frame := Frame{
|
|
Type: FrameData,
|
|
TrafficClass: st.trafficClass,
|
|
StreamID: streamID,
|
|
Sequence: st.nextSequence,
|
|
Payload: append([]byte(nil), payload...),
|
|
}
|
|
st.queue = append(st.queue, frame)
|
|
st.metrics.Enqueued++
|
|
s.metrics.FramesEnqueued++
|
|
return frame, nil
|
|
}
|
|
|
|
func (s *Session) DequeueNext() (Frame, bool) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return Frame{}, false
|
|
}
|
|
for _, trafficClass := range priorityOrder() {
|
|
streams := s.streamsByClassLocked(trafficClass)
|
|
for _, st := range streams {
|
|
if len(st.queue) == 0 || st.state != StreamStateOpen {
|
|
continue
|
|
}
|
|
frame := st.queue[0]
|
|
st.queue = st.queue[1:]
|
|
st.metrics.Dequeued++
|
|
s.metrics.FramesDequeued++
|
|
return frame, true
|
|
}
|
|
}
|
|
return Frame{}, false
|
|
}
|
|
|
|
func (s *Session) Ack(streamID uint64, sequence uint64) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
st, ok := s.streams[streamID]
|
|
if !ok {
|
|
return ErrStreamNotFound
|
|
}
|
|
if sequence > st.metrics.Acked {
|
|
delta := sequence - st.metrics.Acked
|
|
st.metrics.Acked = sequence
|
|
s.metrics.FramesAcked += delta
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Session) AddCredit(streamID uint64, frames int) error {
|
|
if frames <= 0 {
|
|
return nil
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
st, ok := s.streams[streamID]
|
|
if !ok {
|
|
return ErrStreamNotFound
|
|
}
|
|
if st.state != StreamStateOpen {
|
|
return ErrStreamClosed
|
|
}
|
|
st.credit += frames
|
|
return nil
|
|
}
|
|
|
|
func (s *Session) CloseStream(streamID uint64) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
st, ok := s.streams[streamID]
|
|
if !ok {
|
|
return ErrStreamNotFound
|
|
}
|
|
if st.state == StreamStateClosed {
|
|
return nil
|
|
}
|
|
st.queue = nil
|
|
st.state = StreamStateClosed
|
|
s.metrics.StreamsClosed++
|
|
return nil
|
|
}
|
|
|
|
func (s *Session) ResetStream(streamID uint64) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
st, ok := s.streams[streamID]
|
|
if !ok {
|
|
return ErrStreamNotFound
|
|
}
|
|
if st.state == StreamStateReset {
|
|
return nil
|
|
}
|
|
dropped := len(st.queue)
|
|
st.queue = nil
|
|
st.state = StreamStateReset
|
|
st.metrics.Dropped += uint64(dropped)
|
|
s.metrics.FramesDropped += uint64(dropped)
|
|
s.metrics.StreamsReset++
|
|
return nil
|
|
}
|
|
|
|
func (s *Session) Close() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.closed = true
|
|
for _, st := range s.streams {
|
|
st.queue = nil
|
|
if st.state == StreamStateOpen {
|
|
st.state = StreamStateClosed
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Session) Snapshot() SessionMetrics {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
out := s.metrics
|
|
out.QueueDepths = map[uint64]int{}
|
|
out.Streams = map[uint64]StreamMetrics{}
|
|
for streamID, st := range s.streams {
|
|
metrics := st.metrics
|
|
metrics.StreamID = streamID
|
|
metrics.TrafficClass = st.trafficClass
|
|
metrics.State = st.state
|
|
metrics.NextSequence = st.nextSequence
|
|
metrics.Credit = st.credit
|
|
metrics.QueueDepth = len(st.queue)
|
|
out.QueueDepths[streamID] = len(st.queue)
|
|
out.Streams[streamID] = metrics
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (s *Session) openStreamLocked(streamID uint64) (*stream, error) {
|
|
st, ok := s.streams[streamID]
|
|
if !ok {
|
|
return nil, ErrStreamNotFound
|
|
}
|
|
if st.state != StreamStateOpen {
|
|
return nil, ErrStreamClosed
|
|
}
|
|
return st, nil
|
|
}
|
|
|
|
func (s *Session) queueCapacity(trafficClass TrafficClass) int {
|
|
if capacity := s.cfg.ClassQueueCapacity[trafficClass]; capacity > 0 {
|
|
return capacity
|
|
}
|
|
return defaultClassQueueCapacity(trafficClass)
|
|
}
|
|
|
|
func (s *Session) streamsByClassLocked(trafficClass TrafficClass) []*stream {
|
|
var out []*stream
|
|
for _, st := range s.streams {
|
|
if st.trafficClass == trafficClass {
|
|
out = append(out, st)
|
|
}
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].id < out[j].id
|
|
})
|
|
return out
|
|
}
|
|
|
|
func normalizeSessionConfig(cfg SessionConfig) SessionConfig {
|
|
if cfg.InitialStreamCredit <= 0 {
|
|
cfg.InitialStreamCredit = DefaultInitialStreamCredit
|
|
}
|
|
if cfg.ClassQueueCapacity == nil {
|
|
cfg.ClassQueueCapacity = map[TrafficClass]int{}
|
|
}
|
|
return cfg
|
|
}
|
|
|
|
func priorityOrder() []TrafficClass {
|
|
return []TrafficClass{
|
|
TrafficClassControl,
|
|
TrafficClassDNS,
|
|
TrafficClassInteractive,
|
|
TrafficClassReliable,
|
|
TrafficClassBulk,
|
|
TrafficClassDroppable,
|
|
}
|
|
}
|
|
|
|
func defaultClassQueueCapacity(trafficClass TrafficClass) int {
|
|
switch trafficClass {
|
|
case TrafficClassControl, TrafficClassDNS, TrafficClassInteractive:
|
|
return 128
|
|
case TrafficClassReliable:
|
|
return 64
|
|
case TrafficClassBulk:
|
|
return 16
|
|
case TrafficClassDroppable:
|
|
return 8
|
|
default:
|
|
return 32
|
|
}
|
|
}
|