我在MacOSX上使用brew安装了zeromq:稳定的4.1.4,并编写了一个简单的PUB / SUB程序来测试zeromq。但是,当我使用--bufsize> 5标志运行示例程序时(使用大于5MB的缓冲区)(运行go_zmq_pubsub.go --bufsize = 6);它引发以下异常:
没有可用的缓冲区空间(tcp.cpp:69)
SIGABRT:中止
PC = 0x7fff9911c286 m = 0
CGO执行期间信号到达
下面是我用来测试zeromq4.x的程序
package main
import (
"fmt"
"flag"
"strconv"
"sync"
log "github.com/Sirupsen/logrus"
zmq "github.com/pebbe/zmq4"
"time"
)
var _ = fmt.Println
func main(){
var port int
var bufsize int
flag.IntVar(&port, "port", 7676, "server's zmq tcp port")
flag.IntVar(&bufsize, "bufsize", 0, "socket kernel buffer size")
flag.Parse();
publisher, err := zmq.NewSocket(zmq.PUB)
if(err != nil) {
log.Fatal(err)
}
//set publisher kernel transmit buffer size
//convert into bytes
if err := publisher.SetSndbuf(bufsize * 1000000); err != nil {
log.Fatal(err)
}
defer publisher.Close()
publisher.Bind("tcp://*:" + strconv.Itoa(port))
//SETUP subscriber
subscriber, err := zmq.NewSocket(zmq.SUB)
if(err != nil) {
log.Fatal(err)
}
//set subscriber kernel receive buffer size
if err := subscriber.SetRcvbuf(bufsize * 1000000); err != nil {
log.Fatal(err)
}
defer subscriber.Close()
subscriber.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
subscriber.SetSubscribe("")
var wg sync.WaitGroup
wg.Add(2)
idx := 0
go func(wg *sync.WaitGroup) {
//start streaming messages
ticker := time.NewTicker(1 * time.Second)
go func() {
for {
select {
case <-ticker.C:
_, err = publisher.Send("PKG:"+strconv.Itoa(idx), 0)
idx++;
if(err != nil) {
log.Error(err)
}
}
}
}()
}(&wg)
//receiver
go func(wg *sync.WaitGroup) {
go func(){
for {
payload, err := subscriber.Recv(0)
_ = payload
if err != nil {
log.Error(err)
break
}
//now sending into worker pool
log.Info("RECEIVE:" + payload)
}
}()
}(&wg)
wg.Wait()
}
在具有从源代码构建的lib-zeromq的Centos7上,上面的代码可以正常工作。
不知道这是由于libzeromq还是操作系统本身引起的。
谢谢。
最佳答案
大于5MB的缓冲区是没有意义的。超出相关链路的带宽延迟乘积的任何东西都是浪费的空间。
适度您的要求。