在过去的几周中,我一直潜伏在Stack Overflow中,寻找与阅读大量WebSocket相关的信息。基本上,我有许多主机都通过websocket发出消息,我需要对其进行汇总。

到目前为止,我已经通过Golang完成了一个websocket连接。我也已经完成了使用Python寻找的东西,但是我真的很想在Go中做到这一点!

我已经使用了gorilla的websocket示例以及其他一些示例,并且可以在Go中成功读取套接字。但是,似乎Websocket服务器使用JS中的.forEach或.Each之类的方法并不完全符合典型的开发实践。导致握手失败。

原始版本



    package main

    import (
            "fmt"
            "golang.org/x/net/websocket"
            "log"
    )

    var url = "ws://10.0.1.19:5000/data/websocket"

    func main() {
            ws, err := websocket.Dial(url, "", origin)
            if err != nil {
                    log.Fatal(err)
            }

            var msg = make([]byte, 512)
            _, err = ws.Read(msg)
            if err != nil {
                    log.Fatal(err)
            }
            fmt.Printf("Receive: %s\n", msg)
    }


我实际上不需要将任何数据发送到套接字,我只需要连接并继续读取它,然后将这些数据聚合到单个流中即可执行以后的操作。

更新(2016-03-19)

经过不断的更改并使用gorilla和旧的x/net/websocket库进行了测试,我发现很不幸,我连接的websocket服务器似乎未正确遵循gorilla希望用于握手的标准。要么就是我没有告诉 gorilla 如何正确连接。 x/net/websocket连接很好;我只是将localhost/指定为源,这似乎可行。我不确定如何告诉 gorilla 如何做同样的事情,看看它是否以相同的方式工作。通过DefaultDialer.Dial()进行挖掘有一些配置选项,但是根据我的少量Go知识,现在我还没有找到一种方法来利用它来做我想做的事情。

当前版本(2016-03-19)
package main

import (
    "fmt"
    "golang.org/x/net/websocket"
    // "log"
    "time"
)

var origin = "http://localhost"

type url struct {
    host string
}

func processUrl(host string, messages chan []byte) {
    client, err := websocket.Dial(host, "", origin)
    if err != nil {
        // log.Printf("dial:", err)
    }
    // Clean up on exit from this goroutine
    defer client.Close()
    // Loop reading messages. Send each message to the channel.
    for {
        var msg = make([]byte, 512)
        _, err = client.Read(msg)
        if err != nil {
            // log.Fatal("read:", err)
            return
        }
        messages <- msg
    }
}

func main() {

    // Create an arry of hosts to read websockets from
    urls := []string{
        "ws://10.0.1.90:3000/data/websocket",
        "ws://10.0.2.90:3000/data/websocket",
        "ws://10.0.3.90:3000/data/websocket",
    }

    // Create channel to receive messages from all connections
    messages := make(chan []byte)

    // Run a goroutine for each URL that you want to dial.
    for _, host := range urls {
        go processUrl(host, messages)
    }

    // Print all messages received from the goroutines.
    for msg := range messages {
        fmt.Printf("%d %s\n", time.Now().Unix(), msg)
    }

}

响应(来自ws的消息):

    {
        "src_city":"Wayne",
        "dest_city":"Amsterdam",
        "src_country":"US",
        "dest_country":"NL",
        "type":"view"
    }

IOWait问题

我遇到的一个问题是IOWait错误。我将二进制文件隔10个websocket运行了一夜,没有任何问题。我针对488运行了它,我需要对其进行测试,并且它在IOWait上运行了2分钟,依此类推。我看到了一些常规错误:


    goroutine 72 [IO wait]:
    net.runtime_pollWait(0x7f356149b208, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e610, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e610, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e5b0, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a150, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc208005140)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc208005140, 0xc2080f22d0, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005140, 0x7f356149b908, 0xc2080f22d0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc2080d7050, 0xc2080f4c00, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x705010, 0x26, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

    goroutine 73 [IO wait, 2 minutes]:
    net.runtime_pollWait(0x7f356149b158, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e760, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e760, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e700, 0xc208015000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a018, 0xc208015000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc2080042a0)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc2080042a0, 0x67d6e0, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc2080042a0, 0x7f356149b908, 0xc2080196d0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc208024240, 0xc208080000, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x705190, 0x25, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

    goroutine 74 [IO wait]:
    net.runtime_pollWait(0x7f356149b0a8, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e8b0, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e8b0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e850, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a160, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc208005200)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc208005200, 0xc2080f2320, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005200, 0x7f356149b908, 0xc2080f2320, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc2080d70e0, 0xc2080f4e00, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x7052d0, 0x27, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126


我有另一个二进制文件,试图与每个Web套接字地址建立初始连接,以便确保它可以访问,但是这又是另一个问题。我的悬而未决的问题是:
  • 当我使用x/net/websocket的实现时,如何使用Gorilla的websocket实现从SockJS服务的websocket中读取信息。
  • 我看到的IOWait问题可能是什么或可能是什么原因?
  • 因为我使用[] byte作为响应,所以我的日志文件(只是将stdout用管道传送到文件中)包含将二进制数据附加到末尾的行。为了避免这种情况,我应该/应该如何从 byte slice 转换为仅将其作为文本/字符串写入stdout?
  • 使用chan,如何最好地从我的流程函数中传递一个附加参数,以将消息返回的Websocket主机返回到 channel ,以便我可以同时记录主机和消息?我是否应该使用一种结构来定义 channel 并让该 channel 包含我想要的内容:时间戳,主机,消息?

  • 对于IOWait错误的问题,我不能仅想象(a)无法建立连接,并且例程将其保持打开状态并最终引发错误; (b)是否有太多例程在运行?我尝试使用10、20、50和所有400+,其中甚至指定10的版本都可以工作(只要所有10都在响应),而其中10不能工作的版本(因为主机没有响应)。

    我可能会有其他疑问,但我感谢您的见解和帮助。 channel 的建议肯定让我感动。我以前使用过它们一次,但并不总是了解如何最好地实现它们。我的其他项目利用 channel 和 WaitGroup (wg),但老实说,我不了解其中一项的意义。

    再次感谢您的想法和建议,真是太好了!

    为这篇文章中的奇怪语法表示歉意,我似乎无法让编辑器删除我的代码元素周围的一些空行

    最佳答案

    启动goroutine读取每个连接。将收到的消息发送到 channel 。从该 channel 接收以获取所有连接的消息。

    // Create channel to receive messages from all connections
    messages := make(chan []byte)
    
    // Run a goroutine for each URL that you want to dial.
    for _, u := range urls {
        go func(u string) {
            // Dial with Gorilla package. The x/net/websocket package has issues.
            c, _, err := websocket.DefaultDialer.Dial(u, http.Header{"Origin":{origin}})
            if err != nil {
                log.Fatal("dial:", err)
            }
            // Clean up on exit from this goroutine
            defer c.Close()
            // Loop reading messages. Send each message to the channel.
            for {
                _, m, err := c.ReadMessage()
                if err != nil {
                    log.Fatal("read:", err)
                    return
                }
                messages <- m
            }
        }(u)
    }
    
    // Print all messages received from the goroutines.
    for m := range messages {
        fmt.Printf("%s\n", m)
    }
    

    关于sockets - golang阅读了许多websockets,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36062878/

    10-14 23:46