服务端流

syntax = "proto3";
package services;
import "Models.proto";
message UserScoreRequest {
    repeated UserInfo users = 1;
}
message UserScoreResponse {
    repeated UserInfo users = 1;
}
service UserService {
    rpc GetUserScore (UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByServerStream (UserScoreRequest) returns (stream UserScoreResponse);//在return的参数前面加上steam表示服务端流处理,支持分批次处理
}

UserService编写

package services

import "context"

type UserService struct {
}

func (this *UserService) GetUserScoreByServerStream(in *UserScoreRequest, stream UserService_GetUserScoreByServerStreamServer) error {
    var score int32 = 101
    users := make([]*UserInfo, 0)
    for index, user := range in.Users { //模拟异步获取数据,真实情况下这里是异步获取数据,然后达到2条就当成一批发送到生产者的chan里面去
        user.UserScore = score
        score++
        users = append(users, user)
        if (index+1)%2 == 0 && index > 0 { //每拿到两条数据就发送,>0是第一条不处理
            err := stream.Send(&UserScoreResponse{Users: users})
            if err != nil {
                return err
            }
            users = users[0:0]
        }
    }
    if len(users) > 0 { //因为每两条发送一次,如果是奇数最后一条数据就没有发送出去 就漏掉了,所以这里要补发
        err := stream.Send(&UserScoreResponse{Users: users})
        if err != nil {
            return err
        }
    }
    return nil
}

客户端代码

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpccli/helper"
    "grpccli/services"
    "io"
    "log"
    "time"
)

func main() {
    //creds, err := credentials.NewClientTLSFromFile("keys/server.crt", "localhost")
    //if err != nil {
    //    log.Fatal(err)+
    //}

    creds := helper.GetClientCreds()

    conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(creds))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    userClient := services.NewUserServiceClient(conn)
    var i int32
    req := services.UserScoreRequest{}
    req.Users = make([]*services.UserInfo, 0)
    for i = 0; i < 6; i++ {
        req.Users = append(req.Users, &services.UserInfo{UserId: i})
    }
    stream, err := userClient.GetUserScoreByServerStream(context.Background(), &req) //流处理返回值stream需要放在for循环中不断取值
    if err != nil {
        log.Fatal(err)
    }
    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        go func() {
            //执行批处理数据
            fmt.Println(res)
        }()
    }
    time.Sleep(10 * time.Second)
}

客户端流模式

syntax = "proto3";
package services;
import "Models.proto";
message UserScoreRequest {
    repeated UserInfo users = 1;
}
message UserScoreResponse {
    repeated UserInfo users = 1;
}
service UserService {
    rpc GetUserScore (UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByServerStream (UserScoreRequest) returns (stream UserScoreResponse);
    rpc GetUserScoreByClientSteam (stream UserScoreRequest) returns (UserScoreResponse);//stream写在参数前面表示用客户端流
}

服务端代码

package services

import (
    "context"
    "io"
)

type UserService struct {
}

func (this *UserService) GetUserScoreByClientSteam(stream UserService_GetUserScoreByClientSteamServer) error {
    //客户端流一般用于服务端接收数据耗时比较小,速度比较快,但是客户端发送的比较慢,所以为了避免服务端在等待客户端发送的过程中浪费时间,可以先按批次处理客户端发送过来的数据,最后再完整的返回给客户端
    var score int32 = 101
    users := make([]*UserInfo, 0)
    for {
        req, err := stream.Recv()
        if err == io.EOF { //我在客户端设置的一次发送五条,而服务端会先处理完每次的五条,直到客户端发送完所有的数据,才会走这个判断,这时候才会把处理好的数据全部发送给客户端
            return stream.SendAndClose(&UserScoreResponse{Users: users}) //发送并关闭流,因为是单方向的流处理,所以在数据全部发送完成后要告诉客户端我已经全部接受完成了,并且返回了所有已经处理完成的数据
        }
        if err != nil {
            return err
        }
        for _, user := range req.Users {
            user.UserScore = score //好比是服务端做的业务处理,在等待客户端发送批数据过来的过程中可以先处理之前接受到的部分数据,不必等所有数据全部接受完再处理,这里可以用协程来避免阻塞
            score++
            users = append(users, user)
        }
    }
}

客户端代码

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpccli/helper"
    "grpccli/services"
    "log"
)

func main() {
    //creds, err := credentials.NewClientTLSFromFile("keys/server.crt", "localhost")
    //if err != nil {
    //    log.Fatal(err)+
    //}

    creds := helper.GetClientCreds()

    conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(creds))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    userClient := services.NewUserServiceClient(conn)
    var i int32
    stream, err := userClient.GetUserScoreByClientSteam(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    for j := 1; j < 3; j++ {
        req := services.UserScoreRequest{}
        req.Users = make([]*services.UserInfo, 0)
        for i = 0; i <= 5; i++ { //加入5条信息,假设是一个耗时的操作,客户端发送数据比较慢,所以不全部发送,每五条发送一次数据,先让服务器把这些数据处理了
            req.Users = append(req.Users, &services.UserInfo{UserId: i})
        }
        err := stream.Send(&req) //每五条发送一次数据
        if err != nil {
            log.Println(err)
        }
    }
    res, err := stream.CloseAndRecv()//所有数据发送完成后,关闭steam,这样服务端会收到io.EOF错误知道数据全部发送完毕,就会把之前所有处理好的数据全部返回
    if err != nil {
        return
    }
    fmt.Println(res)
}




12-25 16:11