使用grpc双向模式,可以实现客户端随时发送消息给服务端,服务端也可以随时发送消息到客户端,不再是一问一答的模式。
grpc_stream/hellowoldstream/helloworldstream.proto
syntax = "proto3";

package pb;

// HelloRequest is the request message of the unary SayHello RPC.
message HelloRequest {
  string username = 1;
}

// HelloResponse is the reply message of the unary SayHello RPC.
message HelloResponse {
  string message = 1;
}

// ClientStream is a single client-to-server frame of the Chat stream.
// The payload is an opaque byte string.
message ClientStream {
  bytes stream = 1;
}

// ServerStream is a single server-to-client frame of the Chat stream.
// The payload is an opaque byte string.
message ServerStream {
  bytes stream = 1;
}

// HelloService exposes one unary RPC and one bidirectional streaming RPC.
service HelloService {
  // SayHello is a plain request/response call.
  rpc SayHello(HelloRequest) returns (HelloResponse);

  // Chat streams in both directions: each side may send at any time.
  rpc Chat(stream ClientStream) returns (stream ServerStream);
}
protoc --go_out=plugins=grpc:. helloworldstream.proto 生成 helloworldstream.pb.go
grpc_stream/server_stream.go
package main import ( "context" "fmt" "io" "net" "strconv" "time" pb "github.com/mypractise/grpc/grpc_stream/helloworldstream" "google.golang.org/grpc" ) type HelloService struct { } func (hs *HelloService) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloResponse, error) { return &pb.HelloResponse{Message: fmt.Sprintf("你好,%s", req.Username)}, nil } // ++++++++++++++++++++++++++++++++ func (hs *HelloService) Chat(conn pb.HelloService_ChatServer) error { go func() { var i int = 100 for { time.Sleep(2 * time.Second) conn.Send(&pb.ServerStream{Stream: []byte("server send i:" + strconv.Itoa(i))}) i++ } }() for { time.Sleep(2 * time.Second) stream, err := conn.Recv() if err == io.EOF { return nil } if err != nil { return err } fmt.Println("server recv:", string(stream.Stream)) // fmt.Println("receive from client:", stream.Stream) } return nil } // ++++++++++++++++++++ func main() { lis, err := net.Listen("tcp", ":6001") if err != nil { fmt.Println(err.Error()) return } s := grpc.NewServer() pb.RegisterHelloServiceServer(s, &HelloService{}) go func() { s.Serve(lis) }() fmt.Println(s.GetServiceInfo()) select {} } func newBytes(a ...byte) []byte { return a }
grpc_stream/client_stream.go
package main import ( "context" "fmt" "io" "strconv" "time" pb "github.com/mypractise/grpc/grpc_stream/helloworldstream" "google.golang.org/grpc" ) func main() { conn, e := grpc.Dial("localhost:6001", grpc.WithInsecure()) if e != nil { fmt.Println(e.Error()) return } defer conn.Close() c := pb.NewHelloServiceClient(conn) // // say hello // r, e := c.SayHello(context.Background(), &pb.HelloRequest{Username: "ft"}) // if e != nil { // fmt.Println(e.Error()) // return // } // fmt.Println(r.Message) // ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // chat chatClilent, e := c.Chat(context.Background()) if e != nil { fmt.Println(e.Error()) return } go func() { for { time.Sleep(3 * time.Second) stream, e := chatClilent.Recv() if e == io.EOF { fmt.Println("EOF") return } if e != nil { fmt.Println("-----err:", e) return } fmt.Println("client recv:", string(stream.Stream)) } }() go func() { i := 0 for { time.Sleep(3 * time.Second) chatClilent.Send(&pb.ClientStream{Stream: []byte("client send i:" + strconv.Itoa(i))}) i++ } }() select { case <-time.After(20 * time.Second): } } func newBytes(a ...byte) []byte { return a }