Reopen closed fabric peer sessions

This commit is contained in:
2026-05-16 09:54:02 +03:00
parent 03efff6770
commit 057e3b65a7
4 changed files with 75 additions and 5 deletions
@@ -293,6 +293,18 @@ func (p *FabricSessionPump) Errors() <-chan error {
return p.errors return p.errors
} }
// Closed reports whether the pump should be treated as shut down.
//
// A nil pump counts as closed. Otherwise the done channel is polled without
// blocking: the pump is reported closed as soon as a receive on done can
// proceed, and open while it cannot.
func (p *FabricSessionPump) Closed() bool {
	if p != nil {
		select {
		case <-p.done:
			// done is ready; fall through to the closed result below.
		default:
			return false
		}
	}
	return true
}
func (p *FabricSessionPump) Close() error { func (p *FabricSessionPump) Close() error {
if p == nil { if p == nil {
return nil return nil
@@ -35,9 +35,13 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession
} }
m.mu.Lock() m.mu.Lock()
if pump := m.sessions[key]; pump != nil { if pump := m.sessions[key]; pump != nil {
if pump.Closed() {
delete(m.sessions, key)
} else {
m.mu.Unlock() m.mu.Unlock()
return pump, nil return pump, nil
} }
}
m.mu.Unlock() m.mu.Unlock()
session, _, err := NewClient(target.BaseURL).OpenFabricSession(ctx, target.Options) session, _, err := NewClient(target.BaseURL).OpenFabricSession(ctx, target.Options)
@@ -48,10 +52,14 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession
m.mu.Lock() m.mu.Lock()
if existing := m.sessions[key]; existing != nil { if existing := m.sessions[key]; existing != nil {
if existing.Closed() {
delete(m.sessions, key)
} else {
m.mu.Unlock() m.mu.Unlock()
_ = pump.Close() _ = pump.Close()
return existing, nil return existing, nil
} }
}
if m.sessions == nil { if m.sessions == nil {
m.sessions = map[string]*FabricSessionPump{} m.sessions = map[string]*FabricSessionPump{}
} }
@@ -117,6 +117,54 @@ func TestFabricSessionPeerManagerClosePeerReopens(t *testing.T) {
} }
} }
// TestFabricSessionPeerManagerReopensClosedPump verifies that the manager does
// not hand back a pump that has already been closed: once the first pump is
// closed, a second Get for the same target must return a different pump, and
// the server must have logged exactly two WebSocket-open events.
func TestFabricSessionPeerManagerReopensClosedPump(t *testing.T) {
	// The logger callback is invoked by the server side, off the test
	// goroutine, while the final assertion runs on the test goroutine. Open
	// events are therefore counted through a buffered channel instead of a
	// bare int, which would be a data race. The send is non-blocking so an
	// unexpected flood of events can never stall the server; the count check
	// below would still fail in that case.
	opened := make(chan struct{}, 8)
	server := httptest.NewServer(Server{
		Local:                PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
		FabricSessionEnabled: true,
		FabricSessionLogger: func(entry FabricSessionEventLogEntry) {
			if entry.Event != "fabric_session_websocket_opened" {
				return
			}
			select {
			case opened <- struct{}{}:
			default:
			}
		},
	}.Handler())
	defer server.Close()

	manager := NewFabricSessionPeerManager()
	defer manager.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	target := FabricSessionPeerTarget{
		PeerID:  "node-a",
		BaseURL: server.URL,
		Options: FabricSessionDialOptions{
			Token:   "rap_fsn_manager_closed",
			Timeout: time.Second,
		},
	}

	first, err := manager.Get(ctx, target)
	if err != nil {
		t.Fatalf("first get: %v", err)
	}
	// Close the pump behind the manager's back so the cached entry is stale.
	if err := first.Close(); err != nil {
		t.Fatalf("close first pump: %v", err)
	}
	if !first.Closed() {
		t.Fatal("first pump should report closed")
	}

	second, err := manager.Get(ctx, target)
	if err != nil {
		t.Fatalf("second get: %v", err)
	}
	if first == second {
		t.Fatal("manager reused closed pump")
	}
	// len on a channel is safe to call concurrently with sends.
	if got := len(opened); got != 2 {
		t.Fatalf("opened sessions = %d, want 2", got)
	}
}
func TestFabricSessionPeerManagerRejectsIncompleteTarget(t *testing.T) { func TestFabricSessionPeerManagerRejectsIncompleteTarget(t *testing.T) {
manager := NewFabricSessionPeerManager() manager := NewFabricSessionPeerManager()
_, err := manager.Get(context.Background(), FabricSessionPeerTarget{PeerID: "node-a"}) _, err := manager.Get(context.Background(), FabricSessionPeerTarget{PeerID: "node-a"})
@@ -287,6 +287,8 @@ rollout independently controllable.
When the VPN fabric-session switch is enabled, node-agent now attempts to use a
long-lived peer session for gateway packet transport and falls back to the
existing HTTP production envelope path when the peer session is unavailable.
The peer session manager now evicts closed pumps instead of reusing them, so a
failed WebSocket session is reopened on the next transport acquisition.
Deliverables: Deliverables: