Thrift源码解析--TBinaryProtocol
本文為原創(chuàng):http://www.cnblogs.com/leehfly/p/4958206.html,未經(jīng)許可禁止轉(zhuǎn)載。
關(guān)于Tprotocol層都是一些通信協(xié)議,個(gè)人感覺(jué)內(nèi)容較大,很難分類(lèi)描述清楚。故打算以TBinaryProtocol為例,分析客戶(hù)端發(fā)請(qǐng)求以及接收服務(wù)端返回?cái)?shù)據(jù)的整個(gè)過(guò)程。
先將客戶(hù)端的測(cè)試用例貼上。
1 public class DemoClient {2 public static void main(String[] args) throws Exception{3 String param1 = "haha";4 Map<String, String> param3 = new HashMap<String, String>();5 param3.put("1", "2");6 Parameter param2 = new Parameter(10, "kaka");7 8 TSocket socket = new TSocket("127.0.0.1", 7911);9 socket.setTimeout(3000);
10 TTransport transport = socket;
11 transport.open();
12 TProtocol protocol = new TBinaryProtocol(transport);
13 DemoService.Client client = new DemoService.Client.Factory().getClient(protocol);
14 int result = client.demoMethod(param1, param2, param3);
15 System.out.println("result: " + result);
16 transport.close();
17 } 首先就是構(gòu)造transport,這里由于TSocket extens TIOStreamTransport,因此可構(gòu)造一個(gè)TSocket即可,而TSocket包含:host(主機(jī)IP),port(端口號(hào)),time_out(超時(shí)時(shí)間)與一個(gè)Socket。
1 public TSocket(String host, int port, int timeout) {
2 host_ = host;
3 port_ = port;
4 timeout_ = timeout;
5 initSocket();
6 } 對(duì)于socket.setTimeout(3000);實(shí)際操作就是為T(mén)Socket中的socket設(shè)置timeout
1 public void setTimeout(int timeout) {
2 timeout_ = timeout;
3 try {
4 socket_.setSoTimeout(timeout);
5 } catch (SocketException sx) {
6 LOGGER.warn("Could not set socket timeout.", sx);
7 }
8 } ?下圖是構(gòu)造的transport直觀構(gòu)造:包含了host,inputStream,outputStream,port,socket,timeout.
transport.open所做的事情就是初始化一些輸入輸出流并且connect the socket to the InetSocketAddress
1 /**2 * Connects the socket, creating a new socket object if necessary.3 */4 public void open() throws TTransportException {5 if (isOpen()) {6 throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");7 }8 9 if (host_.length() == 0) {
10 throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
11 }
12 if (port_ <= 0) {
13 throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
14 }
15
16 if (socket_ == null) {
17 initSocket();
18 }
19
20 try {
21 socket_.connect(new InetSocketAddress(host_, port_), timeout_);
22 inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);//均采用緩沖模式輸入輸出流
23 outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
24 } catch (IOException iox) {
25 close();
26 throw new TTransportException(TTransportException.NOT_OPEN, iox);
27 }
28 } 再看一下open之后的transport:
接下來(lái)就是在已有transport也就是TSocket的基礎(chǔ)之上,完成Tprotocol的構(gòu)建,這里選擇了TBinaryProtocol。這個(gè)工作實(shí)際上就是將上一步建好的Ttransport關(guān)聯(lián)到Tprotocol上來(lái)。相當(dāng)于進(jìn)一步封裝。
1 public abstract class TProtocol {2 3 /**4 * Prevent direct instantiation5 */6 @SuppressWarnings("unused")7 private TProtocol() {}8 9 /**
10 * Transport
11 */
12 protected TTransport trans_;
13
14 /**
15 * Constructor
16 */
17 protected TProtocol(TTransport trans) {
18 trans_ = trans;
19 }
20
21 /**
22 * Transport accessor
23 */
24 public TTransport getTransport() {
25 return trans_;
26 }
27 /**各種讀寫(xiě)方法略去
28 */
29 } 從TProtocol的構(gòu)造方法中可以看出,實(shí)際上就是將上一步生成的Transport賦與TProtocol中的trans_變量并將strictRead_與strictWrite_賦值。
1 /**2 * Constructor3 */4 public TBinaryProtocol(TTransport trans) {5 this(trans, false, true);6 }7 8 public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) {9 super(trans);
10 strictRead_ = strictRead;
11 strictWrite_ = strictWrite;
12 } 其中還有一些字節(jié)數(shù)組的初始化工作。
1 private byte [] bout = new byte[1];2 3 4 private byte[] i16out = new byte[2];5 6 7 private byte[] i32out = new byte[4];8 9 10 private byte[] i64out = new byte[8]; 11
?
這時(shí)候一切準(zhǔn)備就緒。Tprotocol目前狀態(tài)如下圖:
Tprotocol已經(jīng)準(zhǔn)備就緒,接下來(lái)的工作就是new 一個(gè)client,然后才可以去與服務(wù)端進(jìn)行請(qǐng)求與響應(yīng)。下面我把一個(gè)client的代碼全部粘貼出來(lái)。
1 public static class Client extends org.apache.thrift.TServiceClient implements Iface {2 public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {3 public Factory() {}4 public Client getClient(org.apache.thrift.protocol.TProtocol prot) {//通過(guò)Tprotocol去構(gòu)造client5 return new Client(prot);6 }7 public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {8 return new Client(iprot, oprot);9 }
10 }
11
12 public Client(org.apache.thrift.protocol.TProtocol prot)
13 {
14 super(prot, prot);//使用了相同的Tprotocol進(jìn)行構(gòu)造
15 }
16
17 public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
18 super(iprot, oprot);
19 }
20
21 public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException
22 {
23 send_demoMethod(param1, param2, param3);
24 return recv_demoMethod();
25 }
26
27 public void send_demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException
28 {
29 demoMethod_args args = new demoMethod_args();
30 args.setParam1(param1);
31 args.setParam2(param2);
32 args.setParam3(param3);
33 sendBase("demoMethod", args);
34 }
35
36 public int recv_demoMethod() throws org.apache.thrift.TException
37 {
38 demoMethod_result result = new demoMethod_result();
39 receiveBase(result, "demoMethod");
40 if (result.isSetSuccess()) {
41 return result.success;
42 }
43 throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result");
44 }
45
46 } 為了理解客戶(hù)端構(gòu)造的具體過(guò)程,我把TserviceClient.class的部分源碼貼出來(lái):
1 public TServiceClient(TProtocol iprot, TProtocol oprot) {2 iprot_ = iprot;3 oprot_ = oprot;4 }5 6 protected TProtocol iprot_;7 protected TProtocol oprot_;8 9 protected int seqid_;
10
11 /**
12 * Get the TProtocol being used as the input (read) protocol.
13 * @return the TProtocol being used as the input (read) protocol.
14 */
15 public TProtocol getInputProtocol() {
16 return this.iprot_;
17 }
18
19 /**
20 * Get the TProtocol being used as the output (write) protocol.
21 * @return the TProtocol being used as the output (write) protocol.
22 */
23 public TProtocol getOutputProtocol() {
24 return this.oprot_;
25 } 明顯的可以看到,client有三個(gè)變量,TProtocol類(lèi)型的iprot_和oprot_,還有一個(gè)順序號(hào)seqid_.由于在構(gòu)造client的過(guò)程中使用了相同的Tprotocol,在這里也就是使用了相同的TBinaryProtocol,因此iprot_與oprot_是相同的,都指向上一步生成的TProtocol,也就是TBinaryProtocol.當(dāng)DemoService.Client client = new DemoService.Client.Factory().getClient(protocol);執(zhí)行完畢后,client的狀態(tài)如下圖:
client已經(jīng)準(zhǔn)備完畢,我們調(diào)用client的方法就可以向服務(wù)端發(fā)送請(qǐng)求了。而這個(gè)過(guò)程的總體代碼也就那么一點(diǎn)點(diǎn),先直接貼出來(lái):
1 public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException2 {3 send_demoMethod(param1, param2, param3);//發(fā)送請(qǐng)求4 return recv_demoMethod();//接收響應(yīng)5 }6 7 public void send_demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException8 {9 demoMethod_args args = new demoMethod_args();//封裝請(qǐng)求參數(shù)demoMethod_args
10 args.setParam1(param1);
11 args.setParam2(param2);
12 args.setParam3(param3);
13 sendBase("demoMethod", args);//發(fā)請(qǐng)求
14 }
15
16 public int recv_demoMethod() throws org.apache.thrift.TException
17 {
18 demoMethod_result result = new demoMethod_result();//封裝接收響應(yīng)數(shù)據(jù)demoMethod_result,貌似與demoMethod_args還不一樣
19 receiveBase(result, "demoMethod");//接收返回?cái)?shù)據(jù)
20 if (result.isSetSuccess()) {
21 return result.success;
22 }
23 throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result");
24 } 當(dāng)執(zhí)行完demoMethod_args args = new demoMethod_args();之后,其實(shí)就是對(duì)demoMethod_args中的靜態(tài)變量進(jìn)行了初始化,STRUCT_DESC,PARAM1_FIELD_DESC,PARAM2_FIELD_DESC,schemes,PARAM3_FIELD_DESC,metaDataMap等都有了初始值。args.setParam之后,demoMethod_args的狀態(tài):
接下來(lái)就是:
1 protected void sendBase(String methodName, TBase args) throws TException {
2 oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//注意這里的++seqid,就是發(fā)送請(qǐng)求的序號(hào),遞增
3 args.write(oprot_);
4 oprot_.writeMessageEnd();
5 oprot_.getTransport().flush();//這里最終其實(shí)就是outputStream進(jìn)行flush
6 } 將methodName:?demoMethod, args:?demoMethod_args(param1:haha, param2:Parameter(id:10, name:kaka), param3:{1=2})寫(xiě)入Tprotocol,在這里是oprot_。
1 public void writeMessageBegin(TMessage message) throws TException {2 if (strictWrite_) {3 int version = VERSION_1 | message.type;//異或形成版本號(hào)4 writeI32(version);//寫(xiě)入版本號(hào)5 writeString(message.name);//寫(xiě)方法名6 writeI32(message.seqid);//方法序號(hào)7 } else {8 writeString(message.name);9 writeByte(message.type);
10 writeI32(message.seqid);
11 }
12 } 1 public void writeString(String str) throws TException {
2 try {
3 byte[] dat = str.getBytes("UTF-8");
4 writeI32(dat.length);
5 trans_.write(dat, 0, dat.length);
6 } catch (UnsupportedEncodingException uex) {
7 throw new TException("JVM DOES NOT SUPPORT UTF-8");
8 }
9 } 1 public void writeI32(int i32) throws TException {
2 i32out[0] = (byte)(0xff & (i32 >> 24));
3 i32out[1] = (byte)(0xff & (i32 >> 16));
4 i32out[2] = (byte)(0xff & (i32 >> 8));
5 i32out[3] = (byte)(0xff & (i32));
6 trans_.write(i32out, 0, 4);
7 } 1 /**2 * Writes to the underlying output stream if not null.3 */4 public void write(byte[] buf, int off, int len) throws TTransportException {5 if (outputStream_ == null) {6 throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");7 }8 try {9 outputStream_.write(buf, off, len);
10 } catch (IOException iox) {
11 throw new TTransportException(TTransportException.UNKNOWN, iox);
12 }
13 } 從以上代碼可以看出來(lái),無(wú)論怎么寫(xiě),都是一層層深入的,TProtocol?oprot_ ----->Ttransport trans_ ----->OutputStream?outputStream(TODO:這里的outputStream其實(shí)也是bufferedOutputStream,也就是剛剛初始化transport的時(shí)候那個(gè)outputstream.其中比較奇葩的是args_.write,其代碼如下,最后還是繞到了oprot.write,只不過(guò)這里有Struct,Field.目測(cè)這里用? schemes.get(oprot.getScheme()).getScheme().write(oprot, this);就是因?yàn)閍rgs的一些參數(shù)在靜態(tài)初始化的時(shí)候已經(jīng)放入了schemes
1 public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
2 schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
3 } 1 public void write(org.apache.thrift.protocol.TProtocol oprot, demoMethod_args struct) throws org.apache.thrift.TException {2 struct.validate();3 4 oprot.writeStructBegin(STRUCT_DESC);5 if (struct.param1 != null) {6 oprot.writeFieldBegin(PARAM1_FIELD_DESC);7 oprot.writeString(struct.param1);8 oprot.writeFieldEnd();9 }
10 if (struct.param2 != null) {
11 oprot.writeFieldBegin(PARAM2_FIELD_DESC);
12 struct.param2.write(oprot);
13 oprot.writeFieldEnd();
14 }
15 if (struct.param3 != null) {
16 oprot.writeFieldBegin(PARAM3_FIELD_DESC);
17 {
18 oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.param3.size()));
19 for (Map.Entry<String, String> _iter4 : struct.param3.entrySet())
20 {
21 oprot.writeString(_iter4.getKey());
22 oprot.writeString(_iter4.getValue());
23 }
24 oprot.writeMapEnd();
25 }
26 oprot.writeFieldEnd();
27 }
28 oprot.writeFieldStop();
29 oprot.writeStructEnd();
30 }
31
32 } 到此為止,send_domoMethod完畢,接下來(lái)就是recv_demoMethod()也就是接受服務(wù)端返回的數(shù)據(jù)。
1 public int recv_demoMethod() throws org.apache.thrift.TException
2 {
3 demoMethod_result result = new demoMethod_result();//與封裝請(qǐng)求參數(shù)類(lèi)似,加入一些內(nèi)容到schema中
4 receiveBase(result, "demoMethod");//讀取數(shù)據(jù)進(jìn)行一些組裝工作
5 if (result.isSetSuccess()) {
6 return result.success;//返回result中的success值
7 }
8 throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result");
9 } 1 protected void receiveBase(TBase result, String methodName) throws TException {//讀取返回結(jié)果,并將返回結(jié)果組裝好放到result中2 TMessage msg = iprot_.readMessageBegin();3 if (msg.type == TMessageType.EXCEPTION) {4 TApplicationException x = TApplicationException.read(iprot_);5 iprot_.readMessageEnd();6 throw x;7 }8 if (msg.seqid != seqid_) {9 throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
10 }
11 result.read(iprot_);//將所讀取的數(shù)據(jù)封裝成需要類(lèi)型返回
12 iprot_.readMessageEnd();//這一步其實(shí)什么也沒(méi)做,到此為止result其實(shí)已經(jīng)形成
13 } 由于寫(xiě)入的時(shí)候有寫(xiě)入信息的類(lèi)型,序號(hào)之類(lèi)的東西,故這里讀取和寫(xiě)入保持一致,也要readMessageBegin,只不過(guò)這里使用的是iprot_,其實(shí)還是Tprotocol。Tprotocol iprot_ ----->Ttransport trans_ ----->InputStream inputstream
1 public TMessage readMessageBegin() throws TException {2 int size = readI32();3 if (size < 0) {4 int version = size & VERSION_MASK;5 if (version != VERSION_1) {6 throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");7 }8 return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());9 } else {
10 if (strictRead_) {
11 throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
12 }
13 return new TMessage(readStringBody(size), readByte(), readI32());
14 }
15 } 其中result.read(iprot_)還是對(duì)應(yīng)著寫(xiě)入時(shí)候的args.write,代碼貼出來(lái):
1 private static class demoMethod_resultStandardScheme extends StandardScheme<demoMethod_result> {2 3 public void read(org.apache.thrift.protocol.TProtocol iprot, demoMethod_result struct) throws org.apache.thrift.TException {4 org.apache.thrift.protocol.TField schemeField;5 iprot.readStructBegin();6 while (true)7 {8 schemeField = iprot.readFieldBegin();9 if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
10 break;
11 }
12 switch (schemeField.id) {
13 case 0: // SUCCESS
14 if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
15 struct.success = iprot.readI32();//在這里讀取返回結(jié)果,這些結(jié)果的結(jié)構(gòu)都是早已經(jīng)定義好的,因?yàn)槲覀冞@里的例子是int類(lèi)型,故這里只需要讀取readI32即可
16 struct.setSuccessIsSet(true);
17 } else {
18 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
19 }
20 break;
21 default:
22 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
23 }
24 iprot.readFieldEnd();
25 }
26 iprot.readStructEnd();
27
28 // check for required fields of primitive type, which can't be checked in the validate method
29 struct.validate();
30 }
31
32 public void write(org.apache.thrift.protocol.TProtocol oprot, demoMethod_result struct) throws org.apache.thrift.TException {
33 struct.validate();
34
35 oprot.writeStructBegin(STRUCT_DESC);
36 oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
37 oprot.writeI32(struct.success);
38 oprot.writeFieldEnd();
39 oprot.writeFieldStop();
40 oprot.writeStructEnd();
41 }
42
43 } 綜上,整個(gè)客戶(hù)端發(fā)請(qǐng)求以及接受返回?cái)?shù)據(jù)也就是先寫(xiě)后讀的一個(gè)完整過(guò)程也就完畢。整體流程圖我就用從網(wǎng)上找到的一個(gè)例子來(lái)看就好了,除了方法不一樣,其他都是一樣的道理。
本文為博主原創(chuàng),未經(jīng)許可禁止轉(zhuǎn)載。謝謝。
做人第一,做學(xué)問(wèn)第二。轉(zhuǎn)載于:https://www.cnblogs.com/xumaojun/p/8526522.html
總結(jié)
以上是生活随笔為你收集整理的Thrift源码解析--TBinaryProtocol的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 萌萌棉花糖车和萝莉兔梦幻拖纱换荧光珍珠礼
- 下一篇: ssh免密连接远程服务器