You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ims/util/evio/simple.go

257 lines
4.5 KiB

package evio
import (
"slices"
"sync"
"sync/atomic"
)
// subscription is one registered handler together with its delivery options.
type subscription struct {
	// id is taken from simple.idgen in Subscribe; Call uses it as the
	// tie-breaker when two subscriptions have the same priority.
	id int
	// priority holds the optional value stored by SetPriority.
	priority null[int]
	// once holds the optional value stored by SetOnce.
	once null[bool]
	// topic is the string handed to Matcher.Match when an event is dispatched.
	topic string
	// evio is the bus the subscription was registered on.
	evio *simple
	// unsubscribed is accessed atomically; a non-zero value means cancelled.
	unsubscribed int32
	// handler receives the event payload.
	handler Handler
}
// SetPriority stores an explicit dispatch priority and marks it as present.
// Priority falls back to this value only when the handler is not itself a
// Prioritizer.
func (s *subscription) SetPriority(priority int) {
	s.priority.val, s.priority.ok = priority, true
}
// SetOnce stores an explicit one-shot flag and marks it as present. Once
// falls back to this value only when the handler is not itself an Oncer.
func (s *subscription) SetOnce(once bool) {
	s.once.val, s.once.ok = once, true
}
// Priority reports the dispatch priority of the subscription. A handler that
// implements Prioritizer decides first; otherwise the value stored through
// SetPriority is used, and 0 when none was stored.
func (s *subscription) Priority() int {
	switch p, ok := s.handler.(Prioritizer); {
	case ok:
		return p.Priority()
	case s.priority.ok:
		return s.priority.val
	default:
		return 0
	}
}
// Once reports whether the subscription is one-shot. A handler that
// implements Oncer decides first; otherwise the value stored through SetOnce
// is used, and false when none was stored.
func (s *subscription) Once() bool {
	switch o, ok := s.handler.(Oncer); {
	case ok:
		return o.Once()
	case s.once.ok:
		return s.once.val
	default:
		return false
	}
}
// Handle forwards data to the wrapped handler and returns its error. Once the
// subscription has been cancelled the handler is skipped and nil is returned.
func (s *subscription) Handle(data any) error {
	if active := atomic.LoadInt32(&s.unsubscribed) <= 0; active {
		return s.handler.Handle(data)
	}
	return nil
}
// unsubscribe atomically flags s as cancelled. It returns true only for the
// call that performed the transition from active to cancelled.
func unsubscribe(s *subscription) bool {
	previous := atomic.SwapInt32(&s.unsubscribed, 1)
	return previous == 0
}
// event is one queued publication: Publish puts it on simple.events and the
// dispatch loop hands its fields to Call.
type event struct {
	// topic is the topic the payload was published under.
	topic string
	// data is the payload passed to matching handlers.
	data any
}
// simple is the in-process event bus returned by New.
type simple struct {
	// mutex guards subs.
	mutex sync.RWMutex
	// pool is the source of the `chan any` values handed out by Listen.
	pool sync.Pool
	// idgen is incremented atomically to number subscriptions.
	idgen int32
	// subs lists the registered subscriptions; cancelled entries are removed
	// lazily by Call.
	subs []*subscription
	// events is the buffered queue filled by Publish and drained by the
	// dispatch loop.
	events chan event
	// closed is closed by Stop to signal shutdown.
	closed chan struct{}
	// newMatcher builds, for a published topic, the Matcher that each
	// subscription's topic is tested against.
	newMatcher func(topic string) Matcher
}
// New builds an event bus. newMatcher is called with the topic of every
// dispatched event and must return the Matcher used to select subscriptions.
// The bus queues up to 1024 published events; dispatching of queued events
// begins once Start is called.
func New(newMatcher func(string) Matcher) Evio {
	bus := &simple{
		events:     make(chan event, 1024),
		closed:     make(chan struct{}),
		newMatcher: newMatcher,
	}
	bus.pool = sync.Pool{New: func() any { return make(chan any) }}
	return bus
}
// mistaker collects handler errors during one dispatch so that they can be
// reported afterwards, once the payload is supplied to flush.
type mistaker struct {
	// defers holds one reporting closure per captured error, in capture order.
	defers []func(data any)
}
// captch queues err, raised by a handler subscribed to topic, for later
// reporting on the "error" topic of s. Nothing is dispatched until flush runs
// the queued closure with the event payload.
//
// How err is reported:
//   - an *Error is forwarded unchanged;
//   - an error with Unwrap() error is reported as its wrapped cause;
//   - an error with Unwrap() []error is reported once per non-nil cause;
//   - anything else is reported as is.
//
// A wrapper that has nothing to unwrap (nil cause, or no non-nil causes) is
// reported itself instead of being dropped, so a handler failure is never
// lost silently.
func (m *mistaker) captch(s *simple, err error, topic string) {
	m.defers = append(m.defers, func(data any) {
		report := func(cause error) {
			s.Call("error", &Error{cause, topic, data})
		}
		switch x := err.(type) {
		case *Error:
			s.Call("error", x)
		case interface{ Unwrap() error }:
			if cause := x.Unwrap(); cause != nil {
				report(cause)
			} else {
				report(err)
			}
		case interface{ Unwrap() []error }:
			reported := false
			for _, cause := range x.Unwrap() {
				if cause == nil {
					continue
				}
				report(cause)
				reported = true
			}
			if !reported {
				report(err)
			}
		default:
			report(err)
		}
	})
}
// flush runs every queued reporting closure, in capture order, passing each
// one the event payload data.
func (m *mistaker) flush(data any) {
	pending := m.defers
	for i := range pending {
		pending[i](data)
	}
}
// Call synchronously delivers data to every live subscription whose topic is
// accepted by the matcher built for topic. Handlers run in ascending priority
// order; subscriptions with equal priority run in registration (id) order.
//
// Errors returned by handlers do not stop the broadcast. They are collected
// and reported through the mistaker after the last handler has run.
// NOTE(review): reporting goes back through Call("error", ...), so an "error"
// handler that itself returns an error re-enters this path.
//
// A one-shot subscription (Once() == true) is cancelled and then invoked
// exactly once: only the caller that wins the cancellation runs the handler.
func (s *simple) Call(topic string, data any) {
	matcher := s.newMatcher(topic)

	// Snapshot the matching subscriptions under the read lock.
	s.mutex.RLock()
	var matched []*subscription
	stale := false
	for _, sub := range s.subs {
		if atomic.LoadInt32(&sub.unsubscribed) > 0 {
			stale = true
			continue
		}
		if matcher.Match(sub.topic) {
			matched = append(matched, sub)
		}
	}
	s.mutex.RUnlock()

	// Sort the private snapshot outside the lock: Priority may call into
	// user code. Compare explicitly instead of subtracting so that extreme
	// priorities cannot overflow and invert the order.
	slices.SortFunc(matched, func(a, b *subscription) int {
		switch pa, pb := a.Priority(), b.Priority(); {
		case pa < pb:
			return -1
		case pa > pb:
			return 1
		}
		return a.id - b.id
	})

	// Remove cancelled subscriptions in a single pass.
	if stale {
		s.mutex.Lock()
		s.subs = slices.DeleteFunc(s.subs, func(sub *subscription) bool {
			return atomic.LoadInt32(&sub.unsubscribed) > 0
		})
		s.mutex.Unlock()
	}

	var m mistaker
	defer m.flush(data)

	// Broadcast.
	for _, sub := range matched {
		var err error
		if sub.Once() {
			// sub.Handle refuses cancelled subscriptions, so a one-shot
			// handler must be invoked directly after winning the cancel.
			if !unsubscribe(sub) {
				continue
			}
			err = sub.handler.Handle(data)
		} else {
			err = sub.Handle(data)
		}
		if err != nil {
			m.captch(s, err, sub.topic)
		}
	}
}
// Publish queues data under topic for asynchronous delivery by the dispatch
// loop. It blocks while the queue is full and returns ErrClosed once the bus
// has been stopped.
//
// The closed state is checked on its own first: a select with several ready
// cases picks one at random, so without this check a stopped bus with free
// queue space would accept roughly every other event and never deliver it.
// A Publish racing with Stop is still best effort.
func (s *simple) Publish(topic string, data any) error {
	select {
	case <-s.closed:
		return ErrClosed
	default:
	}
	select {
	case <-s.closed:
		return ErrClosed
	case s.events <- event{topic, data}:
		return nil
	}
}
// Subscribe registers handler for topic and returns a function that cancels
// the registration; that function reports true only on the call that actually
// cancels. It returns ErrClosed when the bus has already been stopped.
//
// Each option in opts is applied to the new subscription before it becomes
// visible to dispatch. A handler that implements Initializer has its Init
// method started on a separate goroutine with the bus as argument.
func (s *simple) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Unsubscriber, error) {
	select {
	case <-s.closed:
		return nil, ErrClosed
	default:
	}

	sub := &subscription{
		id:      int(atomic.AddInt32(&s.idgen, 1)),
		topic:   topic,
		evio:    s,
		handler: handler,
	}
	for _, apply := range opts {
		apply(sub)
	}

	s.mutex.Lock()
	s.subs = append(s.subs, sub)
	s.mutex.Unlock()

	if initer, ok := handler.(Initializer); ok {
		go initer.Init(s)
	}
	return func() bool { return unsubscribe(sub) }, nil
}
// Listen subscribes to topic and exposes matching events on an unbuffered
// channel. It returns a stop function, the receive side of the channel and
// any error from Subscribe (in which case the other two results are nil).
//
// Delivery blocks the dispatching goroutine until the consumer receives the
// value or stop is called. stop is idempotent; it cancels the subscription
// and releases a delivery that is blocked or about to block. The channel is
// never closed, so consumers must not wait on it after calling stop.
func (s *simple) Listen(topic string) (func(), <-chan any, error) {
	ch := s.pool.Get().(chan any)
	done := make(chan struct{})

	unsubscribe, err := s.Subscribe(topic, HandlerFunc(func(data any) error {
		// Never deliver once stopped, even if a receiver happens to be ready.
		select {
		case <-done:
			return nil
		default:
		}
		// Selecting on done as well means stop can always release this send;
		// a flag checked before the send would leave a window in which the
		// dispatcher blocks forever.
		select {
		case ch <- data:
		case <-done:
		}
		return nil
	}))
	if err != nil {
		// The channel was never exposed, so it is safe to recycle.
		s.pool.Put(ch)
		return nil, nil, err
	}

	var stopped int32
	stop := func() {
		if atomic.CompareAndSwapInt32(&stopped, 0, 1) {
			unsubscribe()
			close(done)
			// ch is deliberately not returned to the pool: an in-flight
			// handler may still reference it, and recycling it could route
			// an event to an unrelated listener.
		}
	}
	return stop, ch, nil
}
// Start launches the dispatch loop on its own goroutine and returns at once.
// The returned error is always nil.
func (s *simple) Start() error {
	// The loop ends on its own after Stop closes s.closed.
	go s.start()

	return nil
}
// start is the dispatch loop: it takes queued events one at a time and hands
// them to Call. When the bus is closed it delivers the events that are queued
// at that moment and then returns.
func (s *simple) start() {
	for {
		select {
		case evt := <-s.events:
			s.Call(evt.topic, evt.data)
		case <-s.closed:
			// Drain only what is already buffered; the count is taken once.
			for pending := len(s.events); pending > 0; pending-- {
				queued := <-s.events
				s.Call(queued.topic, queued.data)
			}
			return
		}
	}
}
// Stop signals shutdown by closing s.closed. It is idempotent and safe to
// call from several goroutines; the returned error is always nil.
//
// The check and the close happen under s.mutex: without mutual exclusion two
// concurrent callers could both find the channel open and both close it,
// which panics.
func (s *simple) Stop() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	select {
	case <-s.closed:
		// Already stopped.
	default:
		close(s.closed)
	}
	return nil
}