From 057e3b65a77c94e743b663d04db82724ff38a88f Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 09:54:02 +0300 Subject: [PATCH] Reopen closed fabric peer sessions --- agents/rap-node-agent/internal/mesh/client.go | 12 +++++ .../internal/mesh/fabric_session_manager.go | 18 +++++-- .../mesh/fabric_session_manager_test.go | 48 +++++++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 + 4 files changed, 75 insertions(+), 5 deletions(-) diff --git a/agents/rap-node-agent/internal/mesh/client.go b/agents/rap-node-agent/internal/mesh/client.go index 0f17963..57bdbe1 100644 --- a/agents/rap-node-agent/internal/mesh/client.go +++ b/agents/rap-node-agent/internal/mesh/client.go @@ -293,6 +293,18 @@ func (p *FabricSessionPump) Errors() <-chan error { return p.errors } +func (p *FabricSessionPump) Closed() bool { + if p == nil { + return true + } + select { + case <-p.done: + return true + default: + return false + } +} + func (p *FabricSessionPump) Close() error { if p == nil { return nil diff --git a/agents/rap-node-agent/internal/mesh/fabric_session_manager.go b/agents/rap-node-agent/internal/mesh/fabric_session_manager.go index b7ad282..4b83a82 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_session_manager.go +++ b/agents/rap-node-agent/internal/mesh/fabric_session_manager.go @@ -35,8 +35,12 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession } m.mu.Lock() if pump := m.sessions[key]; pump != nil { - m.mu.Unlock() - return pump, nil + if pump.Closed() { + delete(m.sessions, key) + } else { + m.mu.Unlock() + return pump, nil + } } m.mu.Unlock() @@ -48,9 +52,13 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession m.mu.Lock() if existing := m.sessions[key]; existing != nil { - m.mu.Unlock() - _ = pump.Close() - return existing, nil + if existing.Closed() { + delete(m.sessions, key) + } else { + m.mu.Unlock() + _ = pump.Close() + return existing, nil + } } if m.sessions == nil { 
m.sessions = map[string]*FabricSessionPump{} diff --git a/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go b/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go index 0a421c9..9848a67 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go @@ -117,6 +117,54 @@ func TestFabricSessionPeerManagerClosePeerReopens(t *testing.T) { } } +func TestFabricSessionPeerManagerReopensClosedPump(t *testing.T) { + var opened int + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + FabricSessionLogger: func(entry FabricSessionEventLogEntry) { + if entry.Event == "fabric_session_websocket_opened" { + opened++ + } + }, + }.Handler()) + defer server.Close() + + manager := NewFabricSessionPeerManager() + defer manager.Close() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + target := FabricSessionPeerTarget{ + PeerID: "node-a", + BaseURL: server.URL, + Options: FabricSessionDialOptions{ + Token: "rap_fsn_manager_closed", + Timeout: time.Second, + }, + } + + first, err := manager.Get(ctx, target) + if err != nil { + t.Fatalf("first get: %v", err) + } + if err := first.Close(); err != nil { + t.Fatalf("close first pump: %v", err) + } + if !first.Closed() { + t.Fatal("first pump should report closed") + } + second, err := manager.Get(ctx, target) + if err != nil { + t.Fatalf("second get: %v", err) + } + if first == second { + t.Fatal("manager reused closed pump") + } + if opened != 2 { + t.Fatalf("opened sessions = %d, want 2", opened) + } +} + func TestFabricSessionPeerManagerRejectsIncompleteTarget(t *testing.T) { manager := NewFabricSessionPeerManager() _, err := manager.Get(context.Background(), FabricSessionPeerTarget{PeerID: "node-a"}) diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md 
b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 5985945..70cd011 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -287,6 +287,8 @@ rollout independently controllable. When the VPN fabric-session switch is enabled, node-agent now attempts to use a long-lived peer session for gateway packet transport and falls back to the existing HTTP production envelope path when the peer session is unavailable. +The peer session manager now evicts closed pumps instead of reusing them, so +failed WebSocket sessions can be reopened on the next transport acquisition. Deliverables: