You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
257 lines
4.5 KiB
257 lines
4.5 KiB
package evio
|
|
|
|
import (
|
|
"slices"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
type subscription struct {
|
|
id int
|
|
priority null[int]
|
|
once null[bool]
|
|
topic string
|
|
evio *simple
|
|
unsubscribed int32
|
|
handler Handler
|
|
}
|
|
|
|
func (s *subscription) SetPriority(priority int) {
|
|
s.priority.val = priority
|
|
s.priority.ok = true
|
|
}
|
|
|
|
func (s *subscription) SetOnce(once bool) {
|
|
s.once.val = once
|
|
s.once.ok = true
|
|
}
|
|
|
|
func (s *subscription) Priority() int {
|
|
if p, ok := s.handler.(Prioritizer); ok {
|
|
return p.Priority()
|
|
}
|
|
if s.priority.ok {
|
|
return s.priority.val
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (s *subscription) Once() bool {
|
|
if o, ok := s.handler.(Oncer); ok {
|
|
return o.Once()
|
|
}
|
|
if s.once.ok {
|
|
return s.once.val
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *subscription) Handle(data any) error {
|
|
if atomic.LoadInt32(&s.unsubscribed) > 0 {
|
|
return nil
|
|
}
|
|
return s.handler.Handle(data)
|
|
}
|
|
|
|
func unsubscribe(s *subscription) bool {
|
|
return atomic.SwapInt32(&s.unsubscribed, 1) == 0
|
|
}
|
|
|
|
type event struct {
|
|
topic string
|
|
data any
|
|
}
|
|
|
|
type simple struct {
|
|
mutex sync.RWMutex
|
|
pool sync.Pool
|
|
idgen int32
|
|
subs []*subscription
|
|
events chan event
|
|
closed chan struct{}
|
|
newMatcher func(topic string) Matcher
|
|
}
|
|
|
|
func New(newMatcher func(string) Matcher) Evio {
|
|
return &simple{
|
|
pool: sync.Pool{
|
|
New: func() any {
|
|
return make(chan any)
|
|
},
|
|
},
|
|
events: make(chan event, 1024),
|
|
closed: make(chan struct{}),
|
|
newMatcher: newMatcher,
|
|
}
|
|
}
|
|
|
|
type mistaker struct {
|
|
defers []func(data any)
|
|
}
|
|
|
|
func (m *mistaker) captch(s *simple, err error, topic string) {
|
|
m.defers = append(m.defers, func(data any) {
|
|
switch x := err.(type) {
|
|
case *Error:
|
|
s.Call("error", x)
|
|
case interface{ Unwrap() error }:
|
|
if err = x.Unwrap(); err != nil {
|
|
s.Call("error", &Error{err, topic, data})
|
|
}
|
|
case interface{ Unwrap() []error }:
|
|
for _, err := range x.Unwrap() {
|
|
s.Call("error", &Error{err, topic, data})
|
|
}
|
|
default:
|
|
s.Call("error", &Error{err, topic, data})
|
|
}
|
|
})
|
|
}
|
|
|
|
func (m *mistaker) flush(data any) {
|
|
for _, deferFunc := range m.defers {
|
|
deferFunc(data)
|
|
}
|
|
}
|
|
|
|
func (s *simple) Call(topic string, data any) {
|
|
matcher := s.newMatcher(topic)
|
|
|
|
s.mutex.RLock()
|
|
var subs []*subscription
|
|
var bads []*subscription
|
|
for _, sub := range s.subs {
|
|
if atomic.LoadInt32(&sub.unsubscribed) > 0 {
|
|
bads = append(bads, sub)
|
|
continue
|
|
}
|
|
if matcher.Match(sub.topic) {
|
|
subs = append(subs, sub)
|
|
}
|
|
}
|
|
slices.SortFunc(subs, func(a, b *subscription) int {
|
|
if n := a.Priority() - b.Priority(); n != 0 {
|
|
return n
|
|
}
|
|
return a.id - b.id
|
|
})
|
|
s.mutex.RUnlock()
|
|
|
|
// 删除失效的
|
|
if len(bads) > 0 {
|
|
s.mutex.Lock()
|
|
for _, sub := range bads {
|
|
s.subs = slices.DeleteFunc(s.subs, func(s *subscription) bool {
|
|
return sub.id == s.id
|
|
})
|
|
}
|
|
s.mutex.Unlock()
|
|
}
|
|
|
|
var m mistaker
|
|
defer m.flush(data)
|
|
|
|
// 广播
|
|
for _, sub := range subs {
|
|
if sub.Once() {
|
|
unsubscribe(sub)
|
|
}
|
|
if err := sub.Handle(data); err != nil {
|
|
m.captch(s, err, sub.topic)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *simple) Publish(topic string, data any) error {
|
|
select {
|
|
case <-s.closed:
|
|
return ErrClosed
|
|
case s.events <- event{topic, data}:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s *simple) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Unsubscriber, error) {
|
|
select {
|
|
case <-s.closed:
|
|
return nil, ErrClosed
|
|
default:
|
|
}
|
|
|
|
var sub subscription
|
|
sub.id = int(atomic.AddInt32(&s.idgen, 1))
|
|
sub.topic = topic
|
|
sub.evio = s
|
|
sub.handler = handler
|
|
for _, option := range opts {
|
|
option(&sub)
|
|
}
|
|
|
|
s.mutex.Lock()
|
|
s.subs = append(s.subs, &sub)
|
|
s.mutex.Unlock()
|
|
|
|
if i, ok := handler.(Initializer); ok {
|
|
go i.Init(s)
|
|
}
|
|
|
|
return func() bool { return unsubscribe(&sub) }, nil
|
|
}
|
|
|
|
func (s *simple) Listen(topic string) (func(), <-chan any, error) {
|
|
var stopped int32
|
|
ch := s.pool.Get().(chan any)
|
|
|
|
unsubscribe, err := s.Subscribe(topic, HandlerFunc(func(data any) error {
|
|
if atomic.LoadInt32(&stopped) == 0 {
|
|
ch <- data
|
|
}
|
|
return nil
|
|
}))
|
|
if err != nil {
|
|
s.pool.Put(ch)
|
|
return nil, nil, err
|
|
}
|
|
|
|
stop := func() {
|
|
if atomic.CompareAndSwapInt32(&stopped, 0, 1) {
|
|
select {
|
|
case <-ch:
|
|
default:
|
|
}
|
|
s.pool.Put(ch)
|
|
unsubscribe()
|
|
}
|
|
}
|
|
|
|
return stop, ch, err
|
|
}
|
|
|
|
func (s *simple) Start() error {
|
|
go s.start()
|
|
return nil
|
|
}
|
|
|
|
func (s *simple) start() {
|
|
for {
|
|
select {
|
|
case <-s.closed:
|
|
for n := len(s.events); n > 0; n-- {
|
|
evt := <-s.events
|
|
s.Call(evt.topic, evt.data)
|
|
}
|
|
return
|
|
case evt := <-s.events:
|
|
s.Call(evt.topic, evt.data)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *simple) Stop() error {
|
|
select {
|
|
case <-s.closed:
|
|
default:
|
|
close(s.closed)
|
|
}
|
|
return nil
|
|
}
|
|
|