803 lines
28 KiB
Go
803 lines
28 KiB
Go
package mesh
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestMeshHealthAcceptsSameCluster(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{Local: local}.Handler())
|
|
defer server.Close()
|
|
|
|
client := NewClient(server.URL)
|
|
ack, err := client.SendHealth(context.Background(), NewHealthMessage(
|
|
PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
|
|
local,
|
|
))
|
|
if err != nil {
|
|
t.Fatalf("send health: %v", err)
|
|
}
|
|
if !ack.Accepted || ack.By.NodeID != "node-b" {
|
|
t.Fatalf("unexpected ack: %+v", ack)
|
|
}
|
|
}
|
|
|
|
func TestMeshHealthRejectsClusterMismatch(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{Local: local}.Handler())
|
|
defer server.Close()
|
|
|
|
message := NewHealthMessage(PeerIdentity{ClusterID: "cluster-2", NodeID: "node-a"}, local)
|
|
payload, err := json.Marshal(message)
|
|
if err != nil {
|
|
t.Fatalf("marshal message: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/health", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post health: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusForbidden {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingDisabled(t *testing.T) {
|
|
server := httptest.NewServer(Server{Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}}.Handler())
|
|
defer server.Close()
|
|
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/octet-stream", bytes.NewReader([]byte("payload")))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusNotImplemented {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusNotImplemented)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateEnabledStillHasNoProductionRuntime(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
payload, err := json.Marshal(validProductionEnvelope(local))
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusNotImplemented {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusNotImplemented)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateDeliversFabricControlAtDestination(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
|
|
var events []ProductionForwardLogEntry
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
|
|
events = append(events, entry)
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.SourceNodeID = "node-a"
|
|
envelope.DestinationNodeID = local.NodeID
|
|
envelope.CurrentHopNodeID = local.NodeID
|
|
envelope.NextHopNodeID = local.NodeID
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK)
|
|
}
|
|
var result ProductionForwardResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
t.Fatalf("decode result: %v", err)
|
|
}
|
|
if !result.Accepted || !result.Delivered || result.Forwarded || result.By.NodeID != local.NodeID {
|
|
t.Fatalf("unexpected result: %+v", result)
|
|
}
|
|
if !hasProductionForwardEvent(events, "production_forward_accepted") || !hasProductionForwardEvent(events, "production_forward_delivered") {
|
|
t.Fatalf("missing production forward events: %+v", events)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateForwardsDirectFabricControlToNextHop(t *testing.T) {
|
|
nodeC := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
|
|
var deliveredObservation ProductionEnvelopeObservation
|
|
serverC := httptest.NewServer(Server{
|
|
Local: nodeC,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
|
|
deliveredObservation = observation
|
|
return nil
|
|
},
|
|
}.Handler())
|
|
defer serverC.Close()
|
|
|
|
nodeB := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
serverB := httptest.NewServer(Server{
|
|
Local: nodeB,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
|
|
nodeC.NodeID: serverC.URL,
|
|
}),
|
|
}.Handler())
|
|
defer serverB.Close()
|
|
|
|
envelope := validProductionEnvelope(nodeB)
|
|
envelope.SourceNodeID = "node-a"
|
|
envelope.DestinationNodeID = nodeC.NodeID
|
|
envelope.CurrentHopNodeID = nodeB.NodeID
|
|
envelope.NextHopNodeID = nodeC.NodeID
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(serverB.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK)
|
|
}
|
|
var result ProductionForwardResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
t.Fatalf("decode result: %v", err)
|
|
}
|
|
if !result.Accepted || !result.Forwarded || !result.Delivered || result.NextNodeID != nodeC.NodeID || result.By.NodeID != nodeB.NodeID {
|
|
t.Fatalf("unexpected forward result: %+v", result)
|
|
}
|
|
if deliveredObservation.CurrentHopNodeID != nodeC.NodeID || deliveredObservation.MessageID != envelope.MessageID {
|
|
t.Fatalf("destination did not observe forwarded envelope: %+v", deliveredObservation)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateForwardsMultiHopFabricControlByRoutePath(t *testing.T) {
|
|
nodeC := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
|
|
var deliveredObservation ProductionEnvelopeObservation
|
|
var nodeREvents []ProductionForwardLogEntry
|
|
var nodeBEvents []ProductionForwardLogEntry
|
|
serverC := httptest.NewServer(Server{
|
|
Local: nodeC,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
|
|
deliveredObservation = observation
|
|
return nil
|
|
},
|
|
}.Handler())
|
|
defer serverC.Close()
|
|
|
|
nodeR := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-r"}
|
|
serverR := httptest.NewServer(Server{
|
|
Local: nodeR,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
|
|
nodeC.NodeID: serverC.URL,
|
|
}),
|
|
ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
|
|
nodeREvents = append(nodeREvents, entry)
|
|
},
|
|
}.Handler())
|
|
defer serverR.Close()
|
|
|
|
nodeB := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
serverB := httptest.NewServer(Server{
|
|
Local: nodeB,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
|
|
nodeR.NodeID: serverR.URL,
|
|
}),
|
|
ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
|
|
nodeBEvents = append(nodeBEvents, entry)
|
|
},
|
|
}.Handler())
|
|
defer serverB.Close()
|
|
|
|
envelope := validProductionEnvelope(nodeB)
|
|
envelope.SourceNodeID = "node-a"
|
|
envelope.DestinationNodeID = nodeC.NodeID
|
|
envelope.CurrentHopNodeID = nodeB.NodeID
|
|
envelope.NextHopNodeID = nodeR.NodeID
|
|
envelope.RoutePath = []string{"node-a", nodeB.NodeID, nodeR.NodeID, nodeC.NodeID}
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(serverB.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK)
|
|
}
|
|
var result ProductionForwardResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
t.Fatalf("decode result: %v", err)
|
|
}
|
|
if !result.Accepted || !result.Forwarded || !result.Delivered || result.NextNodeID != nodeR.NodeID || result.By.NodeID != nodeB.NodeID {
|
|
t.Fatalf("unexpected multi-hop result: %+v", result)
|
|
}
|
|
if deliveredObservation.CurrentHopNodeID != nodeC.NodeID || deliveredObservation.NextHopNodeID != nodeC.NodeID {
|
|
t.Fatalf("destination did not observe final hop: %+v", deliveredObservation)
|
|
}
|
|
if len(deliveredObservation.VisitedNodeIDs) != 2 || deliveredObservation.VisitedNodeIDs[0] != nodeB.NodeID || deliveredObservation.VisitedNodeIDs[1] != nodeR.NodeID {
|
|
t.Fatalf("visited path not propagated: %+v", deliveredObservation.VisitedNodeIDs)
|
|
}
|
|
if !hasProductionForwardEvent(nodeBEvents, "production_forward_forwarded") || !hasProductionForwardEvent(nodeREvents, "production_forward_forwarded") {
|
|
t.Fatalf("missing relay forward events: nodeB=%+v nodeR=%+v", nodeBEvents, nodeREvents)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateForwardsConfiguredProductionRoute(t *testing.T) {
|
|
nodeC := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"}
|
|
route := configuredProductionRoute("route-1", []string{"node-a", "node-b", "node-r", nodeC.NodeID})
|
|
var deliveredObservation ProductionEnvelopeObservation
|
|
serverC := httptest.NewServer(Server{
|
|
Local: nodeC,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionRoutes: []SyntheticRoute{route},
|
|
ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
|
|
deliveredObservation = observation
|
|
return nil
|
|
},
|
|
}.Handler())
|
|
defer serverC.Close()
|
|
|
|
nodeR := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-r"}
|
|
serverR := httptest.NewServer(Server{
|
|
Local: nodeR,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionRoutes: []SyntheticRoute{route},
|
|
ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
|
|
nodeC.NodeID: serverC.URL,
|
|
}),
|
|
}.Handler())
|
|
defer serverR.Close()
|
|
|
|
nodeB := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
serverB := httptest.NewServer(Server{
|
|
Local: nodeB,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionRoutes: []SyntheticRoute{route},
|
|
ProductionForwardTransport: NewHTTPProductionForwardTransport(map[string]string{
|
|
nodeR.NodeID: serverR.URL,
|
|
}),
|
|
}.Handler())
|
|
defer serverB.Close()
|
|
|
|
envelope := validProductionEnvelope(nodeB)
|
|
envelope.SourceNodeID = "node-a"
|
|
envelope.DestinationNodeID = nodeC.NodeID
|
|
envelope.CurrentHopNodeID = nodeB.NodeID
|
|
envelope.NextHopNodeID = nodeR.NodeID
|
|
envelope.RoutePath = route.Hops
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(serverB.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK)
|
|
}
|
|
if deliveredObservation.RouteID != route.RouteID || deliveredObservation.CurrentHopNodeID != nodeC.NodeID {
|
|
t.Fatalf("configured route was not delivered: %+v", deliveredObservation)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsUnknownConfiguredProductionRoute(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionRoutes: []SyntheticRoute{
|
|
configuredProductionRoute("route-other", []string{"node-a", local.NodeID, "node-c"}),
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusNotFound {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusNotFound)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsConfiguredProductionRouteWrongNextHop(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
route := configuredProductionRoute("route-1", []string{"node-a", local.NodeID, "node-r", "node-c"})
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionRoutes: []SyntheticRoute{route},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.SourceNodeID = "node-a"
|
|
envelope.DestinationNodeID = "node-c"
|
|
envelope.CurrentHopNodeID = local.NodeID
|
|
envelope.NextHopNodeID = "node-c"
|
|
envelope.RoutePath = route.Hops
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusBadRequest {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusBadRequest)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsRoutePathWrongNextHop(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
var events []ProductionForwardLogEntry
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionForwardLogger: func(entry ProductionForwardLogEntry) {
|
|
events = append(events, entry)
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.SourceNodeID = "node-a"
|
|
envelope.DestinationNodeID = "node-c"
|
|
envelope.CurrentHopNodeID = local.NodeID
|
|
envelope.NextHopNodeID = "node-x"
|
|
envelope.RoutePath = []string{"node-a", local.NodeID, "node-r", "node-c"}
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusBadRequest {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusBadRequest)
|
|
}
|
|
if !hasProductionForwardEvent(events, "production_forward_rejected") {
|
|
t.Fatalf("missing reject event: %+v", events)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsRoutePathLoop(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.SourceNodeID = "node-a"
|
|
envelope.DestinationNodeID = "node-c"
|
|
envelope.CurrentHopNodeID = local.NodeID
|
|
envelope.NextHopNodeID = "node-r"
|
|
envelope.RoutePath = []string{"node-a", local.NodeID, "node-r", local.NodeID, "node-c"}
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusForbidden {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsInvalidProductionEnvelope(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.PayloadHash = "bad-hash"
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusBadRequest {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusBadRequest)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsOversizedProductionEnvelopePayload(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
observed := false
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
|
|
observed = true
|
|
return nil
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.Payload = json.RawMessage(`"` + string(bytes.Repeat([]byte("a"), MaxProductionEnvelopePayloadBytes+1)) + `"`)
|
|
sum := sha256.Sum256(envelope.Payload)
|
|
envelope.PayloadLength = len(envelope.Payload)
|
|
envelope.PayloadHash = hex.EncodeToString(sum[:])
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusBadRequest {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusBadRequest)
|
|
}
|
|
if observed {
|
|
t.Fatal("observer called for oversized envelope")
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsFutureCreatedAt(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
observed := false
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
|
|
observed = true
|
|
return nil
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.CreatedAt = time.Now().UTC().Add(MaxProductionEnvelopeFutureSkew + time.Second)
|
|
envelope.ExpiresAt = envelope.CreatedAt.Add(time.Minute)
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusBadRequest {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusBadRequest)
|
|
}
|
|
if observed {
|
|
t.Fatal("observer called for future-created envelope")
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateObservesValidEnvelopeWithoutPayload(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
var observed ProductionEnvelopeObservation
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(_ context.Context, observation ProductionEnvelopeObservation) error {
|
|
observed = observation
|
|
return nil
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusNotImplemented {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusNotImplemented)
|
|
}
|
|
if observed.MessageID != envelope.MessageID || observed.RouteID != envelope.RouteID {
|
|
t.Fatalf("unexpected observation: %+v", observed)
|
|
}
|
|
if observed.PayloadHash != envelope.PayloadHash || observed.PayloadLength != envelope.PayloadLength {
|
|
t.Fatalf("payload metadata missing from observation: %+v", observed)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateDoesNotObserveRejectedEnvelope(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
observed := false
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
|
|
observed = true
|
|
return nil
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.ClusterID = "wrong-cluster"
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusForbidden {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden)
|
|
}
|
|
if observed {
|
|
t.Fatal("observer called for rejected envelope")
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateFailsClosedWhenObservationFails(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
|
|
return errors.New("observer down")
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
payload, err := json.Marshal(validProductionEnvelope(local))
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusInternalServerError {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusInternalServerError)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateFailsClosedWhenObservationPanics(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: func(context.Context, ProductionEnvelopeObservation) error {
|
|
panic("observer panic")
|
|
},
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
payload, err := json.Marshal(validProductionEnvelope(local))
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusInternalServerError {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusInternalServerError)
|
|
}
|
|
}
|
|
|
|
func TestObserveProductionEnvelopeAllowsNilObserver(t *testing.T) {
|
|
if err := observeProductionEnvelope(context.Background(), nil, ProductionEnvelopeObservation{}); err != nil {
|
|
t.Fatalf("observeProductionEnvelope nil observer err = %v", err)
|
|
}
|
|
}
|
|
|
|
func TestProductionEnvelopeObservationSinkKeepsBoundedMetadata(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
sink := NewProductionEnvelopeObservationSink(2)
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
ProductionEnvelopeObserver: sink.Observe,
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
for i := 1; i <= 3; i++ {
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.MessageID = "message-" + string(rune('0'+i))
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
resp.Body.Close()
|
|
if resp.StatusCode != http.StatusNotImplemented {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusNotImplemented)
|
|
}
|
|
}
|
|
|
|
observations := sink.Snapshot()
|
|
if len(observations) != 2 {
|
|
t.Fatalf("observation count = %d, want 2", len(observations))
|
|
}
|
|
if observations[0].MessageID != "message-2" || observations[1].MessageID != "message-3" {
|
|
t.Fatalf("unexpected bounded observations: %+v", observations)
|
|
}
|
|
if observations[0].PayloadHash == "" || observations[0].PayloadLength == 0 {
|
|
t.Fatalf("payload metadata missing from bounded observation: %+v", observations[0])
|
|
}
|
|
metrics := sink.Metrics()
|
|
if metrics.Capacity != 2 || metrics.CurrentDepth != 2 || metrics.AcceptedTotal != 3 || metrics.DroppedOldest != 1 {
|
|
t.Fatalf("unexpected sink metrics: %+v", metrics)
|
|
}
|
|
}
|
|
|
|
func TestProductionEnvelopeObservationSinkMetricsStartEmpty(t *testing.T) {
|
|
sink := NewProductionEnvelopeObservationSink(3)
|
|
metrics := sink.Metrics()
|
|
if metrics.Capacity != 3 || metrics.CurrentDepth != 0 || metrics.AcceptedTotal != 0 || metrics.DroppedOldest != 0 {
|
|
t.Fatalf("unexpected empty metrics: %+v", metrics)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingGateRejectsServiceChannel(t *testing.T) {
|
|
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}
|
|
server := httptest.NewServer(Server{
|
|
Local: local,
|
|
ProductionForwardingEnabled: true,
|
|
}.Handler())
|
|
defer server.Close()
|
|
|
|
envelope := validProductionEnvelope(local)
|
|
envelope.ChannelClass = "render"
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
resp, err := http.Post(server.URL+"/mesh/v1/forward", "application/json", bytes.NewReader(payload))
|
|
if err != nil {
|
|
t.Fatalf("post forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusForbidden {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusForbidden)
|
|
}
|
|
}
|
|
|
|
func TestMeshForwardingRequiresPost(t *testing.T) {
|
|
server := httptest.NewServer(Server{Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}}.Handler())
|
|
defer server.Close()
|
|
|
|
resp, err := http.Get(server.URL + "/mesh/v1/forward")
|
|
if err != nil {
|
|
t.Fatalf("get forward: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusMethodNotAllowed {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusMethodNotAllowed)
|
|
}
|
|
}
|
|
|
|
func validProductionEnvelope(local PeerIdentity) ProductionEnvelope {
|
|
payload := json.RawMessage(`{"kind":"control"}`)
|
|
sum := sha256.Sum256(payload)
|
|
now := time.Now().UTC()
|
|
return ProductionEnvelope{
|
|
FabricProtocolVersion: ProtocolVersion,
|
|
MessageID: "message-1",
|
|
RouteID: "route-1",
|
|
ClusterID: local.ClusterID,
|
|
SourceNodeID: "node-a",
|
|
DestinationNodeID: "node-c",
|
|
CurrentHopNodeID: local.NodeID,
|
|
NextHopNodeID: "node-c",
|
|
ChannelClass: ProductionChannelFabricControl,
|
|
MessageType: ProductionMessageFabricControl,
|
|
TTL: 4,
|
|
HopCount: 1,
|
|
CreatedAt: now,
|
|
ExpiresAt: now.Add(time.Minute),
|
|
PayloadLength: len(payload),
|
|
PayloadHash: hex.EncodeToString(sum[:]),
|
|
Payload: payload,
|
|
}
|
|
}
|
|
|
|
func configuredProductionRoute(routeID string, hops []string) SyntheticRoute {
|
|
return SyntheticRoute{
|
|
RouteID: routeID,
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: hops[0],
|
|
DestinationNodeID: hops[len(hops)-1],
|
|
Hops: append([]string{}, hops...),
|
|
AllowedChannels: []string{ProductionChannelFabricControl},
|
|
ExpiresAt: time.Now().UTC().Add(time.Hour),
|
|
MaxTTL: 8,
|
|
MaxHops: 8,
|
|
}
|
|
}
|
|
|
|
func hasProductionForwardEvent(events []ProductionForwardLogEntry, event string) bool {
|
|
for _, item := range events {
|
|
if item.Event == event {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func TestSyntheticEndpointDisabledByDefault(t *testing.T) {
|
|
server := httptest.NewServer(Server{Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}}.Handler())
|
|
defer server.Close()
|
|
|
|
resp, err := http.Post(server.URL+"/mesh/v1/synthetic/probe", "application/json", bytes.NewReader([]byte(`{}`)))
|
|
if err != nil {
|
|
t.Fatalf("post synthetic probe: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusServiceUnavailable {
|
|
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusServiceUnavailable)
|
|
}
|
|
}
|