go项目脚手架
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
sorbet/pkg/app/bus.go

215 lines
3.5 KiB

package app
import (
"errors"
"net"
"slices"
"sorbet/pkg/app/appkit"
"sync"
"sync/atomic"
)
var (
	// lock is the operation lock. In this file it is held while the subs map
	// is read or modified:
	//
	//   * adding a subscriber (Sub)
	//   * removing a subscriber (subscriber.Cancel)
	//   * collecting the subscribers of a topic (dispatch, read lock)
	lock sync.RWMutex
	// wg is used to control the concurrency of the main program. It covers
	// the following areas:
	//
	//   * task scheduling
	//   * timers
	//   * publish/subscribe (pubMsg adds one count per queued message)
	//   * the main program's polling loop
	wg sync.WaitGroup
	// pubs is the channel that published messages are queued on.
	pubs chan Msg
	// subs holds the registered subscribers, keyed by topic.
	subs map[string][]*subscriber
	// nextSubId is the last subscriber id handed out; Sub increments it
	// atomically, so ids start at 1.
	nextSubId int32
)
func initBus() {
if pubs == nil {
pubs = make(chan Msg, 256)
}
if subs == nil {
subs = make(map[string][]*subscriber)
}
OnFree(freeBus)
}
// freeBus resets the bus: it removes every subscriber and discards all
// messages still buffered in pubs. initBus registers it through OnFree.
//
// An open channel is emptied and reused. A closed channel is replaced by a
// fresh one, including the case where it was closed while messages were still
// buffered. A nil channel is left untouched.
//
// NOTE(review): messages discarded here were counted with wg.Add in pubMsg;
// confirm that the counter is balanced elsewhere when this runs.
func freeBus() {
	clear(subs)
	for {
		// Only non-blocking receives are used, so the drain cannot hang when
		// another goroutine takes a message concurrently.
		select {
		case _, ok := <-pubs:
			if !ok {
				// The channel is closed and fully drained; it has to be rebuilt.
				pubs = make(chan Msg, 256)
				return
			}
			// Discard the buffered message and keep draining.
		default:
			// Nothing left in the channel, so it can be reused.
			return
		}
	}
}
// Sub registers handle as a subscriber of topic and returns a handle that can
// be used to query or cancel the subscription. Each subscription receives the
// next id from nextSubId. The registration happens under the write lock.
func Sub(topic string, handle func([]byte) []byte) appkit.Subscriber {
	lock.Lock()
	defer lock.Unlock()

	s := &subscriber{
		id:     atomic.AddInt32(&nextSubId, 1),
		topic:  topic,
		handle: handle,
	}
	// append copes with a missing (nil) entry, so a topic needs no
	// separate initialisation.
	subs[topic] = append(subs[topic], s)
	return s
}
func Pub(topic string, data []byte) {
pubMsg(Msg{Data: data, topic: topic})
}
// pubMsg queues msg on the pubs channel. The send blocks while the channel's
// buffer is full.
func pubMsg(msg Msg) {
	// Count the message on wg before it becomes visible to a receiver.
	wg.Add(1)
	pubs <- msg
}
// Msg is a message travelling through the bus.
type Msg struct {
	// Data is the payload handed to subscribers.
	Data []byte
	// topic selects the subscribers that receive the message.
	topic string
	// addr and pc are set by recv for messages that arrived over a packet
	// connection; dispatch writes a handler's response to addr through pc.
	addr net.Addr
	pc   net.PacketConn
}

// Size returns the length of the message: a fixed 6 bytes plus the byte
// lengths of the topic and of the payload.
func (m Msg) Size() int {
	// NOTE(review): 6 is presumably the header size of the wire format — confirm.
	const fixed = 6
	return fixed + len(m.topic) + len(m.Data)
}
// subscriber is a single subscription created by Sub.
type subscriber struct {
	// id is positive while the subscription is active and 0 once it has been
	// cancelled. It is accessed atomically.
	id int32
	// topic is the topic the subscription was registered for.
	topic string
	// handle processes a message payload; a non-nil return value is used by
	// dispatch as the response.
	handle func([]byte) []byte
}

