java thrift连接池_由浅入深了解Thrift之客户端连接池化
一、問(wèn)題描述
在上一篇《由淺入深了解Thrift之服務(wù)模型和序列化機(jī)制》文章中,我們已經(jīng)了解了thrift的基本架構(gòu)和網(wǎng)絡(luò)服務(wù)模型的優(yōu)缺點(diǎn)。如今的互聯(lián)網(wǎng)圈中,RPC服務(wù)化的思想如火如荼。我們又該如何將thrift服務(wù)化應(yīng)用到我們的項(xiàng)目中哪?實(shí)現(xiàn)thrift服務(wù)化前,我們先想想這幾個(gè)問(wèn)題:服務(wù)注冊(cè)、服務(wù)發(fā)現(xiàn)、服務(wù)健康檢測(cè)、服務(wù)“Load Balance”、隱藏client和server端的交互細(xì)節(jié)、服務(wù)調(diào)用端的對(duì)象池化。
服務(wù)的注冊(cè)、發(fā)現(xiàn)和健康檢測(cè),我們使用zookeeper可以很好的解決
服務(wù)“Load Balance",我們可以使用簡(jiǎn)單的算法“權(quán)重+隨機(jī)”,當(dāng)然也可以使用成熟復(fù)雜的算法
服務(wù)調(diào)用端的對(duì)象池化,我們可以使用common pool,使用簡(jiǎn)單又可以滿(mǎn)足我們的需求
二、實(shí)現(xiàn)思路
1、thrift server端啟動(dòng)時(shí),每個(gè)實(shí)例向zk集群以臨時(shí)節(jié)點(diǎn)方式注冊(cè)(這樣,遍歷zk上/server下有多少個(gè)臨時(shí)節(jié)點(diǎn)就知道有哪些server實(shí)例)
thrift server端可以單機(jī)多端口多實(shí)例或多機(jī)部署多實(shí)例方式運(yùn)行。
2、服務(wù)調(diào)用方實(shí)現(xiàn)一個(gè)連接池,連接池初始化時(shí),通過(guò)zk將在線(xiàn)的server實(shí)例信息同步到本地并緩存,同時(shí)監(jiān)聽(tīng)zk下的節(jié)點(diǎn)變化。
3、服務(wù)調(diào)用方與Server通訊時(shí),從連接池中取一個(gè)可用的連接,用它實(shí)現(xiàn)RPC調(diào)用。
三、具體實(shí)現(xiàn)
1、thrift server端
thrift server端,向zk中注冊(cè)server address
packagecom.wy.thriftpool.commzkpool;importjava.lang.instrument.IllegalClassFormatException;importjava.lang.reflect.Constructor;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TBinaryProtocol.Factory;importorg.apache.thrift.server.TServer;importorg.apache.thrift.server.TThreadedSelectorServer;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TNonblockingServerSocket;importorg.springframework.beans.factory.InitializingBean;importcom.wy.thrift.service.UserService.Processor;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;importcom.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;importcom.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;/*** thrift server端,向zk中注冊(cè)server address
*
*@authorwy
**/
public class ThriftServiceServerFactory implementsInitializingBean {//thrift server 服務(wù)端口
privateInteger port;//default 權(quán)重
private Integer priority = 1;//service實(shí)現(xiàn)類(lèi)
privateObject service;//thrift server 注冊(cè)路徑
privateString configPath;privateThriftServerIpTransfer ipTransfer;//thrift server注冊(cè)類(lèi)
privateThriftServerAddressReporter addressReporter;//thrift server開(kāi)啟服務(wù)
privateServerThread serverThread;
@Overridepublic void afterPropertiesSet() throwsException {if (ipTransfer == null) {
ipTransfer= newLocalNetworkIpTransfer();
}
String ip=ipTransfer.getIp();if (ip == null) {throw new NullPointerException("cant find server ip...");
}
String hostname= ip + ":" + port + ":" +priority;
Class extends Object> serviceClass =service.getClass();
ClassLoader classLoader=Thread.currentThread().getContextClassLoader();
Class>[] interfaces =serviceClass.getInterfaces();if (interfaces.length == 0) {throw new IllegalClassFormatException("service-class should implements Iface");
}//reflect,load "Processor";
Processor> processor = null;for (Class>clazz : interfaces) {
String cname=clazz.getSimpleName();if (!cname.equals("Iface")) {continue;
}
String pname= clazz.getEnclosingClass().getName() + "$Processor";try{
Class> pclass =classLoader.loadClass(pname);if (!pclass.isAssignableFrom(Processor.class)) {continue;
}
Constructor> constructor =pclass.getConstructor(clazz);
processor= (Processor>) constructor.newInstance(service);break;
}catch(Exception e) {//TODO
}
}if (processor == null) {throw new IllegalClassFormatException("service-class should implements Iface");
}//需要單獨(dú)的線(xiàn)程,因?yàn)閟erve方法是阻塞的.
serverThread = newServerThread(processor, port);
serverThread.start();//report
if (addressReporter != null) {
addressReporter.report(configPath, hostname);
}
}class ServerThread extendsThread {privateTServer server;
ServerThread(Processor> processor, int port) throwsException {//設(shè)置傳輸通道
TNonblockingServerSocket serverTransport = newTNonblockingServerSocket(port);//設(shè)置二進(jìn)制協(xié)議
Factory protocolFactory = newTBinaryProtocol.Factory();
TThreadedSelectorServer.Args tArgs= newTThreadedSelectorServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.transportFactory(newTFramedTransport.Factory());
tArgs.protocolFactory(protocolFactory);int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
tArgs.selectorThreads(num);
tArgs.workerThreads(num* 10);//網(wǎng)絡(luò)服務(wù)模型
server = newTThreadedSelectorServer(tArgs);
}
@Overridepublic voidrun() {try{
server.serve();
}catch(Exception e) {//TODO
}
}public voidstopServer() {
server.stop();
}
}public voidclose() {
serverThread.stopServer();
}public voidsetService(Object service) {this.service =service;
}public voidsetPriority(Integer priority) {this.priority =priority;
}public voidsetPort(Integer port) {this.port =port;
}public voidsetIpTransfer(ThriftServerIpTransfer ipTransfer) {this.ipTransfer =ipTransfer;
}public voidsetAddressReporter(ThriftServerAddressReporter addressReporter) {this.addressReporter =addressReporter;
}public voidsetConfigPath(String configPath) {this.configPath =configPath;
}
}
View Code
thrift server address注冊(cè)到zk
packagecom.wy.thriftpool.commzkpool.support.impl;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.imps.CuratorFrameworkState;importorg.apache.zookeeper.CreateMode;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;/*** thrift server address注冊(cè)到zk
*
*@authorwy
**/
public class DynamicAddressReporter implementsThriftServerAddressReporter {privateCuratorFramework zookeeper;publicDynamicAddressReporter() {
}publicDynamicAddressReporter(CuratorFramework zookeeper) {this.zookeeper =zookeeper;
}public voidsetZookeeper(CuratorFramework zookeeper) {this.zookeeper =zookeeper;
}
@Overridepublic void report(String service, String address) throwsException {if (zookeeper.getState() ==CuratorFrameworkState.LATENT) {
zookeeper.start();
zookeeper.newNamespaceAwareEnsurePath(service);
}
zookeeper.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(service+ "/i_", address.getBytes("utf-8"));
}public voidclose() {
zookeeper.close();
}
}
View Code
。。。
spring配置文件
View Code
2、服務(wù)調(diào)用端
連接池實(shí)現(xiàn)
杯了個(gè)具,為啥就不能提交。代碼在評(píng)論中。
連接池工廠(chǎng),負(fù)責(zé)與Thrift server通信
packagecom.wy.thriftpool.commzkconnpool;importjava.net.InetSocketAddress;importorg.apache.commons.pool.PoolableObjectFactory;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;/*** 連接池工廠(chǎng),負(fù)責(zé)與Thrift server通信
*
*@authorwy
**/
public class ThriftPoolFactory implements PoolableObjectFactory{private final Logger logger =LoggerFactory.getLogger(getClass());//超時(shí)設(shè)置
public inttimeOut;private finalThriftServerAddressProvider addressProvider;privatePoolOperationCallBack callback;publicThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {super();this.addressProvider =addressProvider;this.callback =callback;
}public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, inttimeOut) {super();this.addressProvider =addressProvider;this.callback =callback;this.timeOut =timeOut;
}/*** 創(chuàng)建對(duì)象*/@Overridepublic TTransport makeObject() throwsException {try{
InetSocketAddress address=addressProvider.selector();
TTransport transport= new TSocket(address.getHostName(), address.getPort(), this.timeOut);
transport.open();if (callback != null) {
callback.make(transport);
}returntransport;
}catch(Exception e) {
logger.error("creat transport error:", e);throw newRuntimeException(e);
}
}/*** 銷(xiāo)毀對(duì)象*/@Overridepublic void destroyObject(TTransport transport) throwsException {if (transport != null &&transport.isOpen()) {
transport.close();
}
}/*** 檢驗(yàn)對(duì)象是否可以由pool安全返回*/@Overridepublic booleanvalidateObject(TTransport transport) {try{if (transport != null && transport instanceofTSocket) {
TSocket thriftSocket=(TSocket) transport;if(thriftSocket.isOpen()) {return true;
}else{return false;
}
}else{return false;
}
}catch(Exception e) {return false;
}
}
@Overridepublic void activateObject(TTransport obj) throwsException {//TODO Auto-generated method stub
}
@Overridepublic void passivateObject(TTransport obj) throwsException {//TODO Auto-generated method stub
}public static interfacePoolOperationCallBack {//創(chuàng)建成功是執(zhí)行
voidmake(TTransport transport);//銷(xiāo)毀之前執(zhí)行
voiddestroy(TTransport transport);
}
}
View Code
連接池管理
packagecom.wy.thriftpool.commzkconnpool;importorg.apache.thrift.transport.TSocket;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/*** 連接池管理
*
*@authorwy
**/@Servicepublic classConnectionManager {private final Logger logger =LoggerFactory.getLogger(getClass());//保存local對(duì)象
ThreadLocal socketThreadSafe = new ThreadLocal();//連接提供池
@AutowiredprivateConnectionProvider connectionProvider;publicTSocket getSocket() {
TSocket socket= null;try{
socket=connectionProvider.getConnection();
socketThreadSafe.set(socket);returnsocketThreadSafe.get();
}catch(Exception e) {
logger.error("error ConnectionManager.invoke()", e);
}finally{
connectionProvider.returnCon(socket);
socketThreadSafe.remove();
}returnsocket;
}
}
View Code
spring配置文件
View Code
參考:http://www.cnblogs.com/mumuxinfei/p/3876187.html
由于本人經(jīng)驗(yàn)有限,文章中難免會(huì)有錯(cuò)誤,請(qǐng)瀏覽文章的您指正或有不同的觀(guān)點(diǎn)共同探討!
總結(jié)
以上是生活随笔為你收集整理的java thrift连接池_由浅入深了解Thrift之客户端连接池化的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 如何在C++中将filetime时间转化
- 下一篇: android:configchange