You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
141 lines
2.5 KiB
141 lines
2.5 KiB
package bus
|
|
|
|
import (
|
|
"context"
|
|
"github.com/rs/xid"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type Emitter struct {
|
|
mu sync.RWMutex
|
|
topics map[string][]ListenerObject
|
|
}
|
|
|
|
func New() *Emitter {
|
|
return &Emitter{
|
|
mu: sync.RWMutex{},
|
|
topics: make(map[string][]ListenerObject),
|
|
}
|
|
}
|
|
|
|
func (e *Emitter) Emit(ctx context.Context, topic string, data any) {
|
|
e.mu.RLock()
|
|
listeners, ok := e.topics[topic]
|
|
e.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
stop := make(chan struct{})
|
|
txID, _ := ctx.Value("txid-key").(string)
|
|
source, _ := ctx.Value("source-key").(string)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
if txID == "" {
|
|
txID = xid.New().String()
|
|
}
|
|
|
|
event := Event{
|
|
ID: xid.New().String(),
|
|
TxID: txID,
|
|
Topic: topic,
|
|
Source: source,
|
|
OccurredAt: time.Now(),
|
|
Data: data,
|
|
stopPropagation: func() {
|
|
select {
|
|
case <-stop:
|
|
default:
|
|
close(stop)
|
|
cancel()
|
|
}
|
|
},
|
|
}
|
|
|
|
for _, listener := range listeners {
|
|
select {
|
|
case <-stop:
|
|
return
|
|
default:
|
|
if listener.once != nil {
|
|
listener.once.Do(func() {
|
|
if listener.async {
|
|
go listener.do(ctx, event)
|
|
} else {
|
|
listener.do(ctx, event)
|
|
}
|
|
e.Cancel(topic, listener.do)
|
|
})
|
|
} else if listener.async {
|
|
go listener.do(ctx, event)
|
|
} else {
|
|
listener.do(ctx, event)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Emitter) Listen(topic string, listener Listener, options ...ListenOption) {
|
|
if listener == nil {
|
|
return
|
|
}
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if e.topics == nil {
|
|
e.topics = make(map[string][]ListenerObject)
|
|
}
|
|
listenerObject := ListenerObject{
|
|
async: false,
|
|
once: nil,
|
|
do: listener,
|
|
ptr: reflect.ValueOf(listener).Pointer(),
|
|
}
|
|
for _, option := range options {
|
|
option(&listenerObject)
|
|
}
|
|
if listeners, has := e.topics[topic]; !has {
|
|
e.topics[topic] = []ListenerObject{listenerObject}
|
|
} else {
|
|
e.topics[topic] = append(listeners, listenerObject)
|
|
}
|
|
}
|
|
|
|
func (e *Emitter) Cancel(topic string, listeners ...Listener) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if e.topics == nil {
|
|
return
|
|
}
|
|
ls, has := e.topics[topic]
|
|
if !has || len(ls) == 0 {
|
|
return
|
|
}
|
|
if len(listeners) == 0 {
|
|
delete(e.topics, topic)
|
|
return
|
|
}
|
|
for _, listener := range listeners {
|
|
for i, l := range ls {
|
|
if l.ptr == 0 {
|
|
l.ptr = reflect.ValueOf(l).Pointer()
|
|
}
|
|
if l.ptr == reflect.ValueOf(listener).Pointer() {
|
|
if i == 0 {
|
|
ls = ls[1:]
|
|
} else if i == len(ls)-1 {
|
|
ls = ls[:i]
|
|
} else {
|
|
ls = append(ls[:i], ls[i+1:]...)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if len(ls) == 0 {
|
|
delete(e.topics, topic)
|
|
} else {
|
|
e.topics[topic] = ls
|
|
}
|
|
}
|
|
|