// Active reports whether the subscription has not been cancelled.
func (s *subscriber) Active() bool {
	return atomic.LoadInt32(&s.id) > 0
}

// Cancel deactivates the subscription and removes it from subs. Calling it
// again is a no-op.
func (s *subscriber) Cancel() {
	// The swap makes sure only the first caller performs the removal.
	if atomic.SwapInt32(&s.id, 0) == 0 {
		return
	}

	lock.Lock()
	defer lock.Unlock()

	// Match by pointer identity: s.id has just been zeroed, so comparing the
	// stored subscriber's id with the old id would never match.
	remaining := slices.DeleteFunc(subs[s.topic], func(sub *subscriber) bool {
		return sub == s
	})
	if len(remaining) > 0 {
		subs[s.topic] = remaining
		return
	}
	// Drop the topic entirely once its last subscriber is gone.
	delete(subs, s.topic)
}

// invoke runs the handler on data and returns its result, or returns nil
// without calling the handler when the subscription has been cancelled.
func (s *subscriber) invoke(data []byte) []byte {
	if !s.Active() {
		return nil
	}
	return s.handle(data)
}
// dispatch delivers msg.Data to every active subscriber of msg.topic.
//
// The subscribers are collected under the read lock and invoked after it has
// been released. If several handlers return a non-nil result, the last one is
// kept as the response. A non-empty response is written to msg.addr through
// msg.pc when the message carries a packet connection.
//
// It returns an error when the topic has no active subscriber, or the error
// reported by the packet connection when writing the response fails.
func dispatch(msg Msg) error {
	lock.RLock()
	var cbs []*subscriber
	for _, s := range subs[msg.topic] {
		if s.Active() {
			cbs = append(cbs, s)
		}
	}
	lock.RUnlock()

	if len(cbs) == 0 {
		return errors.New("no subscribers for topic " + msg.topic)
	}

	var response []byte
	for _, cb := range cbs {
		if res := cb.invoke(msg.Data); res != nil {
			response = res
		}
	}

	if len(response) == 0 || msg.pc == nil {
		// Nothing to send back, or nowhere to send it.
		return nil
	}

	stats.v8workerRespond++
	// NOTE(review): the byte counter is increased here and again by n after
	// the write, so a successful response is counted twice — confirm intent.
	stats.v8workerBytesSent += len(response)
	n, err := msg.pc.WriteTo(response, msg.addr)
	if n > 0 {
		stats.v8workerSend++
		stats.v8workerBytesSent += n
	}
	// Surface a failed write to the caller instead of dropping it.
	return err
}
// recv handles one packet read from pc. It updates the receive counters,
// decodes buf into a topic and a payload with appkit.Decode, and queues the
// result as a message that remembers pc and addr so dispatch can write a
// response back. Packets that fail to decode are dropped.
func recv(pc net.PacketConn, addr net.Addr, buf []byte) {
	stats.v8workerRecv++
	stats.v8workerBytesRecv += len(buf)

	topic, data, err := appkit.Decode(buf)
	if err != nil {
		// todo handle the error
		// return errors.New("invalid payload")
		return
	}

	msg := Msg{
		topic: string(topic),
		Data:  data,
		pc:    pc,
		addr:  addr,
	}
	pubMsg(msg)
}
// checkPubsEmpty is a sanity check that pubs holds no pending message. It
// performs a single non-blocking receive and panics if a message is read or
// if the channel turns out to be closed; otherwise it returns immediately.
func checkPubsEmpty() {
	select {
	case _, open := <-pubs:
		if !open {
			panic("pubs closed. Unexpected.")
		}
		panic("Read a message from pubs after context deadlined.")
	default:
		// Nothing is ready to be received; pubs is empty.
	}
}