背景
c++正在开发一个二方包,其中依赖了thrift,但是thrift又依赖了boost,libevent,这对于用户来说是不可接受的,所以那只能自己将网络及协议部分实现一下,本片仅分析协议部分。
flume thrift协议描述文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| namespace java org.apache.flume.thrift struct ThriftFlumeEvent { 1: required map <string, string> headers, 2: required binary body, } enum Status { OK, FAILED, ERROR, UNKNOWN } service ThriftSourceProtocol { Status append(1: ThriftFlumeEvent event), Status appendBatch(1: list<ThriftFlumeEvent> events), }
|
协议发送流程
我们这边flume使用的是TCompactProtocol协议,他的消息头如下:
具体为:
1 2 3 4
| byte PROTOCOL_ID 0x82 byte VERSION_TYPE (VERSION & VERSION_MASK) | ((message.type << TYPE_SHIFT_AMOUNT) & TYPE_MASK) varint32 MSG_SEQ_ID ++ string MSG_NAME method
|
其中varint32 就是一个对整形数字进行压缩的算法,及对于每一个byte,用首位部分表示时候该数字还有后缀,其余部分来表示数字实体,这样对于0-127就可以用1个byte表示,128-16384用两个byte表示,依次类推。
由于少了4个bit用来表示连续状态,所以varint想要完整的表示int的范围,需要5个byte,这在数字相对比较小的情况下是非常有利的。
其中由于定义了ThriftFlumeEvent数据结构,它的发送流程需要单独描述:
在上图中可以清楚的看到依次处理headers及body的过程
代码实现
知道上面的数据及流程,就可以针对性的实现了
thrift支持的数据结构
1 2 3 4 5 6 7 8 9 10 11 12
| byte STOP = 0; byte BYTE = 3; byte DOUBLE = 4; byte I16 = 6; byte I32 = 8; byte I64 = 10; byte STRING = 11; byte STRUCT = 12; byte MAP = 13; byte SET = 14; byte LIST = 15; byte ENUM = 16;
|
我们必须实现的类型:
byte
,int32
,map
,string
,list
,struct
必须实现的逻辑方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void writeMessageBegin(TMessage message); public void writeStructBegin(TStruct struct); public void writeStructEnd(); public void writeFieldBegin(TField field); private void writeFieldBeginInternal(TField field, byte typeOverride); public void writeFieldStop(); public void writeMapBegin(TMap map); public void writeListBegin(TList list); protected void writeCollectionBegin(byte elemType, int size); public void writeMessageEnd() {} public void writeMapEnd() {} public void writeListEnd() {} public void writeSetEnd() {} public void writeFieldEnd() {} public TMessage readMessageBegin(); public TStruct readStructBegin(); public TField readFieldBegin(); public TMap readMapBegin(); public TList readListBegin();
|
必须实现的基础方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void writeByte(byte b); public void writeI16(short i16); public void writeString(String str); public void writeBinary(ByteBuffer bin); private void writeBinary(byte[] buf, int offset, int length); private void writeVarint32(int n); private int intToZigZag(int n); private void writeByteDirect(byte b); private void writeByteDirect(int n); public byte readByte(); private int readVarint32(); private int readString();
|
这里我们发现出现了一个writeI16,为什么不是varint16呢?这是因为TField类型允许id为负数,之前有介绍varint32算法,但是对于负数怎么处理呢?就是上面必须实现的另一个方法,intToZigZag,代码实现如下:
1 2 3
| private int intToZigZag(int n) { return (n << 1) ^ (n >> 31); }
|
及将它转化成正数,这样varint算法就通用了
总结
至此我们分析了flume 中使用thrift的协议的完整流程及相关方法,剩下的就是根据thrift库文件的实现,按需照搬过来就可以接触thrift的依赖了