日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java thrift连接池_由浅入深了解Thrift之客户端连接池化

發(fā)布時間:2025/3/12 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java thrift连接池_由浅入深了解Thrift之客户端连接池化 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、問題描述

在上一篇《由淺入深了解Thrift之服務(wù)模型和序列化機制》文章中,我們已經(jīng)了解了thrift的基本架構(gòu)和網(wǎng)絡(luò)服務(wù)模型的優(yōu)缺點。如今的互聯(lián)網(wǎng)圈中,RPC服務(wù)化的思想如火如荼。我們又該如何將thrift服務(wù)化應(yīng)用到我們的項目中哪?實現(xiàn)thrift服務(wù)化前,我們先想想這幾個問題:服務(wù)注冊、服務(wù)發(fā)現(xiàn)、服務(wù)健康檢測、服務(wù)“Load Balance”、隱藏client和server端的交互細節(jié)、服務(wù)調(diào)用端的對象池化。

服務(wù)的注冊、發(fā)現(xiàn)和健康檢測,我們使用zookeeper可以很好的解決

服務(wù)“Load Balance",我們可以使用簡單的算法“權(quán)重+隨機”,當(dāng)然也可以使用成熟復(fù)雜的算法

服務(wù)調(diào)用端的對象池化,我們可以使用common pool,使用簡單又可以滿足我們的需求

二、實現(xiàn)思路

1、thrift server端啟動時,每個實例向zk集群以臨時節(jié)點方式注冊(這樣,遍歷zk上/server下有多少個臨時節(jié)點就知道有哪些server實例)

thrift server端可以單機多端口多實例或多機部署多實例方式運行。

2、服務(wù)調(diào)用方實現(xiàn)一個連接池,連接池初始化時,通過zk將在線的server實例信息同步到本地并緩存,同時監(jiān)聽zk下的節(jié)點變化。

3、服務(wù)調(diào)用方與Server通訊時,從連接池中取一個可用的連接,用它實現(xiàn)RPC調(diào)用。

三、具體實現(xiàn)

1、thrift server端

thrift server端,向zk中注冊server address

packagecom.wy.thriftpool.commzkpool;importjava.lang.instrument.IllegalClassFormatException;importjava.lang.reflect.Constructor;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TBinaryProtocol.Factory;importorg.apache.thrift.server.TServer;importorg.apache.thrift.server.TThreadedSelectorServer;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TNonblockingServerSocket;importorg.springframework.beans.factory.InitializingBean;importcom.wy.thrift.service.UserService.Processor;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;importcom.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;importcom.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;/*** thrift server端,向zk中注冊server address

*

*@authorwy

**/

public class ThriftServiceServerFactory implementsInitializingBean {//thrift server 服務(wù)端口

privateInteger port;//default 權(quán)重

private Integer priority = 1;//service實現(xiàn)類

privateObject service;//thrift server 注冊路徑

privateString configPath;privateThriftServerIpTransfer ipTransfer;//thrift server注冊類

privateThriftServerAddressReporter addressReporter;//thrift server開啟服務(wù)

privateServerThread serverThread;

@Overridepublic void afterPropertiesSet() throwsException {if (ipTransfer == null) {

ipTransfer= newLocalNetworkIpTransfer();

}

String ip=ipTransfer.getIp();if (ip == null) {throw new NullPointerException("cant find server ip...");

}

String hostname= ip + ":" + port + ":" +priority;

Class extends Object> serviceClass =service.getClass();

ClassLoader classLoader=Thread.currentThread().getContextClassLoader();

Class>[] interfaces =serviceClass.getInterfaces();if (interfaces.length == 0) {throw new IllegalClassFormatException("service-class should implements Iface");

}//reflect,load "Processor";

Processor> processor = null;for (Class>clazz : interfaces) {

String cname=clazz.getSimpleName();if (!cname.equals("Iface")) {continue;

}

String pname= clazz.getEnclosingClass().getName() + "$Processor";try{

Class> pclass =classLoader.loadClass(pname);if (!pclass.isAssignableFrom(Processor.class)) {continue;

}

Constructor> constructor =pclass.getConstructor(clazz);

processor= (Processor>) constructor.newInstance(service);break;

}catch(Exception e) {//TODO

}

}if (processor == null) {throw new IllegalClassFormatException("service-class should implements Iface");

}//需要單獨的線程,因為serve方法是阻塞的.

serverThread = newServerThread(processor, port);

serverThread.start();//report

if (addressReporter != null) {

addressReporter.report(configPath, hostname);

}

}class ServerThread extendsThread {privateTServer server;

ServerThread(Processor> processor, int port) throwsException {//設(shè)置傳輸通道

TNonblockingServerSocket serverTransport = newTNonblockingServerSocket(port);//設(shè)置二進制協(xié)議

Factory protocolFactory = newTBinaryProtocol.Factory();

TThreadedSelectorServer.Args tArgs= newTThreadedSelectorServer.Args(serverTransport);

tArgs.processor(processor);

tArgs.transportFactory(newTFramedTransport.Factory());

tArgs.protocolFactory(protocolFactory);int num = Runtime.getRuntime().availableProcessors() * 2 + 1;

tArgs.selectorThreads(num);

tArgs.workerThreads(num* 10);//網(wǎng)絡(luò)服務(wù)模型

server = newTThreadedSelectorServer(tArgs);

}

@Overridepublic voidrun() {try{

server.serve();

}catch(Exception e) {//TODO

}

}public voidstopServer() {

server.stop();

}

}public voidclose() {

serverThread.stopServer();

}public voidsetService(Object service) {this.service =service;

}public voidsetPriority(Integer priority) {this.priority =priority;

}public voidsetPort(Integer port) {this.port =port;

}public voidsetIpTransfer(ThriftServerIpTransfer ipTransfer) {this.ipTransfer =ipTransfer;

}public voidsetAddressReporter(ThriftServerAddressReporter addressReporter) {this.addressReporter =addressReporter;

}public voidsetConfigPath(String configPath) {this.configPath =configPath;

}

}

