我正在尝试将gob与zmq4套接字一起使用(来自pebbe的zmq4)。 zmq4套接字没有io设备,使gob似乎无法直接读取/写入:

我无法在&client ( type **zmq4.Socket )的参数中将io.Writer用作gob.NewEncoder类型:zmq4.Socket没有实现io.Writer(缺少Write方法)

一个zmq4发送函数SendMessage()接受一个interface{},所以我正在使用它来发送。

在服务器端zmq4接收函数正在返回string[]byte[]string[][]byte。我正在使用RecvMessage(),它返回[]string

可以写一个bytes.Buffer,发送该缓冲区,以[]string读取它,然后获取消息的内容部分以使用gob处理。尽管当前的问题在于将[]string转换为bytes.Buffer,以使gob能够从中进行io.Read。听起来很基础,但是到目前为止,我已经尝试了很多方法而没有成功。这是当前的。问题很明显是gob产生了“缓冲区中的额外数据”,这是因为print语句显示了数据在发送之前和接收之后似乎相同。

有没有更简单,更可行的方法了?如果您有zmq4,则下面的代码是自包含的,应该执行。

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
    "sync"
    "time"

    zmq "github.com/pebbe/zmq4"
)

type LogEntry struct {
    ErrID  int
    Name   string
    Level  string
    LogStr string
}

// The client task
// --------------------------------------------------------------
func client_task(s string) {
    var mu sync.Mutex
    client, _ := zmq.NewSocket(zmq.DEALER)
    defer client.Close()
    client.SetIdentity(s)
    client.Connect("tcp://localhost:5570")

    go func() {

        aLogEntry := &LogEntry{
            ErrID:  1,
            Name:   "Client",
            LogStr: "Log msg sent",
        }

        for request_nbr := 1; true; request_nbr++ {

            var network bytes.Buffer
            enc := gob.NewEncoder(&network)
            err := enc.Encode(aLogEntry)
            if err != nil {
                fmt.Println("encode error:", err)
            }

            // Early decode test - this will influence subsequent gob
            // behaviour so leave commented when caring about the sent
            // data
            // dec := gob.NewDecoder(&network)
            // var aLogEntry2 *LogEntry
            // err = dec.Decode(&aLogEntry2)
            // if err != nil {
            //  fmt.Printf("client_task(DECODE ERROR) : %s\n\n", err)
            // }
            // fmt.Printf("client_task(TEST DECODE) %+v\n\n", aLogEntry)

            mu.Lock()
            // Replaced length by bytes Buffer method: 91
            fmt.Printf("client_task(len) : %d\n\n", network.Len())
            fmt.Printf("client_task(network) : %v\n\n", network)

            client.SendMessage(network, 0)
            mu.Unlock()
            time.Sleep(5 * time.Second)
        }
    }()

    // pause to allow server
    for {
        time.Sleep(100 * time.Millisecond)
    }
}

// The server task
// --------------------------------------------------------------
func server_task() {

    frontend, _ := zmq.NewSocket(zmq.ROUTER)
    defer frontend.Close()
    frontend.Bind("tcp://*:5570")

    for {
        msg, _ := frontend.RecvMessage(0)
        // Added error reporting - does not report any error
        // err does never get filled in here, never an error could get reported here
        if err != nil {
            fmt.Printf("RECV ERROR: %s", err)
        }

        // using WriteString to write the content portion of the
        // received message to the bytes.Buffer for gob to process
        var network bytes.Buffer
        dec := gob.NewDecoder(&network)
        network.WriteString(msg[1])

        // Added length of the bytes Buffer: 285
        // Before sending the bytes Buffer is: 91
        // More than just msg[1] is written into the buffer ?
        fmt.Printf("server_task(len): %d\n\n", network.Len())
        fmt.Printf("server_task(msg[1]) : %s\n\n", msg[1])

        var aLogEntry *LogEntry
        err := dec.Decode(&aLogEntry)
        if err != nil {
            fmt.Printf("server_task(DECODE ERROR) : %s\n\n", err)
        }

        fmt.Printf("server_task(aLogEntry) %+v\n\n", aLogEntry)
    }
}

func main() {
    defer fmt.Println("main() done")

    go client_task("1")
    go server_task()

    //  Run for 5 seconds then quit
    time.Sleep(5 * time.Second)
}

打印语句在客户端显示:
client_task(network) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

在服务器端:
server_task(msg[1]) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

看起来几乎一样。

结果是:
server_task(DECODE ERROR) : extra data in buffer
server_task(aLogEntry) <nil>

最佳答案

欢迎来到零禅艺术:

如果您从未使用过ZeroMQ,可以在深入了解更多细节之前先看看"ZeroMQ Principles in less than Five Seconds"

观察:

ZeroMQ本机API定义了以下属性:

当接收到消息时,ZMQ_ROUTER套接字应在消息部分之前,该消息部分包含始发对等方到消息的路由ID,然后再传递给应用程序


解决方案:

要么开始使用正确的 PUSH/PULL 可扩展正式原型,而不是使用DEALER/ROUTER(对于您的用例而言过于过分过分),或者可以基于一个假设,即您的ROUTER -node永远不会将.RecvMessage( 0 )与其他内部多部分结构一起使用,但是只有一个,匹配无法完全保证的[ <routing_id> | <[network]-payload> [ | ... ] ]模板,可以吗?

虽然不确定,但是ZeroMQ的pebbezmq4 go-wrapper如何尝试实现所有本机API功能和/或处理潜在的差异(在没有任何用户级应用程序干预的情况下自动实现)读取所有多部分内容组件和处理ott一次和/或以NULL终止的字符串的处理,这可能会在.Decode()方法内部发生冲突。

最后但并非最不重要的一点是,如果您的代码依赖msg[1]在内部具有有效且符合所有约定的有效负载,则如果我没有忽略一些底层的hack,则我不知道,在发起时,我没有看到对情况的显式处理端(DEALER)尚未向消费者端(ROUTER)发送任何此类新消息,但是.RecvMessage( 0 ) -method填充了msg并继续向msg -method传递(带有空.Decode()),在这种情况下,它必须出于明显原因失败在空或格式不正确的msg上,不是吗?

我肯定会从PUSH/PULL替换开始,它不会在ipast_code和相关风险的情况下在交付端注入预先添加的,现在是多帧的合成。

如果没有等待消息等待routing_id -RxQueue-buffers内部等待,则.RecvMessage() -method的非阻塞模式返回仍将在用空数据填充msg时发生冲突,这仍将使PULL -method感到恐慌。

如果.Decode()方法调用实际上表现出阻塞模式接收,如果要完全从错误的根本原因分析中排除ZeroMQ,则应更加注意错误状态的检测和处理。对于所有已部署的ZeroMQ资源,更多的自防御性.RecvMessage( 0 ) -setups(.setsockopt()和许多其他设置)也将提高鲁棒性和易错性水平,即在崩溃的应用程序可能对生产造成任何损害的情况下。

您可能会尝试重现该错误:不幸的是,IDE缺少here ZeroMQ部分。

Golang.org site失败之一:

go: finding module for package github.com/pebbe/zmq4
go: downloading github.com/pebbe/zmq4 v1.2.0
go: found github.com/pebbe/zmq4 in github.com/pebbe/zmq4 v1.2.0
# pkg-config --cflags  -- libzmq
pkg-config: exec: "pkg-config": executable file not found in $PATH

Go build failed.

关于go - Go:将Gob与zmq4结合使用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61428228/

10-10 11:18