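gRPC in Practice

Using Google's google-protobuf as an example, we will build a gRPC client and server with Node.js.

proto files

Using gRPC usually requires defining protobuf files, which end in .proto. Let's first look at the basic syntax.

message

In protobuf, a message type is defined with the message keyword. A message is the definition of the data format to be transmitted.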

message User {
    string name = 1;
    int32 age = 2;
}
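
The field numbers (= 1, = 2) matter more than the names: they are what identifies each field in the binary encoding. The following is a minimal hand-rolled sketch of how a User could be laid out on the wire, assuming a non-negative age and always writing both fields (a real proto3 encoder skips fields that hold their default value). The helper names encodeVarint, encodeKey, and encodeUser are made up for this illustration and are not part of any library.

```javascript
// Hand-rolled sketch of the protobuf wire format for `User` (illustration only).

// Encode a non-negative integer as a base-128 varint.
function encodeVarint(value) {
  const bytes = [];
  while (value > 0x7f) {
    bytes.push((value & 0x7f) | 0x80); // low 7 bits, continuation bit set
    value = Math.floor(value / 128);
  }
  bytes.push(value);
  return Buffer.from(bytes);
}

// A field key packs the field number and the wire type into one varint.
function encodeKey(fieldNumber, wireType) {
  return encodeVarint(fieldNumber * 8 + wireType);
}

function encodeUser(user) {
  const name = Buffer.from(user.name, 'utf8');
  return Buffer.concat([
    encodeKey(1, 2),            // name = 1, wire type 2 (length-delimited)
    encodeVarint(name.length),
    name,
    encodeKey(2, 0),            // age = 2, wire type 0 (varint)
    encodeVarint(user.age),
  ]);
}

const encodedUser = encodeUser({ name: 'Al', age: 30 });
console.log(encodedUser.toString('hex')); // 0a02416c101e
```

Because only the numbers are written, renaming a field keeps old data readable, while changing its number does not.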

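Field rules

In proto2, every field must be labeled required, optional, or repeated. In proto3, which the examples in this article use, required was removed: a field without a label is a singular field, and it can also be marked optional or repeated.

  • required: a mandatory field (proto2 only)
  • optional: a field that may be left unset, and whose presence can be checked
  • repeated: a field that can appear any number of times, like a list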
message User {
    string name = 1;
    int32 age = 2;
    optional string sex = 3;
    repeated bool isBlack = 4;
}
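
To make the labels more concrete, here is a rough model of how they surface when a proto3 message is read. It is written against plain JavaScript objects; readUser is a hypothetical helper for this illustration and does not correspond to any protobuf library API.

```javascript
// Illustrative model of proto3 read semantics for the `User` message above.
// `readUser` is a hypothetical helper, not part of any protobuf library.
function readUser(decoded) {
  return {
    // Singular scalar fields fall back to their type's default value.
    name: decoded.name ?? '',
    age: decoded.age ?? 0,
    // `optional` fields track presence, so "not set" stays distinguishable.
    hasSex: decoded.sex !== undefined,
    sex: decoded.sex ?? '',
    // `repeated` fields read as a (possibly empty) list.
    isBlack: decoded.isBlack ?? [],
  };
}

const partialUser = readUser({ name: 'Ann', isBlack: [true, false] });
console.log(partialUser);
```

The point of the sketch is the difference between age (an unset value and 0 look the same) and sex (unset is visible through the presence flag).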

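Defining a service

A service defines the methods that clients can call. Each method takes a request message as its parameter and declares a response message as its return type.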

service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}
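
On the wire, each rpc is addressed by a path built from the package, the service name, and the method name. The sketch below shows that naming rule; grpcMethodPath is a hypothetical helper written for this illustration, and the same path shape appears in the generated code later in this article.

```javascript
// Build the HTTP/2 path gRPC uses to address a method.
// `packageName` is empty when the .proto file declares no package.
function grpcMethodPath(packageName, serviceName, methodName) {
  const fullService = packageName ? `${packageName}.${serviceName}` : serviceName;
  return `/${fullService}/${methodName}`;
}

console.log(grpcMethodPath('', 'SearchService', 'Search'));       // /SearchService/Search
console.log(grpcMethodPath('helloworld', 'Greeter', 'SayHello')); // /helloworld.Greeter/SayHello
```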

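Loading with a loader

protobuf is language-neutral: Java, Go, C++, Node.js, and other languages can all use a .proto file to obtain the message format. However, none of these languages understands .proto files natively, so the question is how to import them.

In Node.js there are two ways to solve this. The first is to load the .proto file with @grpc/proto-loader, which resembles the loader idea in front-end webpack: use a loader to handle languages that JS cannot process by itself.

For example: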

const PROTO_PATH = './protos/helloworld.proto';
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
});
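
The longs: String option may look odd at first. A likely reason to choose it is that 64-bit integers do not fit into a JavaScript number without losing precision, so representing them as strings keeps the exact value. The sketch below uses only built-in JavaScript to show the problem; the value is an arbitrary example, not something taken from the proto files in this article.

```javascript
// 2^53 + 1 does not fit in a double-precision JavaScript number.
const wireValue = '9007199254740993';

const asNumber = Number(wireValue); // rounds to the nearest representable double
const asString = wireValue;         // the kind of value `longs: String` hands back
const asBigInt = BigInt(asString);  // exact, if arithmetic is needed

console.log(asNumber.toString()); // 9007199254740992
console.log(asBigInt.toString()); // 9007199254740993
```

Keeping the string and converting to BigInt only where arithmetic is needed avoids silently corrupted IDs and timestamps.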

A complete example:

helloworld.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

server.js
const PROTO_PATH = __dirname + '/helloworld.proto';

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH,{
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;

/**
 * Implements the SayHello RPC method.
 */
function sayHello(call, callback) {
  callback(null, {message: 'Hello ' + call.request.name});
}

/**
 * Starts an RPC server that receives requests for the Greeter service at the
 * sample server port
 */
function main() {
  var server = new grpc.Server();
  server.addService(hello_proto.Greeter.service, {sayHello: sayHello});
  server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
    server.start();
  });
}

main();

client.js
const PROTO_PATH = __dirname + '/helloworld.proto';

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH,{
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;

function main() {
  const target = 'localhost:50051';
  const client = new hello_proto.Greeter(target, grpc.credentials.createInsecure());

  client.sayHello({name: 'world'}, function(err, response) {
    // Report RPC failures (for example, the server is not running) instead of crashing.
    if (err) {
      console.error('RPC failed:', err.message);
      return;
    }
    console.log('Greeting:', response.message);
  });
}

main();

Run the server and the client separately (in two terminals), and the client prints the message returned by the server:

$ node server.js
$ node client.js
Greeting: Hello world
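
The client call above is callback-based. If async/await is preferred, one common approach is to wrap the method with Node's util.promisify. The sketch below uses a hypothetical stubClient that only mimics the (request, callback) shape of client.sayHello, so it runs without a server; with a real client, the same wrapping should apply to the (request, callback) form of the call.

```javascript
const { promisify } = require('node:util');

// Hypothetical stand-in for the Greeter client, so the sketch runs without a server.
// It only mimics the (request, callback) shape used by client.sayHello above.
const stubClient = {
  sayHello(request, callback) {
    setImmediate(() => callback(null, { message: 'Hello ' + request.name }));
  },
};

// Bind first so `this` still points at the client inside the wrapped method.
const sayHelloAsync = promisify(stubClient.sayHello.bind(stubClient));

async function greet(name) {
  const response = await sayHelloAsync({ name });
  return response.message;
}

greet('world').then((message) => console.log('Greeting:', message));
```

Errors passed to the callback become promise rejections, so they can be handled with try/catch around the await.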

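The code above is written in JavaScript. In TypeScript, messages loaded through the loader carry no type information, which makes the code unpleasant to write.

So for a better experience in TS, there is a second way to use protobuf: have a compiler generate static TS code.

Compiling with protoc

The Protocol Buffer compiler (protoc) converts a .proto file into code for a target language, which gives complete type hints. This applies not only to TS but also to Java, Python, C++, Ruby, and other languages popular with gRPC.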

TIP

JS is actually not a mainstream language for gRPC, so its ecosystem is somewhat smaller than that of other languages.

Installation

On Linux/macOS, install with brew:

brew install protobuf

Usage

protoc can generate code for many mainstream languages:

$ mkdir dist && protoc --cpp_out=./dist src/rpc/helloworld.proto # generate the C++ version
$ protoc --java_out=./dist src/rpc/helloworld.proto # generate the Java version

Generating TS requires an additional plugin. Here we choose thesayyn/protoc-gen-ts:

$ npm i -g protoc-gen-ts
$ protoc --ts_out=./dist src/rpc/helloworld.proto --ts_opt=no_namespace # generate without a namespace

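This produces the file dist/src/rpc/helloworld.ts. Note that the listing below still wraps the classes in namespace helloworld; with the no_namespace option the classes are expected to be exported at the top level, which is what the imports in the server and client code later in this article rely on.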

dist/src/rpc/helloworld.ts
 /**
 * Generated by the protoc-gen-ts.  DO NOT EDIT!
 * compiler version: 4.24.3
 * source: src/rpc/hello.proto
 * git: https://github.com/thesayyn/protoc-gen-ts */
import * as pb_1 from "google-protobuf";
import * as grpc_1 from "@grpc/grpc-js";
export namespace helloworld {
    export class HelloRequest extends pb_1.Message {
        #one_of_decls: number[][] = [];
        constructor(data?: any[] | {
            name?: string;
        }) {
            super();
            pb_1.Message.initialize(this, Array.isArray(data) ? data : [], 0, -1, [], this.#one_of_decls);
            if (!Array.isArray(data) && typeof data == "object") {
                if ("name" in data && data.name != undefined) {
                    this.name = data.name;
                }
            }
        }
        get name() {
            return pb_1.Message.getFieldWithDefault(this, 1, "") as string;
        }
        set name(value: string) {
            pb_1.Message.setField(this, 1, value);
        }
        static fromObject(data: {
            name?: string;
        }): HelloRequest {
            const message = new HelloRequest({});
            if (data.name != null) {
                message.name = data.name;
            }
            return message;
        }
        toObject() {
            const data: {
                name?: string;
            } = {};
            if (this.name != null) {
                data.name = this.name;
            }
            return data;
        }
        serialize(): Uint8Array;
        serialize(w: pb_1.BinaryWriter): void;
        serialize(w?: pb_1.BinaryWriter): Uint8Array | void {
            const writer = w || new pb_1.BinaryWriter();
            if (this.name.length)
                writer.writeString(1, this.name);
            if (!w)
                return writer.getResultBuffer();
        }
        static deserialize(bytes: Uint8Array | pb_1.BinaryReader): HelloRequest {
            const reader = bytes instanceof pb_1.BinaryReader ? bytes : new pb_1.BinaryReader(bytes), message = new HelloRequest();
            while (reader.nextField()) {
                if (reader.isEndGroup())
                    break;
                switch (reader.getFieldNumber()) {
                    case 1:
                        message.name = reader.readString();
                        break;
                    default: reader.skipField();
                }
            }
            return message;
        }
        serializeBinary(): Uint8Array {
            return this.serialize();
        }
        static deserializeBinary(bytes: Uint8Array): HelloRequest {
            return HelloRequest.deserialize(bytes);
        }
    }
    export class HelloReply extends pb_1.Message {
        #one_of_decls: number[][] = [];
        constructor(data?: any[] | {
            message?: string;
        }) {
            super();
            pb_1.Message.initialize(this, Array.isArray(data) ? data : [], 0, -1, [], this.#one_of_decls);
            if (!Array.isArray(data) && typeof data == "object") {
                if ("message" in data && data.message != undefined) {
                    this.message = data.message;
                }
            }
        }
        get message() {
            return pb_1.Message.getFieldWithDefault(this, 1, "") as string;
        }
        set message(value: string) {
            pb_1.Message.setField(this, 1, value);
        }
        static fromObject(data: {
            message?: string;
        }): HelloReply {
            const message = new HelloReply({});
            if (data.message != null) {
                message.message = data.message;
            }
            return message;
        }
        toObject() {
            const data: {
                message?: string;
            } = {};
            if (this.message != null) {
                data.message = this.message;
            }
            return data;
        }
        serialize(): Uint8Array;
        serialize(w: pb_1.BinaryWriter): void;
        serialize(w?: pb_1.BinaryWriter): Uint8Array | void {
            const writer = w || new pb_1.BinaryWriter();
            if (this.message.length)
                writer.writeString(1, this.message);
            if (!w)
                return writer.getResultBuffer();
        }
        static deserialize(bytes: Uint8Array | pb_1.BinaryReader): HelloReply {
            const reader = bytes instanceof pb_1.BinaryReader ? bytes : new pb_1.BinaryReader(bytes), message = new HelloReply();
            while (reader.nextField()) {
                if (reader.isEndGroup())
                    break;
                switch (reader.getFieldNumber()) {
                    case 1:
                        message.message = reader.readString();
                        break;
                    default: reader.skipField();
                }
            }
            return message;
        }
        serializeBinary(): Uint8Array {
            return this.serialize();
        }
        static deserializeBinary(bytes: Uint8Array): HelloReply {
            return HelloReply.deserialize(bytes);
        }
    }
    interface GrpcUnaryServiceInterface<P, R> {
        (message: P, metadata: grpc_1.Metadata, options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
        (message: P, metadata: grpc_1.Metadata, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
        (message: P, options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
        (message: P, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
    }
    interface GrpcStreamServiceInterface<P, R> {
        (message: P, metadata: grpc_1.Metadata, options?: grpc_1.CallOptions): grpc_1.ClientReadableStream<R>;
        (message: P, options?: grpc_1.CallOptions): grpc_1.ClientReadableStream<R>;
    }
    interface GrpWritableServiceInterface<P, R> {
        (metadata: grpc_1.Metadata, options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
        (metadata: grpc_1.Metadata, callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
        (options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
        (callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
    }
    interface GrpcChunkServiceInterface<P, R> {
        (metadata: grpc_1.Metadata, options?: grpc_1.CallOptions): grpc_1.ClientDuplexStream<P, R>;
        (options?: grpc_1.CallOptions): grpc_1.ClientDuplexStream<P, R>;
    }
    interface GrpcPromiseServiceInterface<P, R> {
        (message: P, metadata: grpc_1.Metadata, options?: grpc_1.CallOptions): Promise<R>;
        (message: P, options?: grpc_1.CallOptions): Promise<R>;
    }
    export abstract class UnimplementedGreeterService {
        static definition = {
            SayHello: {
                path: "/helloworld.Greeter/SayHello",
                requestStream: false,
                responseStream: false,
                requestSerialize: (message: HelloRequest) => Buffer.from(message.serialize()),
                requestDeserialize: (bytes: Buffer) => HelloRequest.deserialize(new Uint8Array(bytes)),
                responseSerialize: (message: HelloReply) => Buffer.from(message.serialize()),
                responseDeserialize: (bytes: Buffer) => HelloReply.deserialize(new Uint8Array(bytes))
            }
        };
        [method: string]: grpc_1.UntypedHandleCall;
        abstract SayHello(call: grpc_1.ServerUnaryCall<HelloRequest, HelloReply>, callback: grpc_1.sendUnaryData<HelloReply>): void;
    }
    export class GreeterClient extends grpc_1.makeGenericClientConstructor(UnimplementedGreeterService.definition, "Greeter", {}) {
        constructor(address: string, credentials: grpc_1.ChannelCredentials, options?: Partial<grpc_1.ChannelOptions>) {
            super(address, credentials, options);
        }
        SayHello: GrpcUnaryServiceInterface<HelloRequest, HelloReply> = (message: HelloRequest, metadata: grpc_1.Metadata | grpc_1.CallOptions | grpc_1.requestCallback<HelloReply>, options?: grpc_1.CallOptions | grpc_1.requestCallback<HelloReply>, callback?: grpc_1.requestCallback<HelloReply>): grpc_1.ClientUnaryCall => {
            return super.SayHello(message, metadata, options, callback);
        };
    }
}
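
The requestSerialize and responseSerialize functions in the definition above only produce the protobuf bytes of a message. Before those bytes travel over HTTP/2, gRPC wraps each message in a small length-prefixed frame; the library does this internally. The following is a minimal sketch of that framing, where frameGrpcMessage and unframeGrpcMessage are hypothetical helper names and the HelloRequest bytes are written out by hand rather than taken from the generated class.

```javascript
// Wrap a serialized message in gRPC's length-prefixed frame:
// 1 byte compressed flag + 4 byte big-endian length + message bytes.
function frameGrpcMessage(payload, compressed = false) {
  const header = Buffer.alloc(5);
  header.writeUInt8(compressed ? 1 : 0, 0);
  header.writeUInt32BE(payload.length, 1);
  return Buffer.concat([header, payload]);
}

// Inverse: split one frame back into its flag and payload.
function unframeGrpcMessage(frame) {
  const length = frame.readUInt32BE(1);
  return {
    compressed: frame.readUInt8(0) === 1,
    payload: frame.subarray(5, 5 + length),
  };
}

// Bytes of HelloRequest { name: "world" }: key 0x0a, length 5, then UTF-8 text.
const helloBytes = Buffer.concat([Buffer.from([0x0a, 0x05]), Buffer.from('world')]);
const framed = frameGrpcMessage(helloBytes);
console.log(framed.toString('hex')); // 00000000070a05776f726c64
```

The explicit length prefix is what allows several messages to be sent back to back on one stream, as in the SayHelloStreamReply method of the proto file.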

With this file in place, building the gRPC service becomes simpler:

server.ts
import * as grpc from '@grpc/grpc-js'
import { HelloReply, HelloRequest, UnimplementedGreeterService } from './dist/src/rpc/helloworld'

class GreeterService extends UnimplementedGreeterService {
    SayHello(call: grpc.ServerUnaryCall<HelloRequest, HelloReply>, callback: grpc.sendUnaryData<HelloReply>) {
        callback(null, new HelloReply({message: 'Hello ' + call.request.name }));
    }
}

const service = new GreeterService()

const server = new grpc.Server();
server.addService(GreeterService.definition, service);
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
    server.start();
});

client.ts
import { HelloRequest, GreeterClient } from './dist/src/rpc/helloworld'
import * as grpc from '@grpc/grpc-js'

new GreeterClient('localhost:50051', grpc.credentials.createInsecure())
.SayHello(new HelloRequest({ name: 'world'}), (err, response) => {
    // Report RPC failures instead of printing an undefined message.
    if (err) {
        console.error('RPC failed:', err.message)
        return
    }
    console.log('Greeting:', response?.message)
})

Make sure @grpc/grpc-js and google-protobuf are installed, then run:

$ tsx server.ts
$ tsx client.ts
Greeting: Hello world

Compared with the loader approach, this gives up dynamic loading, but it provides better type hints and requires less code.

Released under the MIT License.