Xsank's Blog

ThriftFlume 协议分析

背景

c++正在开发一个二方包,其中依赖了thrift,但是thrift又依赖了boost,libevent,这对于用户来说是不可接受的,所以那只能自己将网络及协议部分实现一下,本片仅分析协议部分。

flume thrift协议描述文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
}

协议发送流程

screenshot.png
我们这边flume使用的是TCompactProtocol协议,他的消息头如下:
screenshot.png
具体为:

1
2
3
4
byte PROTOCOL_ID 0x82
byte VERSION_TYPE (VERSION & VERSION_MASK) | ((message.type << TYPE_SHIFT_AMOUNT) & TYPE_MASK)
varint32 MSG_SEQ_ID ++
string MSG_NAME method

其中varint32 就是一个对整形数字进行压缩的算法,及对于每一个byte,用首位部分表示时候该数字还有后缀,其余部分来表示数字实体,这样对于0-127就可以用1个byte表示,128-16384用两个byte表示,依次类推。
由于少了4个bit用来表示连续状态,所以varint想要完整的表示int的范围,需要5个byte,这在数字相对比较小的情况下是非常有利的。

其中由于定义了ThriftFlumeEvent数据结构,它的发送流程需要单独描述:
screenshot.png
在上图中可以清楚的看到依次处理headers及body的过程

代码实现

知道上面的数据及流程,就可以针对性的实现了

thrift支持的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
byte STOP = 0;
byte BYTE = 3;
byte DOUBLE = 4;
byte I16 = 6;
byte I32 = 8;
byte I64 = 10;
byte STRING = 11;
byte STRUCT = 12;
byte MAP = 13;
byte SET = 14;
byte LIST = 15;
byte ENUM = 16;

我们必须实现的类型:
byte,int32,map,string,list,struct

必须实现的逻辑方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//写部分
public void writeMessageBegin(TMessage message);
public void writeStructBegin(TStruct struct);
public void writeStructEnd();
public void writeFieldBegin(TField field);
private void writeFieldBeginInternal(TField field, byte typeOverride);
public void writeFieldStop();
public void writeMapBegin(TMap map);
public void writeListBegin(TList list);
protected void writeCollectionBegin(byte elemType, int size);
public void writeMessageEnd() {}
public void writeMapEnd() {}
public void writeListEnd() {}
public void writeSetEnd() {}
public void writeFieldEnd() {}
//读部分
public TMessage readMessageBegin();
public TStruct readStructBegin();
public TField readFieldBegin();
public TMap readMapBegin();
public TList readListBegin();

必须实现的基础方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//写部分
public void writeByte(byte b);
public void writeI16(short i16);
public void writeString(String str);
public void writeBinary(ByteBuffer bin);
private void writeBinary(byte[] buf, int offset, int length);
private void writeVarint32(int n);
private int intToZigZag(int n);
private void writeByteDirect(byte b);
private void writeByteDirect(int n);
//读部分
public byte readByte()
private int readVarint32()
private int readString();

这里我们发现出现了一个writeI16,为什么不是varint16呢?这是因为TField类型允许id为负数,之前有介绍varint32算法,但是对于负数怎么处理呢?就是上面必须实现的另一个方法,intToZigZag,代码实现如下:

1
2
3
private int intToZigZag(int n) {
return (n << 1) ^ (n >> 31);
}

及将它转化成正数,这样varint算法就通用了

总结

至此我们分析了flume 中使用thrift的协议的完整流程及相关方法,剩下的就是根据thrift库文件的实现,按需照搬过来就可以接触thrift的依赖了