使用grpc双向模式,可以实现客户端随时发送消息给服务端,服务端也可以随时发送消息到客户端,不再是一问一答的模式。

grpc_stream/hellowoldstream/helloworldstream.proto

syntax = "proto3";
package pb;

message HelloRequest {
  string username = 1;
}

message HelloResponse {
  string message = 1;
}

// +
message ClientStream {
    bytes stream = 1;
}
message ServerStream {
    bytes stream = 1;
}
service HelloService {
  rpc SayHello(HelloRequest) returns (HelloResponse){}

  rpc Chat(stream ClientStream) returns (stream ServerStream){}
}

protoc --go_out=plugins=grpc:. helloworldstream.proto 生成 helloworldstream.pb.go

grpc_stream/server_stream.go

package main

import (
    "context"
    "fmt"
    "io"
    "net"
    "strconv"
    "time"

    pb "github.com/mypractise/grpc/grpc_stream/helloworldstream"
    "google.golang.org/grpc"
)

type HelloService struct {
}

func (hs *HelloService) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloResponse, error) {
    return &pb.HelloResponse{Message: fmt.Sprintf("你好,%s", req.Username)}, nil
}

// ++++++++++++++++++++++++++++++++
func (hs *HelloService) Chat(conn pb.HelloService_ChatServer) error {
    go func() {
        var i int = 100
        for {
            time.Sleep(2 * time.Second)
            conn.Send(&pb.ServerStream{Stream: []byte("server send i:" + strconv.Itoa(i))})
            i++
        }
    }()

    for {
        time.Sleep(2 * time.Second)
        stream, err := conn.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        fmt.Println("server recv:", string(stream.Stream))
        // fmt.Println("receive from client:", stream.Stream)

    }

    return nil
}

// ++++++++++++++++++++

func main() {
    lis, err := net.Listen("tcp", ":6001")
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    s := grpc.NewServer()
    pb.RegisterHelloServiceServer(s, &HelloService{})
    go func() {
        s.Serve(lis)
    }()
    fmt.Println(s.GetServiceInfo())
    select {}
}

func newBytes(a ...byte) []byte {
    return a
}

grpc_stream/client_stream.go

package main

import (
    "context"
    "fmt"
    "io"
    "strconv"

    "time"

    pb "github.com/mypractise/grpc/grpc_stream/helloworldstream"
    "google.golang.org/grpc"
)

func main() {
    conn, e := grpc.Dial("localhost:6001", grpc.WithInsecure())
    if e != nil {
        fmt.Println(e.Error())
        return
    }
    defer conn.Close()
    c := pb.NewHelloServiceClient(conn)
    // // say hello
    // r, e := c.SayHello(context.Background(), &pb.HelloRequest{Username: "ft"})
    // if e != nil {
    //     fmt.Println(e.Error())
    //     return
    // }
    // fmt.Println(r.Message)

    // ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    // chat
    chatClilent, e := c.Chat(context.Background())
    if e != nil {
        fmt.Println(e.Error())
        return
    }
    go func() {
        for {
            time.Sleep(3 * time.Second)
            stream, e := chatClilent.Recv()
            if e == io.EOF {
                fmt.Println("EOF")
                return
            }
            if e != nil {
                fmt.Println("-----err:", e)
                return
            }
            fmt.Println("client recv:", string(stream.Stream))
        }
    }()

    go func() {
        i := 0
        for {
            time.Sleep(3 * time.Second)
            chatClilent.Send(&pb.ClientStream{Stream: []byte("client send i:" + strconv.Itoa(i))})
            i++
        }
    }()
    select {
    case <-time.After(20 * time.Second):
    }
}

func newBytes(a ...byte) []byte {
    return a
}
01-02 22:55