gRPC 流:最佳实践和性能见解

grpc 流:最佳实践和性能见解

介绍

grpc 流允许 protobuf 消息从客户端流式传输到服务器、从服务器流式传输到客户端,或者双向流式传输。
这一强大的功能可用于构建实时应用程序,例如聊天应用程序、实时监控仪表板等。

在本文中,我们将探讨如何正确使用 grpc 流。

先决条件

grpc基础知识go 编程语言的基础知识(示例代码是用 go 编写的,但这个概念也可以应用于其他语言)代码示例可在 github 上获取

良好实践

让我们检查一下使用 grpc 流的良好实践:

使用一元请求进行一元请求

一个常见的错误是对一元请求使用流式传输。
例如,考虑以下 grpc 服务定义:

service myservice {  rpc getsomething (somethingrequest) returns (stream somethingresponse) {}}

登录后复制

如果客户端只需要发送一个请求并接收一个响应,
您不需要使用流式传输。相反,我们可以按如下方式定义服务:

service myservice {  rpc getsomething (somethingrequest) returns (somethingresponse) {}}

登录后复制

通过对一元请求使用流式传输,我们增加了不必要的复杂性
到代码,这可能会使其更难理解和维护,而不是
从使用流媒体中获得任何好处。

比较一元请求和流请求的 golang 代码示例:

一元请求:

type somethingunary struct {    pb.unimplementedsomethingunaryserver}func (s *somethingunary) getsomething(ctx context.context, req *pb.somethingrequest) (*pb.somethingresponse, error) {    return &pb.somethingresponse{        message: "hello " + req.name,    }, nil}func testsomethingunary(t *testing.t) {    conn := newserver(t, func(s grpc.serviceregistrar) {        pb.registersomethingunaryserver(s, &somethingunary{})    })    client := pb.newsomethingunaryclient(conn)    response, err := client.getsomething(        context.background(),        &pb.somethingrequest{            name: "test",        },    )    if err != nil {        t.fatalf("failed to get something: %v", err)    }    if response.message != "hello test" {        t.errorf("unexpected response: %v", response.message)    }}

登录后复制

流式一元请求:

type somethingstream struct {    pb.unimplementedsomethingstreamserver}func (s *somethingstream) getsomething(req *pb.somethingrequest, stream pb.somethingstream_getsomethingserver) error {    if err := stream.send(&pb.somethingresponse{        message: "hello " + req.name,    }); err != nil {        return err    }    return nil}func testsomethingstream(t *testing.t) {    conn := newserver(t, func(s grpc.serviceregistrar) {        pb.registersomethingstreamserver(s, &somethingstream{})    })    client := pb.newsomethingstreamclient(conn)    stream, err := client.getsomething(        context.background(),        &pb.somethingrequest{            name: "test",        },    )    if err != nil {        t.fatalf("failed to get something stream: %v", err)    }    response, err := stream.recv()    if err != nil {        t.fatalf("failed to receive response: %v", err)    }    if response.message != "hello test" {        t.errorf("unexpected response: %v", response.message)    }}

登录后复制

我们可以看到,一元请求的代码更简单,更容易理解
比流请求的代码。

如果可以的话,一次发送多个文档

让我们比较一下这两个服务定义:

service bookstore {  rpc listbooks(listbooksrequest) returns (stream book) {}}service bookstorebatch {  rpc listbooks(listbooksrequest) returns (stream listbooksresponse) {}}message listbooksresponse {  repeated book books = 1;}

登录后复制

bookstore 一次流式传输一本书,而 bookstorebatch 同时流式传输多本书。

如果客户端需要列出所有书籍,使用bookstorebatch 效率更高
因为它减少了客户端和服务器之间的往返次数。

让我们看看 bookstore 和 bookstorebatch 的 golang 代码示例:

书店:

