grpc 实战
我们以 Google 的 google-protobuf 为例子,用 NodeJs 搭建 grpc 客户端和服务端
proto 文件
使用 grpc 通常需要定义 protobuf 文件,文件以 .proto 结尾,我们先来看它的简单的语法
message
protobuf 中定义一个消息类型是通过关键字 message 字段指定的。消息就算需要传输的数据格式的定义
message User {
string name = 1;
int32 age = 2;
}字段规则
默认是 required,也可以是 optional 或者 repeated
- required: 代表必填字段
- optional: 代表可选字段
- repeated: 代表可重复的字段
message User {
string name = 1;
int32 age = 2;
optional string sex = 3;
repeated bool isBlack = 4;
}定义服务
定义客户端调用的方法,在方法参数中传入请求 message 和响应的 message
service SearchService {
rpc Search (SearchRequest) returns (SearchResponse);
}使用 loader 加载
要知道 protobuf 不限制使用的语言,例如 Java、Go、C++、NodeJs 等语言都能够使用 .proto 文件,来获取消息的格式。但是由于原生支持 protobuf 语言,所以如何导入就是一个问题
在 NodeJs 中,有两种方法来解决,第一种方法是使用 @grpc/proto-loader 来加载 .proto 文件,这有些类似于前端 webpack 中的 loader 思想:用 loader 来处理 JS 处理不了的语言
举个例子:
const PROTO_PATH = './protos/helloworld.proto';
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});完整的例子:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}const PROTO_PATH = __dirname + './helloworld.proto';
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH,{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;
/**
* Implements the SayHello RPC method.
*/
function sayHello(call, callback) {
callback(null, {message: 'Hello ' + call.request.name});
}
/**
* Starts an RPC server that receives requests for the Greeter service at the
* sample server port
*/
function main() {
var server = new grpc.Server();
server.addService(hello_proto.Greeter.service, {sayHello: sayHello});
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
server.start();
});
}
main();const PROTO_PATH = __dirname + './helloworld.proto';
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH,{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;
function main() {
const argv = parseArgs(process.argv.slice(2), {
string: 'target'
});
const target = 'localhost:50051';
const client = new hello_proto.Greeter(target, grpc.credentials.createInsecure());
client.sayHello({name: 'world'}, function(err, response) {
console.log('Greeting:', response.message);
});
}
main();分别运行 server 和 client,我们将看到客户端成功输出了服务端返回的消息:
$ node server.js$ node client.js
Greeting: Hello World上面的代码是用 JavaScript 写的,因为如果是 Typescript 写的话,上面通过 loader 加载 protobuf 的方法,获取不到类型提示,写起来不好受
所以为了在 TS 中,有更好的体验,我们还会用另一种方法来加载 protobuf,就是使用编译器生成 TS 静态代码
使用 protoc 编译
我们可以使用 Protocol Buffer 编译器,把 .proto 来转成对应语言的代码,以此获得完整的类型提示,不光是 TS 语言,还有包括 Java/Python/C++/Ruby 等等 grpc 流行的语言
TIP
实际上 Js 并不是 grpc 的主流语言,所以相关的生态其实比其他语言更少些
安装
在 Linux/MacOs 上,使用 brew 进行安装:
brew install protobuf使用
可以编译成多种主流语言的代码
$ mkdir dist && protoc --cpp_out=./dist src/rpc/helloworld.proto # 生成 c++ 版本的
$ protoc --java_out=./dist src/rpc/helloworld.proto # 生成 Java 版本的生成 TS 版本需要安装额外的插件,这里我们可以选择 thesayyn/protoc-gen-ts
$ npm i -g protoc-gen-ts
$ protoc --ts_out=./dist src/rpc/helloworld.proto --ts_opt=no_namespace # 设置无 namespace我们将会得到 dist/src/rpc/helloworld.ts 文件:
dist/src/rpc/helloworld.ts
/**
* Generated by the protoc-gen-ts. DO NOT EDIT!
* compiler version: 4.24.3
* source: src/rpc/hello.proto
* git: https://github.com/thesayyn/protoc-gen-ts */
import * as pb_1 from "google-protobuf";
import * as grpc_1 from "@grpc/grpc-js";
export namespace helloworld {
export class HelloRequest extends pb_1.Message {
#one_of_decls: number[][] = [];
constructor(data?: any[] | {
name?: string;
}) {
super();
pb_1.Message.initialize(this, Array.isArray(data) ? data : [], 0, -1, [], this.#one_of_decls);
if (!Array.isArray(data) && typeof data == "object") {
if ("name" in data && data.name != undefined) {
this.name = data.name;
}
}
}
get name() {
return pb_1.Message.getFieldWithDefault(this, 1, "") as string;
}
set name(value: string) {
pb_1.Message.setField(this, 1, value);
}
static fromObject(data: {
name?: string;
}): HelloRequest {
const message = new HelloRequest({});
if (data.name != null) {
message.name = data.name;
}
return message;
}
toObject() {
const data: {
name?: string;
} = {};
if (this.name != null) {
data.name = this.name;
}
return data;
}
serialize(): Uint8Array;
serialize(w: pb_1.BinaryWriter): void;
serialize(w?: pb_1.BinaryWriter): Uint8Array | void {
const writer = w || new pb_1.BinaryWriter();
if (this.name.length)
writer.writeString(1, this.name);
if (!w)
return writer.getResultBuffer();
}
static deserialize(bytes: Uint8Array | pb_1.BinaryReader): HelloRequest {
const reader = bytes instanceof pb_1.BinaryReader ? bytes : new pb_1.BinaryReader(bytes), message = new HelloRequest();
while (reader.nextField()) {
if (reader.isEndGroup())
break;
switch (reader.getFieldNumber()) {
case 1:
message.name = reader.readString();
break;
default: reader.skipField();
}
}
return message;
}
serializeBinary(): Uint8Array {
return this.serialize();
}
static deserializeBinary(bytes: Uint8Array): HelloRequest {
return HelloRequest.deserialize(bytes);
}
}
export class HelloReply extends pb_1.Message {
#one_of_decls: number[][] = [];
constructor(data?: any[] | {
message?: string;
}) {
super();
pb_1.Message.initialize(this, Array.isArray(data) ? data : [], 0, -1, [], this.#one_of_decls);
if (!Array.isArray(data) && typeof data == "object") {
if ("message" in data && data.message != undefined) {
this.message = data.message;
}
}
}
get message() {
return pb_1.Message.getFieldWithDefault(this, 1, "") as string;
}
set message(value: string) {
pb_1.Message.setField(this, 1, value);
}
static fromObject(data: {
message?: string;
}): HelloReply {
const message = new HelloReply({});
if (data.message != null) {
message.message = data.message;
}
return message;
}
toObject() {
const data: {
message?: string;
} = {};
if (this.message != null) {
data.message = this.message;
}
return data;
}
serialize(): Uint8Array;
serialize(w: pb_1.BinaryWriter): void;
serialize(w?: pb_1.BinaryWriter): Uint8Array | void {
const writer = w || new pb_1.BinaryWriter();
if (this.message.length)
writer.writeString(1, this.message);
if (!w)
return writer.getResultBuffer();
}
static deserialize(bytes: Uint8Array | pb_1.BinaryReader): HelloReply {
const reader = bytes instanceof pb_1.BinaryReader ? bytes : new pb_1.BinaryReader(bytes), message = new HelloReply();
while (reader.nextField()) {
if (reader.isEndGroup())
break;
switch (reader.getFieldNumber()) {
case 1:
message.message = reader.readString();
break;
default: reader.skipField();
}
}
return message;
}
serializeBinary(): Uint8Array {
return this.serialize();
}
static deserializeBinary(bytes: Uint8Array): HelloReply {
return HelloReply.deserialize(bytes);
}
}
interface GrpcUnaryServiceInterface<P, R> {
(message: P, metadata: grpc_1.Metadata, options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
(message: P, metadata: grpc_1.Metadata, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
(message: P, options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
(message: P, callback: grpc_1.requestCallback<R>): grpc_1.ClientUnaryCall;
}
interface GrpcStreamServiceInterface<P, R> {
(message: P, metadata: grpc_1.Metadata, options?: grpc_1.CallOptions): grpc_1.ClientReadableStream<R>;
(message: P, options?: grpc_1.CallOptions): grpc_1.ClientReadableStream<R>;
}
interface GrpWritableServiceInterface<P, R> {
(metadata: grpc_1.Metadata, options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
(metadata: grpc_1.Metadata, callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
(options: grpc_1.CallOptions, callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
(callback: grpc_1.requestCallback<R>): grpc_1.ClientWritableStream<P>;
}
interface GrpcChunkServiceInterface<P, R> {
(metadata: grpc_1.Metadata, options?: grpc_1.CallOptions): grpc_1.ClientDuplexStream<P, R>;
(options?: grpc_1.CallOptions): grpc_1.ClientDuplexStream<P, R>;
}
interface GrpcPromiseServiceInterface<P, R> {
(message: P, metadata: grpc_1.Metadata, options?: grpc_1.CallOptions): Promise<R>;
(message: P, options?: grpc_1.CallOptions): Promise<R>;
}
export abstract class UnimplementedGreeterService {
static definition = {
SayHello: {
path: "/helloworld.Greeter/SayHello",
requestStream: false,
responseStream: false,
requestSerialize: (message: HelloRequest) => Buffer.from(message.serialize()),
requestDeserialize: (bytes: Buffer) => HelloRequest.deserialize(new Uint8Array(bytes)),
responseSerialize: (message: HelloReply) => Buffer.from(message.serialize()),
responseDeserialize: (bytes: Buffer) => HelloReply.deserialize(new Uint8Array(bytes))
}
};
[method: string]: grpc_1.UntypedHandleCall;
abstract SayHello(call: grpc_1.ServerUnaryCall<HelloRequest, HelloReply>, callback: grpc_1.sendUnaryData<HelloReply>): void;
}
export class GreeterClient extends grpc_1.makeGenericClientConstructor(UnimplementedGreeterService.definition, "Greeter", {}) {
constructor(address: string, credentials: grpc_1.ChannelCredentials, options?: Partial<grpc_1.ChannelOptions>) {
super(address, credentials, options);
}
SayHello: GrpcUnaryServiceInterface<HelloRequest, HelloReply> = (message: HelloRequest, metadata: grpc_1.Metadata | grpc_1.CallOptions | grpc_1.requestCallback<HelloReply>, options?: grpc_1.CallOptions | grpc_1.requestCallback<HelloReply>, callback?: grpc_1.requestCallback<HelloReply>): grpc_1.ClientUnaryCall => {
return super.SayHello(message, metadata, options, callback);
};
}
}然后有了这个文件,我们搭建 grpc 服务更加简单了:
import * as grpc from '@grpc/grpc-js'
import { HelloReply, HelloRequest, UnimplementedGreeterService } from './dist/src/rpc/helloworld.ts'
class GreeterService extends UnimplementedGreeterService {
SayHello(call: grpc.ServerUnaryCall<HelloRequest, HelloReply>, callback: grpc.sendUnaryData<HelloReply>) {
callback(null, new HelloReply({message: 'Hello ' + call.request.name }));
}
}
const service = new GreeterService()
var server = new grpc.Server();
server.addService(GreeterService.definition, service);
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
server.start();
});import { HelloRequest, GreeterClient } from './dist/src/rpc/helloworld.ts'
import * as grpc from '@grpc/grpc-js'
new GreeterClient('localhost:50051', grpc.credentials.createInsecure())
.SayHello(new HelloRequest({ name: 'world'}), (err, response) => {
console.log(response?.message)
})确保我们安装了 @grpc/grpc-js 和 google-protobuf,然后执行:
$ tsx server.ts$ tsx client.ts
Greeting: Hello World这种做法相较于使用 loader 方法,虽然失去了动态加载的功能,但是能够给我们更好的类型提示,以及编写更少的代码