View Code

thrift server address注冊到zk

packagecom.wy.thriftpool.commzkpool.support.impl;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.imps.CuratorFrameworkState;importorg.apache.zookeeper.CreateMode;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;/*** thrift server address注冊到zk

*

*@authorwy

**/

public class DynamicAddressReporter implementsThriftServerAddressReporter {privateCuratorFramework zookeeper;publicDynamicAddressReporter() {

}publicDynamicAddressReporter(CuratorFramework zookeeper) {this.zookeeper =zookeeper;

}public voidsetZookeeper(CuratorFramework zookeeper) {this.zookeeper =zookeeper;

}

@Overridepublic void report(String service, String address) throwsException {if (zookeeper.getState() ==CuratorFrameworkState.LATENT) {

zookeeper.start();

zookeeper.newNamespaceAwareEnsurePath(service);

}

zookeeper.create().creatingParentsIfNeeded()

.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath(service+ "/i_", address.getBytes("utf-8"));

}public voidclose() {

zookeeper.close();

}

}

View Code

。。。

spring配置文件

View Code

2、服務(wù)調(diào)用端

連接池實現(xiàn)

杯了個具,為啥就不能提交。代碼在評論中。

連接池工廠,負責(zé)與Thrift server通信

packagecom.wy.thriftpool.commzkconnpool;importjava.net.InetSocketAddress;importorg.apache.commons.pool.PoolableObjectFactory;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;/*** 連接池工廠,負責(zé)與Thrift server通信

*

*@authorwy

**/

public class ThriftPoolFactory implements PoolableObjectFactory{private final Logger logger =LoggerFactory.getLogger(getClass());//超時設(shè)置

public inttimeOut;private finalThriftServerAddressProvider addressProvider;privatePoolOperationCallBack callback;publicThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {super();this.addressProvider =addressProvider;this.callback =callback;

}public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, inttimeOut) {super();this.addressProvider =addressProvider;this.callback =callback;this.timeOut =timeOut;

}/*** 創(chuàng)建對象*/@Overridepublic TTransport makeObject() throwsException {try{

InetSocketAddress address=addressProvider.selector();

TTransport transport= new TSocket(address.getHostName(), address.getPort(), this.timeOut);

transport.open();if (callback != null) {

callback.make(transport);

}returntransport;

}catch(Exception e) {

logger.error("creat transport error:", e);throw newRuntimeException(e);

}

}/*** 銷毀對象*/@Overridepublic void destroyObject(TTransport transport) throwsException {if (transport != null &&transport.isOpen()) {

transport.close();

}

}/*** 檢驗對象是否可以由pool安全返回*/@Overridepublic booleanvalidateObject(TTransport transport) {try{if (transport != null && transport instanceofTSocket) {

TSocket thriftSocket=(TSocket) transport;if(thriftSocket.isOpen()) {return true;

}else{return false;

}

}else{return false;

}

}catch(Exception e) {return false;

}

}

@Overridepublic void activateObject(TTransport obj) throwsException {//TODO Auto-generated method stub

}

@Overridepublic void passivateObject(TTransport obj) throwsException {//TODO Auto-generated method stub

}public static interfacePoolOperationCallBack {//創(chuàng)建成功是執(zhí)行

voidmake(TTransport transport);//銷毀之前執(zhí)行

voiddestroy(TTransport transport);

}

}

View Code

連接池管理

packagecom.wy.thriftpool.commzkconnpool;importorg.apache.thrift.transport.TSocket;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/*** 連接池管理

*

*@authorwy

**/@Servicepublic classConnectionManager {private final Logger logger =LoggerFactory.getLogger(getClass());//保存local對象

ThreadLocal socketThreadSafe = new ThreadLocal();//連接提供池

@AutowiredprivateConnectionProvider connectionProvider;publicTSocket getSocket() {

TSocket socket= null;try{

socket=connectionProvider.getConnection();

socketThreadSafe.set(socket);returnsocketThreadSafe.get();

}catch(Exception e) {

logger.error("error ConnectionManager.invoke()", e);

}finally{

connectionProvider.returnCon(socket);

socketThreadSafe.remove();

}returnsocket;

}

}

View Code

spring配置文件

View Code

參考:http://www.cnblogs.com/mumuxinfei/p/3876187.html

由于本人經(jīng)驗有限,文章中難免會有錯誤,請瀏覽文章的您指正或有不同的觀點共同探討!

總結(jié)

以上是生活随笔為你收集整理的java thrift连接池_由浅入深了解Thrift之客户端连接池化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。