type bookstore struct {    pb.unimplementedbookstoreserver}func (s *bookstore) listbooks(req *pb.listbooksrequest, stream pb.bookstore_listbooksserver) error {    for _, b := range bookstoredata {        if b.author == req.author {            if err := stream.send(&pb.book{                title:           b.title,                author:          b.author,                publicationyear: int32(b.publicationyear),                genre:           b.genre,            }); err != nil {                return err            }        }    }    return nil}func testbookstore_listbooks(t *testing.t) {    conn := newserver(t, func(s grpc.serviceregistrar) {        pb.registerbookstoreserver(s, &bookstore{})    })    client := pb.newbookstoreclient(conn)    stream, err := client.listbooks(        context.background(),        &pb.listbooksrequest{            author: charlesdickens,        },    )    if err != nil {        t.fatalf("failed to list books: %v", err)    }    books := []*pb.book{}    for {        book, err := stream.recv()        if err != nil {            break        }        books = append(books, book)    }    if len(books) != charlesdickensbooks {        t.errorf("unexpected number of books: %d", len(books))    }}

登录后复制

书店批次:

type bookstorebatch struct {    pb.unimplementedbookstorebatchserver}func (s *bookstorebatch) listbooks(req *pb.listbooksrequest, stream pb.bookstorebatch_listbooksserver) error {    const batchsize = 10    books := make([]*pb.book, 0, batchsize)    for _, b := range bookstoredata {        if b.author == req.author {            books = append(books, &pb.book{                title:           b.title,                author:          b.author,                publicationyear: int32(b.publicationyear),                genre:           b.genre,            })            if len(books) == batchsize {                if err := stream.send(&pb.listbooksresponse{                    books: books,                }); err != nil {                    return err                }                books = books[:0]            }        }    }    if len(books) > 0 {        if err := stream.send(&pb.listbooksresponse{            books: books,        }); err != nil {            return nil        }    }    return nil}func testbookstorebatch_listbooks(t *testing.t) {    conn := newserver(t, func(s grpc.serviceregistrar) {        pb.registerbookstorebatchserver(s, &bookstorebatch{})    })    client := pb.newbookstorebatchclient(conn)    stream, err := client.listbooks(        context.background(),        &pb.listbooksrequest{            author: charlesdickens,        },    )    if err != nil {        t.fatalf("failed to list books: %v", err)    }    books := []*pb.book{}    for {        response, err := stream.recv()        if err != nil {            break        }        books = append(books, response.books...)    }    if len(books) != charlesdickensbooks {        t.errorf("unexpected number of books: %d", len(books))    }}

登录后复制

从上面的代码中,需要明确哪一个更好。
让我们运行一个基准测试来看看差异:

书店基准:

func benchmarkbookstore_listbooks(b *testing.b) {    conn := newserver(b, func(s grpc.serviceregistrar) {        pb.registerbookstoreserver(s, &bookstore{})    })    client := pb.newbookstoreclient(conn)    var benchinnerbooks []*pb.book    b.resettimer()    for i := 0; i < b.n; i++ {        stream, err := client.listbooks(            context.background(),            &pb.listbooksrequest{                author: charlesdickens,            },        )        if err != nil {            b.fatalf("failed to list books: %v", err)        }        books := []*pb.book{}        for {            book, err := stream.recv()            if err != nil {                break            }            books = append(books, book)        }        benchinnerbooks = books    }    benchbooks = benchinnerbooks}

登录后复制

bookstorebatch 基准:

func benchmarkbookstorebatch_listbooks(b *testing.b) {    conn := newserver(b, func(s grpc.serviceregistrar) {        pb.registerbookstorebatchserver(s, &bookstorebatch{})    })    client := pb.newbookstorebatchclient(conn)    var benchinnerbooks []*pb.book    b.resettimer()    for i := 0; i < b.n; i++ {        stream, err := client.listbooks(            context.background(),            &pb.listbooksrequest{                author: charlesdickens,            },        )        if err != nil {            b.fatalf("failed to list books: %v", err)        }        books := []*pb.book{}        for {            response, err := stream.recv()            if err != nil {                break            }            books = append(books, response.books...)        }        benchinnerbooks = books    }    benchbooks = benchinnerbooks}

