日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

当前位置: 首页 > 运维知识 > 数据库 > 内容正文

数据库

pyhive数据库连接池使用

发布时间:2023/12/31 数据库 27 豆豆
生活随笔收集整理的这篇文章主要介绍了 pyhive 数据库连接池的使用,小编觉得挺不错的,现在分享给大家,给大家做个参考。

Python 连接 Hive 的工具可以用 PyHive 或 impyla(Impala 的 Python 客户端)。不管用哪个,配置都比较麻烦,需要的依赖包也比较多。

  • https://github.com/cloudera/impyla
  • https://github.com/dropbox/PyHive

PyHive 模块没有提供数据库连接池的 API,所以我参照 mysql-connector-python 模块的连接池,改写了一个 PyHive 连接池。连接在池中复用,避免每次查询都重新建立 Hive 会话,因此效率会提升不少。

连接池介绍

# hive_pool.py
"""A small connection pool for PyHive, adapted from mysql-connector-python's pool.

PyHive ships no connection pool. ``HiveConnectionPool`` opens ``pool_size``
``hive.Connection`` objects up front and hands them out wrapped in
``PooledHiveConnection``; calling ``close()`` on the wrapper puts the raw
connection back into the pool instead of closing it.
``ReallyHiveConnectionPool`` adds a semaphore so callers block until a
connection is free rather than getting ``PoolError`` on exhaustion.
"""
import queue
import re
import threading

from pyhive import hive

# One module-wide re-entrant lock guards queue operations of every pool.
CONNECTION_POOL_LOCK = threading.RLock()
CNX_POOL_MAXSIZE = 32
CNX_POOL_MAXNAMESIZE = 64
# Matches any character that is NOT allowed in a pool name.
CNX_POOL_NAMEREGEX = re.compile(r'[^a-zA-Z0-9._:\-*$#]')


class PoolError(Exception):
    """Raised for pool configuration, exhaustion and overflow errors.

    Derives from ``Exception`` (not ``BaseException``) so ordinary
    ``except Exception`` handlers see it.
    """


def generate_pool_name(**kwargs):
    """Build a default pool name from the ``host``, ``port`` and ``database`` kwargs.

    Whichever of those keys are present are stringified and joined with
    ``'_'`` in that order; other keys are ignored.

    Raises:
        PoolError: if none of the three keys is present.
    """
    parts = [str(kwargs[key]) for key in ('host', 'port', 'database') if key in kwargs]
    if not parts:
        raise PoolError("Failed generating pool name; specify pool_name")
    return '_'.join(parts)


class PooledHiveConnection(object):
    """Wrapper around a pooled ``hive.Connection``.

    Attribute access is delegated to the wrapped connection; ``close()``
    returns the connection to its pool instead of closing it.
    """

    def __init__(self, pool, cnx):
        """
        Args:
            pool: the ``HiveConnectionPool`` that owns ``cnx``.
            cnx: the raw ``hive.Connection`` taken from that pool.

        Raises:
            AttributeError: if either argument has the wrong type.
        """
        if not isinstance(pool, HiveConnectionPool):
            raise AttributeError("pool should be a HiveConnectionPool")
        if not isinstance(cnx, hive.Connection):
            raise AttributeError("cnx should be a hive.Connection")
        self._cnx_pool = pool
        self._cnx = cnx

    def __getattr__(self, attr):
        # Only invoked for names not found on the wrapper (e.g. cursor()).
        return getattr(self._cnx, attr)

    def close(self):
        """Return the wrapped connection to the pool; later calls are no-ops.

        Raises:
            PoolError: propagated from ``add_connection`` (e.g. pool full).
        """
        cnx = self._cnx
        if cnx is None:
            # Already returned. Calling add_connection(None) here would make
            # the pool open a brand-new connection instead.
            return
        self._cnx_pool.add_connection(cnx)
        self._cnx = None

    @property
    def pool_name(self):
        """Name of the pool this connection belongs to."""
        return self._cnx_pool.pool_name


