go项目脚手架
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sorbet/pkg/bus/emitter.go

141 lines
2.5 KiB

package bus
import (
"context"
"github.com/rs/xid"
"reflect"
"sync"
"time"
)
type Emitter struct {
mu sync.RWMutex
topics map[string][]ListenerObject
}
func New() *Emitter {
return &Emitter{
mu: sync.RWMutex{},
topics: make(map[string][]ListenerObject),
}
}
func (e *Emitter) Emit(ctx context.Context, topic string, data any) {
e.mu.RLock()
listeners, ok := e.topics[topic]
e.mu.RUnlock()
if !ok {
return
}
stop := make(chan struct{})
txID, _ := ctx.Value("txid-key").(string)
source, _ := ctx.Value("source-key").(string)
ctx, cancel := context.WithCancel(ctx)
if txID == "" {
txID = xid.New().String()
}
event := Event{
ID: xid.New().String(),
TxID: txID,
Topic: topic,
Source: source,
OccurredAt: time.Now(),
Data: data,
stopPropagation: func() {
select {
case <-stop:
default:
close(stop)
cancel()
}
},
}
for _, listener := range listeners {
select {
case <-stop:
return
default:
if listener.once != nil {
listener.once.Do(func() {
if listener.async {
go listener.do(ctx, event)
} else {
listener.do(ctx, event)
}
e.Cancel(topic, listener.do)
})
} else if listener.async {
go listener.do(ctx, event)
} else {
listener.do(ctx, event)
}
}
}
}
func (e *Emitter) Listen(topic string, listener Listener, options ...ListenOption) {
if listener == nil {
return
}
e.mu.Lock()
defer e.mu.Unlock()
if e.topics == nil {
e.topics = make(map[string][]ListenerObject)
}
listenerObject := ListenerObject{
async: false,
once: nil,
do: listener,
ptr: reflect.ValueOf(listener).Pointer(),
}
for _, option := range options {
option(&listenerObject)
}
if listeners, has := e.topics[topic]; !has {
e.topics[topic] = []ListenerObject{listenerObject}
} else {
e.topics[topic] = append(listeners, listenerObject)
}
}
func (e *Emitter) Cancel(topic string, listeners ...Listener) {
e.mu.Lock()
defer e.mu.Unlock()
if e.topics == nil {
return
}
ls, has := e.topics[topic]
if !has || len(ls) == 0 {
return
}
if len(listeners) == 0 {
delete(e.topics, topic)
return
}
for _, listener := range listeners {
for i, l := range ls {
if l.ptr == 0 {
l.ptr = reflect.ValueOf(l).Pointer()
}
if l.ptr == reflect.ValueOf(listener).Pointer() {
if i == 0 {
ls = ls[1:]
} else if i == len(ls)-1 {
ls = ls[:i]
} else {
ls = append(ls[:i], ls[i+1:]...)
}
}
}
}
if len(ls) == 0 {
delete(e.topics, topic)
} else {
e.topics[topic] = ls
}
}