登录后复制

基准测试结果:

benchmarkbookstore_listbooksbenchmarkbookstore_listbooks-12                      732           1647454 ns/op           85974 b/op       1989 allocs/opbenchmarkbookstorebatch_listbooksbenchmarkbookstorebatch_listbooks-12                1202            937491 ns/op           61098 b/op        853 allocs/op

登录后复制

多么大的进步啊! bookstorebatch 比 bookstore 快 1.75 倍。

但是为什么 bookstorebatch 比 bookstore 快?

服务器每次向客户端发送消息流.send(),都需要
对消息进行编码并通过网络发送。通过发送多个文件
我们立即减少了服务器需要编码和发送的次数
消息,不仅提高了服务器的性能,还提高了
对于需要解码消息的客户端。

在上面的例子中,批量大小设置为10,但客户端可以根据网络情况和文档大小进行调整。

使用双向流来控制流量

书店示例返回所有书籍并完成流,但如果客户端
需要实时观察事件(例如传感器),使用双向
直播是正确的选择。

双向流有点棘手,因为客户端和服务器都
可以同时发送和接收消息。希望 golang 能让这一切变得简单
像这样处理并发。

如前所述,传感器是双向流的一个很好的例子。
监视功能允许客户端决定监视和请求哪些传感器
如果需要的话,当前值。

让我们看一下下面的protobuf定义:

service sensor {  rpc watch(stream watchrequest) returns (stream watchresponse) {}}message watchrequest {  oneof request {    watchcreaterequest create_request = 1;    watchcancelrequest cancel_request = 2;    watchnowrequest now_request = 3;  }}message watchcreaterequest {  // sensor_id contains the sensor id to watch.  string sensor_id = 1;}message watchcancelrequest {  // sensor_id contains the sensor id to cancel.  string sensor_id = 1;}message watchnowrequest {  // sensor_id contains the sensor id to get the current value.  string sensor_id = 1;}message watchresponse {  // sensor_id contains the sensor id for the current response.  string sensor_id = 1;  // created is true if the watch was created successfully.  bool created = 2;  // canceleted is true if the watch was canceled successfully or if the creation failed.  bool canceleted = 3;  // error contains the error message if something went wrong.  string error = 4;  // timestamp contains the timestamp of the value.  google.protobuf.timestamp timestamp = 5;  // value contains the value of the sensor.  int32 value = 6;}

登录后复制

请求消息不仅仅是消息流,更是一条可以
包含不同类型的请求。 oneof 指令允许我们定义一个
只能包含指定类型之一的字段。

传感器的 golang 代码将被忽略,但您可以在这里找到它

serverstream 包装流和传感器数据,使其更易于使用。

type serverstream struct {    s           *sensorservice         // service    stream      pb.sensor_watchserver  // stream    sendch      chan *pb.watchresponse // control channel    sensorch    chan sensordata        // data channel    sensorwatch map[string]int         // map of sensor id to watch id}

登录后复制

如前所述,服务器可以同时发送和接收消息,一个
函数将处理传入的消息,另一个函数将处理
传出消息。

接收消息:

func (ss *serverstream) recvloop() error {    defer ss.close()    for {        req, err := ss.stream.recv()        if errors.is(err, io.eof) {            return nil        }        if err != nil {            return err        }        switch req := req.request.(type) {        case *pb.watchrequest_createrequest:            // ignore validation (check the full code)            // create a channel to send data to the client            id := sensor.watch(ss.sensorch)            ss.sensorwatch[sensorid] = id            // send created message            ss.sendch <- &pb.watchresponse{                sensorid: sensorid,                created:  true,            }        case *pb.watchrequest_cancelrequest:            // ignore validation (check the full code)            // cancel the watch            ss.s.sensors[sensorid].cancel(id)            delete(ss.sensorwatch, sensorid)            ss.sendch <- &pb.watchresponse{                sensorid:   sensorid,                canceleted: true,            }        case *pb.watchrequest_nowrequest:            // ignore validation (check the full code)            // send current value            ss.sendch <- &pb.watchresponse{                sensorid:  sensorid,                timestamp: timestamppb.now(),                value:     int32(sensor.read()),            }        }    }}

登录后复制

switch语句用于处理不同类型的请求并做出决定
如何处理每个请求。只保留r​​ecvloop 函数很重要
读取消息但不向客户端发送消息,因此我们有 sendloop
将从控制通道读取消息并将其发送到客户端。

发送消息:

func (ss *serverstream) sendloop() {    for {        select {        case m, ok := <-ss.sendch:            if !ok {                return            }            // send message            if err := ss.stream.send(m); err != nil {                return            }        case data, ok := <-ss.sensorch:            if !ok {                return            }            // send data            if err := ss.stream.send(&pb.watchresponse{                sensorid:  data.id,                timestamp: timestamppb.new(data.time),                value:     int32(data.val),            }); err != nil {                return            }        case <-ss.stream.context().done():            return        }    }}

登录后复制

sendloop函数读取控制通道和数据通道并发送
发送给客户端的消息。如果流关闭,该函数将返回。

最后,传感器服务的快乐路径测试:

func TestSensor(t *testing.T) {    conn := newServer(t, func(s grpc.ServiceRegistrar) {        pb.RegisterSensorServer(s, &sensorService{            sensors: newSensors(),        })    })    client := pb.NewSensorClient(conn)    stream, err := client.Watch(context.Background())    if err != nil {        t.Fatalf("failed to watch: %v", err)    }    response := make(chan *pb.WatchResponse)    // Go routine to read from the stream    go func() {        defer close(response)        for {            resp, err := stream.Recv()            if errors.Is(err, io.EOF) {                return            }            if err != nil {                return            }            response <- resp        }    }()    createRequest(t, stream, "temp")    waitUntilCreated(t, response, "temp")    waitForSensorData(t, response, "temp")    createRequest(t, stream, "pres")    waitUntilCreated(t, response, "pres")    waitForSensorData(t, response, "pres")    waitForSensorData(t, response, "temp")    waitForSensorData(t, response, "pres")    // invalid sensor    createRequest(t, stream, "invalid")    waitUntilCanceled(t, response, "invalid")    nowRequest(t, stream, "light")    waitForSensorData(t, response, "light")    // Wait for 2 seconds to make sure we don't receive any data for light    waitForNoSensorData(t, response, "light", 2*time.Second)    cancelRequest(t, stream, "temp")    waitUntilCanceled(t, response, "temp")    waitForSensorData(t, response, "pres")    // Wait for 2 seconds to make sure we don't receive any data for temp    waitForNoSensorData(t, response, "temp", 2*time.Second)    err = stream.CloseSend()    if err != nil {        t.Fatalf("failed to close send: %v", err)    }}

登录后复制

从上面的测试中我们可以看到客户端可以创建、取消、获取当前
传感器的值。客户端还可以同时观看多个传感器。

挑战自己

使用 grpc 流实现聊天应用程序。修改传感器服务以一次发送多个值以节省往返次数。嗅探网络流量以查看一元请求和流式请求之间的区别。

结论

grpc 流是一种用于构建实时应用程序的多功能且强大的工具。
通过遵循最佳实践,例如仅在必要时使用流式传输、有效地批处理数据以及明智地利用双向流式传输,开发人员可以最大限度地提高性能
并保持代码简单性。
虽然 grpc 流式传输带来了复杂性,但其好处远远超过了挑战
当深思熟虑地应用时。

保持联系

如果您有任何问题或反馈,请随时在 linkedin 上与我联系。

以上就是gRPC 流:最佳实践和性能见解的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2477209.html

(0)
上一篇 2025年3月4日 19:20:32
下一篇 2025年2月19日 21:40:12

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • PostGolang 打印函数

    系列:golang golang 中的打印函数 在 golang 中,有多个函数可用于打印文本,每个函数都服务于特定的用例。以下是最常用的打印功能的说明: 1.fmt.打印 描述:将提供的参数打印为纯文本,而不添加换行符。它不会格式化输出。…

    2025年3月4日
    200
  • PnR:具有 Go 平台抽象的配置意图驱动的容器编排

    您是否曾经希望容器编排能够比静态依赖链更灵活,但又比 kubernetes 更简单?满足 pnr(提示和响应) – 一种配置驱动的方法,利用 go 强大的平台抽象功能根据实际的就绪状态而不是简单的依赖关系来编排容器。 go 平台…

    编程技术 2025年3月4日
    200
  • Starknet 交易批量处理程序

    抽象的 本文介绍了 metacube 中使用的交易批处理程序,用于即时发送玩家赚取的 nft。它解释了批处理程序基于参与者的可扩展架构,并提供了 go 中的详细实现。 所有代码片段都可以在关联的 github 存储库中找到。 建筑学 巴彻由…

    2025年3月4日
    200
  • 每个后端开发人员都应该知道的热门库

    Go语言凭借其简洁性、性能和并发优势,已成为后端开发的热门选择。虽然Go标准库功能强大,但许多第三方库能显著提升开发效率和代码质量。 本文将介绍五个必备的Go语言库,助您高效构建API、管理数据库、完善日志记录等。 1. Gin 高性能HT…

    2025年3月4日
    200
  • 使用 OpenAI、Go 和 PostgreSQL (pgvector) 构建语义搜索引擎

    近年来,向量嵌入技术已成为自然语言处理(NLP)和语义搜索的核心。与传统的关键词搜索不同,向量数据库通过比较文本的向量表示(嵌入)来理解文本的语义含义。本示例展示如何结合OpenAI嵌入、Go语言和PostgreSQL数据库(以及pgvec…

    2025年3月4日
    200
  • 微服务中的事务:SAGA 模式概述部分

    构建强大的分布式系统是一项极具挑战性的任务,尤其是在保证多个服务数据一致性方面。在微服务架构中,传统的数据库事务往往力不从心。这时,分布式事务便成为关键解决方案。 分布式事务能够协调多个服务间的操作,并优雅地处理各种故障。而SAGA模式是实…

    2025年3月4日
    200
  • Docker 卷

    容器化应用的关键在于数据持久化。docker容器默认情况下,删除后其内部所有数据都会丢失。解决方法是使用docker卷。它们允许数据在容器生命周期中持续存在,为任何应用提供隔离和可扩展性。 为何使用Docker卷? 持久性:创建或链接卷到容…

    2025年3月4日
    200
  • 如何使用Golang,Gin和Postgresql构建CRUD应用程序

    本教程演示如何使用Go语言、Gin框架和PostgreSQL数据库构建一个简单的CRUD (创建、读取、更新、删除) 应用。最终,您将得到一个可以管理PostgreSQL数据库中数据的基本应用。 目录 简介前提条件项目结构项目设置创建数据库…

    2025年3月4日
    200
  • Golang文本/模板中的SQL查询

    在使用Go的text/template包动态生成SQL查询构建后端API时,提高开发效率的同时,务必注意SQL注入的风险。本文将演示如何避免这种风险。 text/template的SQL注入漏洞示例 以下代码片段展示了如何通过字符串插值构建…

    2025年3月4日
    200
  • GO中的数据处理管道(Golang)

    Go语言数据处理管道详解 Go语言中的数据处理管道是一种将数据处理流程分解成一系列阶段或步骤的模式。每个阶段对数据执行特定操作,前一阶段的输出作为下一阶段的输入。这种模式广泛应用于ETL(提取、转换、加载)、流处理和批处理等场景。Go语言利…

    2025年3月4日
    200

发表回复

登录后才能评论