package evio import ( "slices" "sync" "sync/atomic" ) type subscription struct { id int priority null[int] once null[bool] topic string evio *simple unsubscribed int32 handler Handler } func (s *subscription) SetPriority(priority int) { s.priority.val = priority s.priority.ok = true } func (s *subscription) SetOnce(once bool) { s.once.val = once s.once.ok = true } func (s *subscription) Priority() int { if p, ok := s.handler.(Prioritizer); ok { return p.Priority() } if s.priority.ok { return s.priority.val } return 0 } func (s *subscription) Once() bool { if o, ok := s.handler.(Oncer); ok { return o.Once() } if s.once.ok { return s.once.val } return false } func (s *subscription) Handle(data any) error { if atomic.LoadInt32(&s.unsubscribed) > 0 { return nil } return s.handler.Handle(data) } func unsubscribe(s *subscription) bool { return atomic.SwapInt32(&s.unsubscribed, 1) == 0 } type event struct { topic string data any } type simple struct { mutex sync.RWMutex pool sync.Pool idgen int32 subs []*subscription events chan event closed chan struct{} newMatcher func(topic string) Matcher } func New(newMatcher func(string) Matcher) Evio { return &simple{ pool: sync.Pool{ New: func() any { return make(chan any) }, }, events: make(chan event, 1024), closed: make(chan struct{}), newMatcher: newMatcher, } } type mistaker struct { defers []func(data any) } func (m *mistaker) captch(s *simple, err error, topic string) { m.defers = append(m.defers, func(data any) { switch x := err.(type) { case *Error: s.Call("error", x) case interface{ Unwrap() error }: if err = x.Unwrap(); err != nil { s.Call("error", &Error{err, topic, data}) } case interface{ Unwrap() []error }: for _, err := range x.Unwrap() { s.Call("error", &Error{err, topic, data}) } default: s.Call("error", &Error{err, topic, data}) } }) } func (m *mistaker) flush(data any) { for _, deferFunc := range m.defers { deferFunc(data) } } func (s *simple) Call(topic string, data any) { matcher := s.newMatcher(topic) s.mutex.RLock() var subs []*subscription var bads []*subscription for _, sub := range s.subs { if atomic.LoadInt32(&sub.unsubscribed) > 0 { bads = append(bads, sub) continue } if matcher.Match(sub.topic) { subs = append(subs, sub) } } slices.SortFunc(subs, func(a, b *subscription) int { if n := a.Priority() - b.Priority(); n != 0 { return n } return a.id - b.id }) s.mutex.RUnlock() // 删除失效的 if len(bads) > 0 { s.mutex.Lock() for _, sub := range bads { s.subs = slices.DeleteFunc(s.subs, func(s *subscription) bool { return sub.id == s.id }) } s.mutex.Unlock() } var m mistaker defer m.flush(data) // 广播 for _, sub := range subs { if sub.Once() { unsubscribe(sub) } if err := sub.Handle(data); err != nil { m.captch(s, err, sub.topic) } } } func (s *simple) Publish(topic string, data any) error { select { case <-s.closed: return ErrClosed case s.events <- event{topic, data}: return nil } } func (s *simple) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Unsubscriber, error) { select { case <-s.closed: return nil, ErrClosed default: } var sub subscription sub.id = int(atomic.AddInt32(&s.idgen, 1)) sub.topic = topic sub.evio = s sub.handler = handler for _, option := range opts { option(&sub) } s.mutex.Lock() s.subs = append(s.subs, &sub) s.mutex.Unlock() if i, ok := handler.(Initializer); ok { go i.Init(s) } return func() bool { return unsubscribe(&sub) }, nil } func (s *simple) Listen(topic string) (func(), <-chan any, error) { var stopped int32 ch := s.pool.Get().(chan any) unsubscribe, err := s.Subscribe(topic, HandlerFunc(func(data any) error { if atomic.LoadInt32(&stopped) == 0 { ch <- data } return nil })) if err != nil { s.pool.Put(ch) return nil, nil, err } stop := func() { if atomic.CompareAndSwapInt32(&stopped, 0, 1) { select { case <-ch: default: } s.pool.Put(ch) unsubscribe() } } return stop, ch, err } func (s *simple) Start() error { go s.start() return nil } func (s *simple) start() { for { select { case <-s.closed: for n := len(s.events); n > 0; n-- { evt := <-s.events s.Call(evt.topic, evt.data) } return case evt := <-s.events: s.Call(evt.topic, evt.data) } } } func (s *simple) Stop() error { select { case <-s.closed: default: close(s.closed) } return nil }