thrift 协议(TProtocol)
协议大家平时都会遇到,只是没有特别注意。
像平时大家阅读文章的时候都是从上到下、从左往右 按行阅读,这可以看做一种阅读协议。 ( 备注: 古人在竹简上写的文字则是从上往下、从右往左 按列阅读。)
更详细的规则比如:作文的第一行是标题,段首要空两格的是一个自然段,遇到一个句号是一句话。
详细的标点符号用法(通信协议)参考教育部的规范 标点符号用法 - 教育部
在计算机远程方法调用时,传输的都是二进制的01,调用方(写数据)和被调用方(读数据)怎么约定通信协议的?
(1) 协议(TProtocol)的作用
协议的作用就类似于文字中的符号,作为应用拆解请求消息的边界,保证二进制数据经过网络传输后,还能被正确地还原语义。
具体点就是从二进制数据中解析出协议版本
、方法名
、消息类型
、序列Id
、序列化方式
、消息长度
、协议体
等内容。
协议在thrift中的作用如图中绿色部分所示,实现了RPC里通信协议的功能。
thrift中协议层规定了传输协议的规范,
Thrift支持让用户选择客户端与服务端之间传输通信协议,在传输协议上总体划分为文本(text)和二进制(binary)传输协议。详细分类如下:
TBinaryProtocol 二进制编码格式进行数据传输
TCompactProtocol 高效率、密集的二进制编码格式进行数据传输,会对数据进行压缩
TDebugProtocol
THeaderProtocol
TJSONProtocol 使用JSON的数据编码协议进行数据传输
TMultiplexedProtocol
TSimpleJSONProtocol 只提供JSON只写的协议,使用与通过脚本语言解析
(1.1) thrift协议设计
thrift协议里主要包含 Message、Struct、Field、Data 等内容。
(1.1.1) Message设计
<message> ::= <message-begin> <struct> <message-end>
<message-begin> ::= <method-name> <message-type> <message-seqid>
<method-name> ::= STRING
<message-type> ::= T_CALL | T_REPLY | T_EXCEPTION | T_ONEWAY
<message-seqid> ::= I32
Message里规定了 方法名
、消息类型
、序列Id
、消息序列Id
的顺序、占用空间大小和解析规则。 如下图
Binary protocol Message, strict encoding, 12+ bytes:
+--------+--------+--------+--------+--------+--------+--------+--------+--------+...+--------+--------+--------+--------+--------+
|1vvvvvvv|vvvvvvvv|unused |00000mmm| name length | name | seq id |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+...+--------+--------+--------+--------+--------+
(1.1.2) Struct设计
<struct> ::= <struct-begin> <field>* <field-stop> <struct-end>
<struct-begin> ::= <struct-name>
<struct-name> ::= STRING
<field-stop> ::= T_STOP
<field> ::= <field-begin> <field-data> <field-end>
struct是Message的主要内容,里面对应idl里的n个字段,结构如图所示
(1.1.3) Field设计
<field> ::= <field-begin> <field-data> <field-end>
<field-begin> ::= <field-name> <field-type> <field-id>
<field-name> ::= STRING
<field-type> ::= T_BOOL | T_BYTE | T_I8 | T_I16 | T_I32 | T_I64 | T_DOUBLE
| T_STRING | T_BINARY | T_STRUCT | T_MAP | T_SET | T_LIST
| T_UUID
<field-id> ::= I16
<field-data> ::= I8 | I16 | I32 | I64 | DOUBLE | STRING | BINARY
<struct> | <map> | <list> | <set>
(1.1.4) 基础类型设计
<map> ::= <map-begin> <field-datum>* <map-end>
<map-begin> ::= <map-key-type> <map-value-type> <map-size>
<map-key-type> ::= <field-type>
<map-value-type> ::= <field-type>
<map-size> ::= I32
<list> ::= <list-begin> <field-data>* <list-end>
<list-begin> ::= <list-elem-type> <list-size>
<list-elem-type> ::= <field-type>
<list-size> ::= I32
<set> ::= <set-begin> <field-data>* <set-end>
<set-begin> ::= <set-elem-type> <set-size>
<set-elem-type> ::= <field-type>
<set-size> ::= I32
基础类型主要分2类,
一类是基础的数字类型,直接使用数字表示即可,占用1、2、3、4字节,
一类是字符串类型
一类是二进制数据,也就是字节数组
还有一类是集合类型,像map、list、set,稍微复杂点
(1.1.5) thrift协议设计全景图
Message、Struct、Field、Data 组合后如下图所示
<message> ::= <message-begin> <struct> <message-end>
<message-begin> ::= <method-name> <message-type> <message-seqid>
<method-name> ::= STRING
<message-type> ::= T_CALL | T_REPLY | T_EXCEPTION | T_ONEWAY
<message-seqid> ::= I32
<struct> ::= <struct-begin> <field>* <field-stop> <struct-end>
<struct-begin> ::= <struct-name>
<struct-name> ::= STRING
<field-stop> ::= T_STOP
<field> ::= <field-begin> <field-data> <field-end>
<field-begin> ::= <field-name> <field-type> <field-id>
<field-name> ::= STRING
<field-type> ::= T_BOOL | T_BYTE | T_I8 | T_I16 | T_I32 | T_I64 | T_DOUBLE
| T_STRING | T_BINARY | T_STRUCT | T_MAP | T_SET | T_LIST
| T_UUID
<field-id> ::= I16
<field-data> ::= I8 | I16 | I32 | I64 | DOUBLE | STRING | BINARY
<struct> | <map> | <list> | <set>
<map> ::= <map-begin> <field-datum>* <map-end>
<map-begin> ::= <map-key-type> <map-value-type> <map-size>
<map-key-type> ::= <field-type>
<map-value-type> ::= <field-type>
<map-size> ::= I32
<list> ::= <list-begin> <field-data>* <list-end>
<list-begin> ::= <list-elem-type> <list-size>
<list-elem-type> ::= <field-type>
<list-size> ::= I32
<set> ::= <set-begin> <field-data>* <set-end>
<set-begin> ::= <set-elem-type> <set-size>
<set-elem-type> ::= <field-type>
<set-size> ::= I32
(1.2) thrift通信协议的优缺点
优点:
1、协议特别紧凑
缺点
1、二进制协议(TBinaryProtocol) 不支持多版本,需要使用方从业务层面
(2) thrift调用demo
Java代码 https://github.com/weikeqin/thrift-tutorial-java-demo
Go代码 https://github.com/weikeqin/thrift-tutorial-go-demo
(2.1) idl
namespace java tutorial
namespace go tutorial
service Calculator extends shared.SharedService {
// 计算两数之和
i32 add(1:i32 num1, 2:i32 num2),
}
namespace java tutorial
namespace go tutorial
struct SharedStruct {
1: i32 key
2: string value
}
//
service SharedService {
SharedStruct getStruct(1: i32 key)
}
idl生成的go代码
thrift -r --gen go service.thrift
type Calculator interface {
shared.SharedService
// Parameters:
// - Num1
// - Num2
Add(ctx context.Context, num1 int32, num2 int32) (_r int32, _err error)
}
//
type CalculatorClient struct {
*shared.SharedServiceClient
}
//
type SharedServiceClient struct {
c thrift.TClient
meta thrift.ResponseMeta
}
// Parameters:
// - Num1
// - Num2
func (p *CalculatorClient) Add(ctx context.Context, num1 int32, num2 int32) (_r int32, _err error) {
// 入参
var _args3 CalculatorAddArgs
_args3.Num1 = num1
_args3.Num2 = num2
// 声明出参
var _result5 CalculatorAddResult
var _meta4 thrift.ResponseMeta
// 调用方法
_meta4, _err = p.Client_().Call(ctx, "add", &_args3, &_result5)
// 设置header
p.SetLastResponseMeta_(_meta4)
if _err != nil {
return
}
// 返回结果
return _result5.GetSuccess(), nil
}
// 写入参Struct
func (p *CalculatorAddArgs) Write(ctx context.Context, oprot thrift.TProtocol) error {
// 写Struct开始标记
if err := oprot.WriteStructBegin(ctx, "add_args"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
if p != nil {
// 这儿入参有几个字段就写几个字段
// 写字段1
if err := p.writeField1(ctx, oprot); err != nil { return err }
// 写字段2
if err := p.writeField2(ctx, oprot); err != nil { return err }
}
// 写字段停止
if err := oprot.WriteFieldStop(ctx); err != nil {
return thrift.PrependError("write field stop error: ", err) }
// 写字段结束标记
if err := oprot.WriteStructEnd(ctx); err != nil {
return thrift.PrependError("write struct stop error: ", err) }
return nil
}
func (p *CalculatorAddArgs) writeField1(ctx context.Context, oprot thrift.TProtocol) (err error) {
// 写字段名
if err := oprot.WriteFieldBegin(ctx, "num1", thrift.I32, 1); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:num1: ", p), err) }
// 写字段值
if err := oprot.WriteI32(ctx, int32(p.Num1)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.num1 (1) field write error: ", p), err) }
// 写字段结束标记
if err := oprot.WriteFieldEnd(ctx); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:num1: ", p), err) }
return err
}
func (p *CalculatorAddArgs) writeField2(ctx context.Context, oprot thrift.TProtocol) (err error) {
// 写字段名
if err := oprot.WriteFieldBegin(ctx, "num2", thrift.I32, 2); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:num2: ", p), err) }
// 写字段值
if err := oprot.WriteI32(ctx, int32(p.Num2)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.num2 (2) field write error: ", p), err) }
// 写字段结束标记
if err := oprot.WriteFieldEnd(ctx); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 2:num2: ", p), err) }
return err
}
thrift生成java代码
thrift -r --gen java service.thrift
(3) thrift go源码解析
thrift go源码版本 0.16.0
https://github.com/apache/thrift/tree/0.16.0/lib/go/thrift
TProtocol定义了基本的协议信息,包括传输什么数据,如何解析传输的数据的基本方法。
TProtocol有多种实现,包括 TBinaryProtocol、TCompactProtocol、TDebugProtocol、THeaderProtocol、TJSONProtocol、TMultiplexedProtocol、TSimpleJSONProtocol
(3.1) thrift支持的数据类型
package thrift
// Type constants in the Thrift protocol
type TType byte
const (
STOP = 0
VOID = 1
BOOL = 2
BYTE = 3
I08 = 3
DOUBLE = 4
I16 = 6
I32 = 8
I64 = 10
STRING = 11
UTF7 = 11
STRUCT = 12
MAP = 13
SET = 14
LIST = 15
UTF8 = 16
UTF16 = 17
//BINARY = 18 wrong and unusued
)
(3.2) TProtocol定义
// package thrift
type TProtocol interface {
WriteMessageBegin(name string, typeId TMessageType, seqid int32) error
WriteMessageEnd() error
WriteStructBegin(name string) error
WriteStructEnd() error
WriteFieldBegin(name string, typeId TType, id int16) error
WriteFieldEnd() error
WriteFieldStop() error
WriteMapBegin(keyType TType, valueType TType, size int) error
WriteMapEnd() error
WriteListBegin(elemType TType, size int) error
WriteListEnd() error
WriteSetBegin(elemType TType, size int) error
WriteSetEnd() error
WriteBool(value bool) error
WriteByte(value int8) error
WriteI16(value int16) error
WriteI32(value int32) error
WriteI64(value int64) error
WriteDouble(value float64) error
WriteString(value string) error
WriteBinary(value []byte) error
ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error)
ReadMessageEnd() error
ReadStructBegin() (name string, err error)
ReadStructEnd() error
ReadFieldBegin() (name string, typeId TType, id int16, err error)
ReadFieldEnd() error
ReadMapBegin() (keyType TType, valueType TType, size int, err error)
ReadMapEnd() error
ReadListBegin() (elemType TType, size int, err error)
ReadListEnd() error
ReadSetBegin() (elemType TType, size int, err error)
ReadSetEnd() error
ReadBool() (value bool, err error)
ReadByte() (value int8, err error)
ReadI16() (value int16, err error)
ReadI32() (value int32, err error)
ReadI64() (value int64, err error)
ReadDouble() (value float64, err error)
ReadString() (value string, err error)
ReadBinary() (value []byte, err error)
Skip(fieldType TType) (err error)
Flush() (err error)
Transport() TTransport
}
服务器端如何知道客户端发送过来的数据是怎么组合的,比如第一个字段是字符串类型,第二个字段是int。这个信息是在IDL生成客户端时生成的代码时提供了。Thrift生成的客户端代码提供了读写参数的方法,这两个方式是一一对应的,包括字段的序号,类型等等。客户端使用写参数的方法,服务器端使用读参数的方法。
- 方法的调用从writeMessageBegin开始,发送了消息头信息
- 写方法的参数,也就是写消息体。方法参数由一个统一的接口TBase描述,提供了read和write的统一接口。自动生成的代码提供了read, write方法参数的具体实现
- 写完结束
(3.3) TBinaryProtocol实现
TBinaryProtocol
(3.3.1) TBinaryProtocol-go实现
type TBinaryProtocol struct {
trans TRichTransport // 增强(包装)传输对象
origTransport TTransport // 传输对象 封装IO层
cfg *TConfiguration // 传输配置
buffer [64]byte // 缓冲
}
WriteMessageBegin
/**
* Writing Methods
*/
// param name 消息名称 对应方法名 "GetUserById"
// param typeId 消息类型ID thrift.CALL
// param seqId 序列号 自增
func (p *TBinaryProtocol) WriteMessageBegin(ctx context.Context, name string, typeId TMessageType, seqId int32) error {
if p.cfg.GetTBinaryStrictWrite() { // 直接写入
// 计算版本
// 十六进制:80010000 转成二进制:10000000000000010000000000000000
// TMessageType 是int32类型,但实际上目前只用了[0,4]的5个值 也就是只用到了int32类型的低3位(低3位可以表示8个值)
version := uint32(VERSION_1) | uint32(typeId)
// 写入版本 // version 长度32-bit 4字节
e := p.WriteI32(ctx, int32(version))
if e != nil {
return e
}
// 写入方法名 当前这个例子里指"GetUserById"
e = p.WriteString(ctx, name)
if e != nil {
return e
}
// 写入序列Id 32位 4字节
e = p.WriteI32(ctx, seqId)
return e
} else {
// 写入方法名 当前这个例子里指"GetUserById"
e := p.WriteString(ctx, name)
if e != nil {
return e
}
// 写入类型id typeId是thrift定义的消息类型Id int32类型 4字节
e = p.WriteByte(ctx, int8(typeId))
if e != nil {
return e
}
// 写入序列Id 32位 4字节
e = p.WriteI32(ctx, seqId)
return e
}
return nil
}
WriteMessageEnd
func (p *TBinaryProtocol) WriteMessageEnd(ctx context.Context) error {
return nil
}
WriteStructBegin
func (p *TBinaryProtocol) WriteStructBegin(ctx context.Context, name string) error {
return nil
}
WriteStructEnd
func (p *TBinaryProtocol) WriteStructEnd(ctx context.Context) error {
return nil
}
WriteFieldBegin
// param name 字段名
// param typeId 字段类型
// param id 字段对应idl里的id
func (p *TBinaryProtocol) WriteFieldBegin(ctx context.Context, name string, typeId TType, id int16) error {
// 写字段类型 8位 也就是1字节
e := p.WriteByte(ctx, int8(typeId))
if e != nil {
return e
}
// 写入idl里的id 16位 2字节
e = p.WriteI16(ctx, id)
return e
}
WriteFieldEnd
func (p *TBinaryProtocol) WriteFieldEnd(ctx context.Context) error {
return nil
}
WriteMapBegin
// keyType key类型
// valueType value类型
// size map大小
func (p *TBinaryProtocol) WriteMapBegin(ctx context.Context, keyType TType, valueType TType, size int) error {
// 写入key类型 8位 1字节
e := p.WriteByte(ctx, int8(keyType))
if e != nil {
return e
}
// 写入value类型 8位 1字节
e = p.WriteByte(ctx, int8(valueType))
if e != nil {
return e
}
// 写入map长度 32位 也就是4字节
e = p.WriteI32(ctx, int32(size))
return e
}
WriteMapEnd
func (p *TBinaryProtocol) WriteMapEnd(ctx context.Context) error {
return nil
}
WriteListBegin
func (p *TBinaryProtocol) WriteListBegin(ctx context.Context, elemType TType, size int) error {
// 写入类型 8位 1字节
e := p.WriteByte(ctx, int8(elemType))
if e != nil {
return e
}
// 写入list长度 32位 4字节
e = p.WriteI32(ctx, int32(size))
return e
}
WriteListEnd
func (p *TBinaryProtocol) WriteListEnd(ctx context.Context) error {
return nil
}
WriteSetBegin
// param
// param elemType
// param size
func (p *TBinaryProtocol) WriteSetBegin(ctx context.Context, elemType TType, size int) error {
// 写入thrift类型 8位 1字节
e := p.WriteByte(ctx, int8(elemType))
if e != nil {
return e
}
// 写入set长度
e = p.WriteI32(ctx, int32(size))
return e
}
WriteSetEnd
func (p *TBinaryProtocol) WriteSetEnd(ctx context.Context) error {
return nil
}
WriteBool
func (p *TBinaryProtocol) WriteBool(ctx context.Context, value bool) error {
if value {
// 使用WriteByte 长度8-bit 1字节
return p.WriteByte(ctx, 1)
}
// 使用WriteByte 长度8-bit 1字节
return p.WriteByte(ctx, 0)
}
WriteByte
// param ctx
// param value 写入的值
func (p *TBinaryProtocol) WriteByte(ctx context.Context, value int8) error {
// 写入byte类型值 长度8-bit 1字节
e := p.trans.WriteByte(byte(value))
return NewTProtocolException(e)
}
WriteI16
// param value 写入的值
func (p *TBinaryProtocol) WriteI16(ctx context.Context, value int16) error {
v := p.buffer[0:2]
// 把int16转成byte数组 长度16-bit 2字节
binary.BigEndian.PutUint16(v, uint16(value))
_, e := p.trans.Write(v)
return NewTProtocolException(e)
}
WriteI32
// param ctx
// param value 写入的值
func (p *TBinaryProtocol) WriteI32(ctx context.Context, value int32) error {
v := p.buffer[0:4]
// 把int32转成byte数组 长度32-bit 4字节
binary.BigEndian.PutUint32(v, uint32(value))
_, e := p.trans.Write(v)
return NewTProtocolException(e)
}
WriteI64
//
// param value 写入的值
func (p *TBinaryProtocol) WriteI64(ctx context.Context, value int64) error {
// 声明一个8字节的数组
v := p.buffer[0:8]
// 把int64转成byte数组 长度64bit 8字节
binary.BigEndian.PutUint64(v, uint64(value))
_, err := p.trans.Write(v)
return NewTProtocolException(err)
}
WriteDouble
func (p *TBinaryProtocol) WriteDouble(ctx context.Context, value float64) error {
// 先把float64转成int64 然后再调WriteI64
return p.WriteI64(ctx, int64(math.Float64bits(value)))
}
WriteString
func (p *TBinaryProtocol) WriteString(ctx context.Context, value string) error {
// 写入字符串长度
e := p.WriteI32(ctx, int32(len(value)))
if e != nil {
return e
}
// 写入字符串
_, err := p.trans.WriteString(value)
return NewTProtocolException(err)
}
WriteBinary
func (p *TBinaryProtocol) WriteBinary(ctx context.Context, value []byte) error {
// 写入byte数组的长度
e := p.WriteI32(ctx, int32(len(value)))
if e != nil {
return e
}
// 写入byte数组
_, err := p.trans.Write(value)
return NewTProtocolException(err)
}
ReadMessageBegin
// param ctx
// return name
// return typeId
// return seqId
// return error
func (p *TBinaryProtocol) ReadMessageBegin(ctx context.Context) (name string, typeId TMessageType, seqId int32, err error) {
// 读取前32位 也就是前4个字节
// 根据读取到的size大小来判断,
// 如果size<0
// 如果size>0,读取到的size大小就是字符串的长度
size, e := p.ReadI32(ctx)
if e != nil {
return "", typeId, 0, NewTProtocolException(e)
}
// 只有直接写入时,读取时读取到的size才会<0
if size < 0 {
// 类型Id 写入时通过或运算 读取时通过与运算
typeId = TMessageType(size & 0x0ff)
// 版本
version := int64(int64(size) & VERSION_MASK)
if version != VERSION_1 {
return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Bad version in ReadMessageBegin"))
}
// 获取方法名
name, e = p.ReadString(ctx)
if e != nil {
return name, typeId, seqId, NewTProtocolException(e)
}
// 读取32位 也就是4字节 获取序列Id
seqId, e = p.ReadI32(ctx)
if e != nil {
return name, typeId, seqId, NewTProtocolException(e)
}
return name, typeId, seqId, nil
}
if p.cfg.GetTBinaryStrictRead() {
return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Missing version in ReadMessageBegin"))
}
// 根据size大小读取字符串 获取方法名
name, e2 := p.readStringBody(size)
if e2 != nil {
return name, typeId, seqId, e2
}
// 读取类型typeId 对应thrift定义的类型
b, e3 := p.ReadByte(ctx)
if e3 != nil {
return name, typeId, seqId, e3
}
typeId = TMessageType(b)
// 读序Id 32位 4字节
seqId, e4 := p.ReadI32(ctx)
if e4 != nil {
return name, typeId, seqId, e4
}
return name, typeId, seqId, nil
}
ReadMessageEnd
func (p *TBinaryProtocol) ReadMessageEnd(ctx context.Context) error {
return nil
}
ReadStructBegin
func (p *TBinaryProtocol) ReadStructBegin(ctx context.Context) (name string, err error) {
return
}
ReadStructEnd
func (p *TBinaryProtocol) ReadStructEnd(ctx context.Context) error {
return nil
}
ReadFieldBegin
//
func (p *TBinaryProtocol) ReadFieldBegin(ctx context.Context) (name string, typeId TType, seqId int16, err error) {
// 获取对应的thrift字段类型 读取前8位 也就是1字节
t, err := p.ReadByte(ctx)
typeId = TType(t)
if err != nil {
return name, typeId, seqId, err
}
if t != STOP {
// 获取idl里设置的id 16位 2字节
seqId, err = p.ReadI16(ctx)
}
// 返回的name是空的
return name, typeId, seqId, err
}
ReadFieldEnd
func (p *TBinaryProtocol) ReadFieldEnd(ctx context.Context) error {
return nil
}
ReadMapBegin
func (p *TBinaryProtocol) ReadMapBegin(ctx context.Context) (kType, vType TType, size int, err error) {
// 读取8位 也就是1字节
k, e := p.ReadByte(ctx)
if e != nil {
err = NewTProtocolException(e)
return
}
// key对应的thrift类型
kType = TType(k)
// 读取8位 也就是1字节
v, e := p.ReadByte(ctx)
if e != nil {
err = NewTProtocolException(e)
return
}
// value对应的thrift类型
vType = TType(v)
// 读取32位 也就是4字节
size32, e := p.ReadI32(ctx)
if e != nil {
err = NewTProtocolException(e)
return
}
err = checkSizeForProtocol(size32, p.cfg)
if err != nil {
return
}
size = int(size32)
// key字段类型 value字段类型 map的长度
return kType, vType, size, nil
}
ReadMapEnd
func (p *TBinaryProtocol) ReadMapEnd(ctx context.Context) error {
return nil
}
ReadListBegin
//
func (p *TBinaryProtocol) ReadListBegin(ctx context.Context) (elemType TType, size int, err error) {
// 读取8位 也就是1字节
b, e := p.ReadByte(ctx)
if e != nil {
err = NewTProtocolException(e)
return
}
// 类型
elemType = TType(b)
// 读取32位 也就是4字节
size32, e := p.ReadI32(ctx)
if e != nil {
err = NewTProtocolException(e)
return
}
err = checkSizeForProtocol(size32, p.cfg)
if err != nil {
return
}
// list的长度
size = int(size32)
return
}
ReadListEnd
func (p *TBinaryProtocol) ReadListEnd(ctx context.Context) error {
return nil
}
// param ctx
func (p *TBinaryProtocol) ReadSetBegin(ctx context.Context) (elemType TType, size int, err error) {
// 读取一字节
b, e := p.ReadByte(ctx)
if e != nil {
err = NewTProtocolException(e)
return
}
// set里的元素类型
elemType = TType(b)
// 读取32位 4字节
size32, e := p.ReadI32(ctx)
if e != nil {
err = NewTProtocolException(e)
return
}
// 校验大小
err = checkSizeForProtocol(size32, p.cfg)
if err != nil {
return
}
// set大小
size = int(size32)
return elemType, size, nil
}
ReadSetEnd
func (p *TBinaryProtocol) ReadSetEnd(ctx context.Context) error {
return nil
}
ReadBool
func (p *TBinaryProtocol) ReadBool(ctx context.Context) (bool, error) {
// 读取1字节
b, e := p.ReadByte(ctx)
// bool用byte类型存储 true:1 false:0
v := true
if b != 1 {
v = false
}
return v, e
}
ReadByte
func (p *TBinaryProtocol) ReadByte(ctx context.Context) (int8, error) {
v, err := p.trans.ReadByte()
return int8(v), err
}
ReadI16
//
func (p *TBinaryProtocol) ReadI16(ctx context.Context) (value int16, err error) {
buf := p.buffer[0:2]
// 读取2字节
err = p.readAll(ctx, buf)
// 从字节数组里读取int16
value = int16(binary.BigEndian.Uint16(buf))
return value, err
}
ReadI32
func (p *TBinaryProtocol) ReadI32(ctx context.Context) (value int32, err error) {
buf := p.buffer[0:4]
// 读取4字节
err = p.readAll(ctx, buf)
// 从字节数组读取int32
value = int32(binary.BigEndian.Uint32(buf))
return value, err
}
ReadI64
func (p *TBinaryProtocol) ReadI64(ctx context.Context) (value int64, err error) {
buf := p.buffer[0:8]
// 读取8字节
err = p.readAll(ctx, buf)
// 从字节数组读取int64
value = int64(binary.BigEndian.Uint64(buf))
return value, err
}
ReadDouble
func (p *TBinaryProtocol) ReadDouble(ctx context.Context) (value float64, err error) {
buf := p.buffer[0:8]
// 读取8字节
err = p.readAll(ctx, buf)
// 字节数组转uint64 uint64转float64
value = math.Float64frombits(binary.BigEndian.Uint64(buf))
return value, err
}
ReadString
func (p *TBinaryProtocol) ReadString(ctx context.Context) (value string, err error) {
// 读取32位 也就是4字节
// size就是字符串的长度
size, e := p.ReadI32(ctx)
if e != nil {
return "", e
}
err = checkSizeForProtocol(size, p.cfg)
if err != nil {
return
}
if size == 0 {
return "", nil
}
// 大小比64字节小 直接读取
if size < int32(len(p.buffer)) {
// Avoid allocation on small reads
buf := p.buffer[:size]
read, e := io.ReadFull(p.trans, buf)
return string(buf[:read]), NewTProtocolException(e)
}
// 根据长度读取字符串
return p.readStringBody(size)
}
readStringBody
func (p *TBinaryProtocol) readStringBody(size int32) (value string, err error) {
buf, err := safeReadBytes(size, p.trans)
return string(buf), NewTProtocolException(err)
}
// 它尝试从 trans 读取 size 字节,以防止在 size 非常大(主要是由格式错误的消息引起)时进行大量分配。
// It tries to read size bytes from trans, in a way that prevents large
// allocations when size is insanely large (mostly caused by malformed message).
func safeReadBytes(size int32, trans io.Reader) ([]byte, error) {
if size < 0 {
return nil, nil
}
buf := new(bytes.Buffer)
// 读取size字节
_, err := io.CopyN(buf, trans, int64(size))
return buf.Bytes(), err
}
TJSONProtocol
// for references to _ParseContext see tsimplejson_protocol.go
// thrift JSON 协议实现。 使用简单的 JSON 协议
//
// JSON protocol implementation for thrift.
// Utilizes Simple JSON protocol
//
type TJSONProtocol struct {
// 组合(继承)TSimpleJSONProtocol的功能
*TSimpleJSONProtocol
}
// 简单 JSON 协议实现
//
// 该协议生成/使用适合脚本语言解析的简单输出格式。它不应与功能齐全的 TJSONProtocol 混淆。
//
// Simple JSON protocol implementation for thrift.
//
// This protocol produces/consumes a simple output format
// suitable for parsing by scripting languages. It should not be
// confused with the full-featured TJSONProtocol.
//
type TSimpleJSONProtocol struct {
trans TTransport // 传输
cfg *TConfiguration // 配置
parseContextStack jsonContextStack // 解析上下文栈 用[]int实现
dumpContext jsonContextStack // 导出上线文 用[]int实现
writer *bufio.Writer //
reader *bufio.Reader //
}
// package thrift // simple_json_protocol.go
var (
JSON_COMMA []byte
JSON_COLON []byte
JSON_LBRACE []byte
JSON_RBRACE []byte
JSON_LBRACKET []byte
JSON_RBRACKET []byte
JSON_QUOTE byte
JSON_QUOTE_BYTES []byte
JSON_NULL []byte
JSON_TRUE []byte
JSON_FALSE []byte
JSON_INFINITY string
JSON_NEGATIVE_INFINITY string
JSON_NAN string
JSON_INFINITY_BYTES []byte
JSON_NEGATIVE_INFINITY_BYTES []byte
JSON_NAN_BYTES []byte
)
func init() {
JSON_COMMA = []byte{','}
JSON_COLON = []byte{':'}
JSON_LBRACE = []byte{'{'}
JSON_RBRACE = []byte{'}'}
JSON_LBRACKET = []byte{'['}
JSON_RBRACKET = []byte{']'}
JSON_QUOTE = '"'
JSON_QUOTE_BYTES = []byte{'"'}
JSON_NULL = []byte{'n', 'u', 'l', 'l'}
JSON_TRUE = []byte{'t', 'r', 'u', 'e'}
JSON_FALSE = []byte{'f', 'a', 'l', 's', 'e'}
JSON_INFINITY = "Infinity"
JSON_NEGATIVE_INFINITY = "-Infinity"
JSON_NAN = "NaN"
JSON_INFINITY_BYTES = []byte{'I', 'n', 'f', 'i', 'n', 'i', 't', 'y'}
JSON_NEGATIVE_INFINITY_BYTES = []byte{'-', 'I', 'n', 'f', 'i', 'n', 'i', 't', 'y'}
JSON_NAN_BYTES = []byte{'N', 'a', 'N'}
}
WriteMessageBegin
func (p *TJSONProtocol) WriteMessageBegin(ctx context.Context, name string, typeId TMessageType, seqId int32) error {
// 重置json上下文栈 这个栈是用int[]实现的
p.resetContextStack() // THRIFT-3735
// 写入list开始 [
if e := p.OutputListBegin(); e != nil {
return e
}
// 写入thrift json 协议版本 4字节
if e := p.WriteI32(ctx, THRIFT_JSON_PROTOCOL_VERSION); e != nil {
return e
}
// 写入方法名
if e := p.WriteString(ctx, name); e != nil {
return e
}
// 写入thrift类型
if e := p.WriteByte(ctx, int8(typeId)); e != nil {
return e
}
// 写入序列Id
if e := p.WriteI32(ctx, seqId); e != nil {
return e
}
return nil
}
func (p *TSimpleJSONProtocol) OutputListBegin() error {
//
if e := p.OutputPreValue(); e != nil {
return e
}
if _, e := p.write(JSON_LBRACKET); e != nil {
return NewTProtocolException(e)
}
p.dumpContext.push(_CONTEXT_IN_LIST_FIRST)
return nil
}
WriteMessageEnd
func (p *TJSONProtocol) WriteMessageEnd(ctx context.Context) error {
//
return p.OutputListEnd()
}
//
func (p *TSimpleJSONProtocol) OutputListEnd() error {
// 写入]
if _, e := p.write(JSON_RBRACKET); e != nil {
return NewTProtocolException(e)
}
// 出栈
_, ok := p.dumpContext.pop()
if !ok {
return errEmptyJSONContextStack
}
// 需要debug
if e := p.OutputPostValue(); e != nil {
return e
}
return nil
}
func (p *TSimpleJSONProtocol) OutputPostValue() error {
cxt, ok := p.dumpContext.peek()
if !ok {
return errEmptyJSONContextStack
}
switch cxt {
case _CONTEXT_IN_LIST_FIRST:
p.dumpContext.pop()
p.dumpContext.push(_CONTEXT_IN_LIST)
case _CONTEXT_IN_OBJECT_FIRST:
p.dumpContext.pop()
p.dumpContext.push(_CONTEXT_IN_OBJECT_NEXT_VALUE)
case _CONTEXT_IN_OBJECT_NEXT_KEY:
p.dumpContext.pop()
p.dumpContext.push(_CONTEXT_IN_OBJECT_NEXT_VALUE)
case _CONTEXT_IN_OBJECT_NEXT_VALUE:
p.dumpContext.pop()
p.dumpContext.push(_CONTEXT_IN_OBJECT_NEXT_KEY)
}
return nil
}
thrift Java源码解析
Java版thrift 版本 0.16.0
源码链接: https://github.com/apache/thrift/tree/0.16.0/lib/java
TSocket transport = new TSocket("localhost", 9090);
transport.open();
TBinaryProtocol protocol = new TBinaryProtocol(transport);
TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
Calculator.Client service = new Calculator.Client(mp);
TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
WeatherReport.Client service2 = new WeatherReport.Client(mp2);
System.out.println(service.add(2,2));
System.out.println(service2.getTemperature());
() 思考
协议支持多版本
RPC协议设计中,协议可能升级,我们需要考虑协议及数据格式的多版本问题。
比如 thrift v0.10.0 和 thrift v0.16.0 里的TBinaryProtocol可以
同一thrift版本协议支持多版本idl吗?
v1版本idl 老代码
v2版本idl 新增1个字段
版本管理
idl新增字段,服务端上线后,调用端上线中,会存在v1版本的idl调用服务端,还会存v2版本的idl调用服务端。
idl新增字段,服务端上线中会存在2个版本的idl,没有新增字段的v1版本idl和有新增字段的v2版本的idl,这个时候客户端还没有上线。
向前兼容
新程序就需要同时能够解析 v1 和 v2 版本的数据。
向后兼容
老程序仍然可以读新的数据格式,也就是有向后兼容的能力。
thrift是怎么解决的
Thrift 里的 TBinaryProtocol 的实现方式也很简单,那就是顺序写入数据的过程中,不仅会写入数据的值(field-value),还会写入数据的编号(field-id)和类型(field-type);读取的时候也一样。并且,在每一条记录的结束都会写下一个标志位。
在读取数据的时候,老版本的 v1 代码,看到自己没有见过的编号(field-id 对应idl里的id)就可以跳过。
新版本的 v2 代码,对于老数据里没有的字段,也就是读不到值而已,并不会出现不兼容的情况。
顺序排列的编号,就起到了版本的作用,而我们不需要再专门去进行数据版本的管理了。
写数据逻辑
writeField1
func (p *CalculatorAddArgs) writeField1(ctx context.Context, oprot thrift.TProtocol) (err error) {
// 写字段名
if err := oprot.WriteFieldBegin(ctx, "num1", thrift.I32, 1); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:num1: ", p), err) }
// 写字段值
if err := oprot.WriteI32(ctx, int32(p.Num1)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.num1 (1) field write error: ", p), err) }
// 写字段结束标记
if err := oprot.WriteFieldEnd(ctx); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:num1: ", p), err) }
return err
}
WriteFieldBegin
// param name 字段名
// param typeId 字段类型
// param id 字段对应idl里的id
func (p *TBinaryProtocol) WriteFieldBegin(ctx context.Context, name string, typeId TType, id int16) error {
// 写字段类型 8位 也就是1字节
e := p.WriteByte(ctx, int8(typeId))
if e != nil {
return e
}
// 写入idl里的id 16位 2字节
e = p.WriteI16(ctx, id)
return e
}
WriteFieldEnd
func (p *TBinaryProtocol) WriteFieldEnd(ctx context.Context) error {
return nil
}
ReadFieldBegin
//
func (p *TBinaryProtocol) ReadFieldBegin(ctx context.Context) (name string, typeId TType, seqId int16, err error) {
// 获取对应的thrift字段类型 读取前8位 也就是1字节
t, err := p.ReadByte(ctx)
typeId = TType(t)
if err != nil {
return name, typeId, seqId, err
}
if t != STOP {
// 获取idl里设置的id 16位 2字节
seqId, err = p.ReadI16(ctx)
}
// 返回的name是空的
return name, typeId, seqId, err
}
ReadFieldEnd
func (p *TBinaryProtocol) ReadFieldEnd(ctx context.Context) error {
return nil
}
idl里可以删除字段吗?
如果字段调用方还在使用,不建议删除
如果字段调用方已经不使用了,可以删除字段,但别删除idl里对应的id
参考资料
[1] thrift
[2] 协议和编解码
[3] rpc之thrift入门与TBinaryProtocol源码追踪
[4] THRIFT-TPROTOCOL
[5] thrift简单示例 (go语言)
[6] 11 | 通过Thrift序列化:我们要预知未来才能向后兼容吗?