class HiveConnectionPool(object):
    """Fixed-size pool of ``hive.Connection`` objects.

    All ``pool_size`` connections are opened eagerly in ``__init__``.
    ``get_connection`` does not block: it raises ``PoolError`` when the pool
    is exhausted.
    """

    def __init__(self, pool_size=5, pool_name=None, **kwargs):
        """
        Args:
            pool_size: number of connections, 1..CNX_POOL_MAXSIZE.
            pool_name: pool name; generated from host/port/database if omitted.
            **kwargs: passed verbatim to ``hive.Connection``.

        Raises:
            AttributeError: invalid ``pool_size`` or ``pool_name``.
            PoolError: no name could be generated, or no connection config.
        """
        self._pool_size = None
        self._pool_name = None
        self._set_pool_size(pool_size)
        self._set_pool_name(pool_name or generate_pool_name(**kwargs))
        self._cnx_config = {}
        self._cnx_queue = queue.Queue(self._pool_size)
        if kwargs:
            self.set_config(**kwargs)
        for _ in range(self._pool_size):
            self.add_connection()

    @property
    def pool_name(self):
        """Name of the pool."""
        return self._pool_name

    @property
    def pool_size(self):
        """Maximum number of connections held by the pool."""
        return self._pool_size

    def set_config(self, **kwargs):
        """Validate ``kwargs`` by opening a probe connection, then store them.

        Does nothing when ``kwargs`` is empty.

        Raises:
            PoolError: if ``hive.Connection`` raises ``AttributeError``.
        """
        if not kwargs:
            return
        with CONNECTION_POOL_LOCK:
            try:
                probe = hive.Connection(**kwargs)
            except AttributeError as err:
                raise PoolError("Connection configuration not valid: {0}".format(err)) from err
            self._cnx_config = kwargs
            # The probe only validates the settings; close it so the Hive
            # session is not leaked.
            probe.close()

    def _set_pool_size(self, pool_size):
        """Validate and store the pool size (1..CNX_POOL_MAXSIZE)."""
        if pool_size <= 0 or pool_size > CNX_POOL_MAXSIZE:
            raise AttributeError("Pool size should be higher than 0 and "
                                 "lower or equal to {0}".format(CNX_POOL_MAXSIZE))
        self._pool_size = pool_size

    def _set_pool_name(self, pool_name):
        """Validate the pool name's characters and length, then store it."""
        if CNX_POOL_NAMEREGEX.search(pool_name):
            raise AttributeError("Pool name '{0}' contains illegal characters".format(pool_name))
        if len(pool_name) > CNX_POOL_MAXNAMESIZE:
            raise AttributeError("Pool name '{0}' is too long".format(pool_name))
        self._pool_name = pool_name

    def _queue_connection(self, cnx):
        """Put ``cnx`` into the queue without blocking.

        Raises:
            PoolError: wrong type, or the queue is already full.
        """
        if not isinstance(cnx, hive.Connection):
            raise PoolError("Connection instance not subclass of hive.Connection.")
        try:
            self._cnx_queue.put(cnx, block=False)
        except queue.Full:
            raise PoolError("Failed adding connection; queue is full")

    def add_connection(self, cnx=None):
        """Add ``cnx`` to the pool, or open a new connection when it is None.

        Raises:
            PoolError: no stored config, queue full, or ``cnx`` has the wrong type.
        """
        with CONNECTION_POOL_LOCK:
            if not self._cnx_config:
                raise PoolError("Connection configuration not available")
            if self._cnx_queue.full():
                raise PoolError("Failed adding connection; queue is full")
            if cnx is None:
                cnx = hive.Connection(**self._cnx_config)
            elif not isinstance(cnx, hive.Connection):
                raise PoolError("Connection instance not subclass of hive.Connection.")
            self._queue_connection(cnx)

    def get_connection(self):
        """Take a connection without blocking and wrap it in ``PooledHiveConnection``.

        Raises:
            PoolError: if the pool is exhausted.
        """
        with CONNECTION_POOL_LOCK:
            try:
                cnx = self._cnx_queue.get(block=False)
            except queue.Empty:
                raise PoolError("Failed getting connection; pool exhausted")
            return PooledHiveConnection(self, cnx)

    def _remove_connections(self):
        """Close and drop every idle connection; return how many were closed."""
        with CONNECTION_POOL_LOCK:
            cnt = 0
            cnxq = self._cnx_queue
            while cnxq.qsize():
                try:
                    cnx = cnxq.get(block=False)
                except queue.Empty:
                    break
                cnx.close()
                cnt += 1
            return cnt


class ReallyHiveConnectionPool(HiveConnectionPool):
    """``HiveConnectionPool`` whose ``get_connection`` blocks until a slot is free.

    Borrow with ``get_connection()`` and give back with ``put_connection()``.
    """

    def __init__(self, **hive_config):
        """Accepts the same keywords as ``HiveConnectionPool.__init__``."""
        super().__init__(**hive_config)
        # Size the semaphore from the validated pool size. The old
        # hive_config.get('pool_size', 10) disagreed with the base default of
        # 5, letting callers past the semaphore into an empty pool.
        self._semaphore = threading.Semaphore(self.pool_size)

    def get_connection(self):
        """Block until a connection is available, then return it wrapped."""
        self._semaphore.acquire()
        try:
            return super().get_connection()
        except BaseException:
            # Don't leak the permit if taking the connection failed.
            self._semaphore.release()
            raise

    def put_connection(self, con):
        """Return ``con`` to the pool and release its semaphore slot.

        A connection that was already returned is ignored, so a repeated
        call cannot over-release the semaphore.
        """
        if isinstance(con, PooledHiveConnection) and con._cnx is None:
            return
        try:
            con.close()
        finally:
            self._semaphore.release()

连接池代码样例

连接池的使用样例如下。

from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
import time

from hive_pool import ReallyHiveConnectionPool

hive_config = {
    'host': '***.***.***.***',
    'port': 10000,
    'database': 'default',
}
conxpool = ReallyHiveConnectionPool(pool_size=10, pool_name='myhive', **hive_config)


@contextmanager
def get_cursor():
    """Borrow a pooled connection and yield a cursor on it.

    Blocks until the pool has a free connection. The cursor is closed and the
    connection returned to the pool even if the body raises.
    """
    con = conxpool.get_connection()
    try:
        cursor = con.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
    finally:
        # Nested try blocks ensure that a failure in get_connection()/cursor()
        # no longer turns into a NameError on an unbound name here.
        conxpool.put_connection(con)


class MYPyHive(object):
    """Helper for running queries against Hive through the shared pool."""

    @staticmethod
    def get_all(sql):
        """Execute ``sql`` and return all result rows."""
        with get_cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()


if __name__ == '__main__':
    def t(n):
        """Run a demo query and print the result prefixed with ``n``."""
        r = MYPyHive.get_all("show tables")
        print(str(n) + str(r))

    s = time.perf_counter()
    with ThreadPoolExecutor(max_workers=15) as pool:
        futures = [pool.submit(t, i) for i in range(20)]
    print(time.perf_counter() - s)
    # submit() stores worker exceptions in the future; re-raise them here
    # instead of silently dropping failed queries.
    for future in futures:
        future.result()

?

总结

以上是生活随笔为你收集整理的 pyhive 数据库连接池使用的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。