124 lines
2.5 KiB
Go
124 lines
2.5 KiB
Go
package broker
|
|
|
|
import (
|
|
"encoding/json"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestSubscribePublishUnsubscribe(t *testing.T) {
|
|
b := New()
|
|
|
|
sub := b.Subscribe()
|
|
|
|
evt := Event{Type: "test.event", AgentID: "agent1", Payload: map[string]string{"k": "v"}}
|
|
b.Publish(evt)
|
|
|
|
select {
|
|
case raw := <-sub:
|
|
var got Event
|
|
if err := json.Unmarshal(raw, &got); err != nil {
|
|
t.Fatalf("unmarshal: %v", err)
|
|
}
|
|
if got.Type != "test.event" || got.AgentID != "agent1" {
|
|
t.Errorf("unexpected event: %+v", got)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
|
|
b.Unsubscribe(sub)
|
|
|
|
// channel must be closed after unsubscribe
|
|
select {
|
|
case _, ok := <-sub:
|
|
if ok {
|
|
t.Error("expected channel to be closed")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for channel close")
|
|
}
|
|
}
|
|
|
|
func TestMultipleSubscribers(t *testing.T) {
|
|
b := New()
|
|
|
|
sub1 := b.Subscribe()
|
|
sub2 := b.Subscribe()
|
|
defer b.Unsubscribe(sub1)
|
|
defer b.Unsubscribe(sub2)
|
|
|
|
b.Publish(Event{Type: "ping", Payload: nil})
|
|
|
|
for i, sub := range []subscriber{sub1, sub2} {
|
|
select {
|
|
case <-sub:
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("subscriber %d did not receive event", i)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestPublishDropsWhenSubscriberSlow(t *testing.T) {
|
|
b := New()
|
|
|
|
// Channel size is 32; fill it up and then publish one more — it must not block.
|
|
sub := b.Subscribe()
|
|
defer b.Unsubscribe(sub)
|
|
|
|
// Fill the buffer
|
|
for i := 0; i < 32; i++ {
|
|
b.Publish(Event{Type: "flood", Payload: i})
|
|
}
|
|
|
|
// This extra publish must return immediately (dropped, not block).
|
|
done := make(chan struct{})
|
|
go func() {
|
|
b.Publish(Event{Type: "dropped", Payload: nil})
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Publish blocked on slow subscriber")
|
|
}
|
|
}
|
|
|
|
func TestPublishNoSubscribers(t *testing.T) {
|
|
b := New()
|
|
// Should not panic or block
|
|
b.Publish(Event{Type: "nobody", Payload: nil})
|
|
}
|
|
|
|
func TestPublishInvalidPayload(t *testing.T) {
|
|
b := New()
|
|
sub := b.Subscribe()
|
|
defer b.Unsubscribe(sub)
|
|
|
|
// json.Marshal of a channel fails — Publish must not send anything.
|
|
b.Publish(Event{Type: "bad", Payload: make(chan int)})
|
|
|
|
select {
|
|
case <-sub:
|
|
t.Error("should not have received a message for an unmarshalable event")
|
|
default:
|
|
// correct: nothing sent
|
|
}
|
|
}
|
|
|
|
func TestUnsubscribeRemovesFromBroker(t *testing.T) {
|
|
b := New()
|
|
sub := b.Subscribe()
|
|
b.Unsubscribe(sub)
|
|
|
|
// After unsubscribe the broker's map should be empty.
|
|
b.mu.RLock()
|
|
n := len(b.subs)
|
|
b.mu.RUnlock()
|
|
|
|
if n != 0 {
|
|
t.Errorf("expected 0 subscribers after unsubscribe, got %d", n)
|
|
}
|
|
}
|