跳到主要内容
版本:0.6

如何为 GreptimeDB 开发一个 gRPC SDK

GreptimeDB 有 2 个 gRPC 服务。一个是 GreptimeDB 自己定义的,另一个是基于 Apache Arrow Flight 开发。 如果你想给用你熟悉的编程语言给 GreptimeDB 开发一个 gRPC SDK,请继续阅读本文。

目前,GreptimeDB 有 2 个 gRPC SDK,分别由 Java 和 Go 实现。参见它们的 SDK 文档:

GreptimeDatabase 服务

GreptimeDB 自定义了一个 gRPC 服务:GreptimeDatabase。你可以在这里找到它的 protobuf 描述。 GreptimeDatabase 有 2 个 RPC 方法:

service GreptimeDatabase {
rpc Handle(GreptimeRequest) returns (GreptimeResponse);

rpc HandleRequests(stream GreptimeRequest) returns (GreptimeResponse);
}

Handle 方法是一个 unary 调用:当 GreptimeDB 服务接收到一个 GreptimeRequest 请求后,它立刻处理该请求并返回一个相应的 GreptimeResponse

HandleRequests 方法则是一个 "Client Streaming RPC" 方式的调用。 它可以接受一个连续的 GreptimeRequest 请求流,持续地发给 GreptimeDB 服务。 GreptimeDB 服务会在收到流中的每个请求时立刻进行处理,并最终(流结束时)返回一个总结性的 GreptimeResponse。 通过 HandleRequests,我们可以获得一个非常高的请求吞吐量。

然而,目前 GreptimeDatabase 服务只能处理写请求。对于读请求,我们需要实现 Apache Arrow Flight 的 gRPC 客户端。

Apache Arrow Flight 服务

首先,你可以在这个仓库里找到 GreptimeDB 请求和响应的 protobuf 描述。 同时建议阅读那个仓库的 README 的 "For SDK developers" 部分。

确认 Arrow Flight RPC 官方支持你使用的编程语言。当前支持的有 C++, Java, Go, C# 和 Rust。 未来它可能支持其他语言,所以请留意它的 Implementation Status。 如果你没有找到官方支持的语言,就只能从 Arrow Flight RPC 的原始 protobuf 定义开始写 gRPC 客户端了。

现在请关注 Arrow Flight gRPC 服务的 DoGet 方法。该方法是所有 GreptimeDB 请求的入口。

DoGet 方法定义如下:

  rpc DoGet(Ticket) returns (stream FlightData) {}

要发送一个 GreptimeDB 请求,首先将其序列化为字节。然后将这些字节封装进一个 protobuf 的 Ticket 消息:

message Ticket {
bytes ticket = 1;
}

处理 GreptimeDB 的响应有一些复杂。DoGet 是一个 "Server Streaming RPC",它返回一个 FlightData 流。 FlightData 的 protobuf 定义是:

message FlightData {

/*
* The descriptor of the data. This is only relevant when a client is
* starting a new DoPut stream.
*/
FlightDescriptor flight_descriptor = 1;

/*
* Header for message data as described in Message.fbs::Message.
*/
bytes data_header = 2;

/*
* Application-defined metadata.
*/
bytes app_metadata = 3;

/*
* The actual batch of Arrow data. Preferably handled with minimal-copies
* coming last in the definition to help with sidecar patterns (it is
* expected that some implementations will fetch this field off the wire
* with specialized code to avoid extra memory copies).
*/
bytes data_body = 1000;
}

各字段的说明如下:

  • flight_descriptor 可忽略。
  • data_header 必须首先反序列化到 Message FlatBuffer 消息。 其定义可见 "message.fbs"。Message 的 header type 决定了下面两个字段如何解析。
  • app_metadata 包含了 GreptimeDB 的自定义数据。 该字段只在客户端发送了写请求(比如 InsertRequestInsert Into SQL)之后有值。 当 app_metadata 非空时,Message 的 header type 是 None
    • app_metadata 可以反序列化为 FlightMetadata
    • 对写请求的响应,FlightMetadata 只包含了 "Affected Rows"。就像 Insert Into SQL 在 MySQL 的返回值一样。 如果你不关注这个返回值,可以忽略 app_metadata 字段。
  • Message 的 header type 是 SchemaRecordbatch 时,data_body 带有读请求的实际返回数据。 你可以解析 DoGet 接口返回的 FlightData 流来得到完整的读请求结果。 通常情况下,FlightData 流的首个元素是整个读请求结果的 schema。剩下的元素则是 recordbatch。 schema 用于解析后续的 recordbatch,需要保存在某个上下文中。

这里我们提供了一个解析 DoGet 返回的 FlightData 流的伪代码例子:

Context {
schema: Schema
}

Result {
affected_rows: int
recordbatches: List<Recordbatch>
}

function decode_do_get_stream(flight_datas: List<FlightData>) -> Result {
result = Result
context = Context

for flight_data in flight_datas {
decode(flight_data, context, result);
}

return result;
}

function decode(flight_data: FlightData, context: Context, result: Result) {
message = Message::deserialize(flight_data.data_header);

switch message.header_type {
case None:
flight_metadata = FlightMetadata::deserialize(flight_data.app_metadata);

result.affected_rows = flight_metadata.affected_rows;

case Schema:
context.schema = Schema::deserialize(flight_data.data_body);

case Recordbatch:
recordbatch = Recordbatch::deserialize(
flight_data.data_body,
context.schema
);

result.recordbatches.push(recordbatch);
}
}

你也可以参考我们的 Rust clientFlightData 的解析方式。