日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flask和mysql多线程_Flask解析(二):Flask-Sqlalchemy与多线程、多进程

發布時間:2023/12/1 数据库 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flask和mysql多线程_Flask解析(二):Flask-Sqlalchemy与多线程、多进程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Sqlalchemy

flask-sqlalchemy的session是線程安全的,但在多進程環境下,要確保派生子進程時,父進程不存在任何的數據庫連接,可以通過調用db.get_engine(app=app).dispose()來手動銷毀已經創建的engine,然后再派生子進程。

最近線上的項目總是會報出數據庫連接相關的錯誤,比如“Command out of Sync”,“Mysql server has gone away”,“Lost databse connection”,“Package sequence out of order”等等,最終解決下來,發現以上錯誤可以分為兩種,一種是和連接丟失有關的,一種是和連接被多個線程(進程)同時使用了有關。

我們項目基于flask,有多線程的場景,也有多進程的場景。orm用的是flask的拓展flask-sqlalchemy。flask-sqlalchemy的使用必須基于flask的app實例,也就是說要在app上下文中才能使用flask-sqlalchemy,所以在某些離線(非web)場景下,我們也用到了原生的Sqlalchemy。

原生的Sqlalchemy的使用方式是

engine = create_engine(db_url)

Session = sessionmaker(bind=engine)

session = Session()

session.query(xxx)

首先要創建一個engine,engine顧名思義就是和數據庫連接的引擎。在實際發起查詢前,是不會創建任何connection的。創建engine時可以通過指定poolclass參數來指定engine使用的連接池。默認是QueuePool,也可以設置為NullPool(不使用連接池)。為了方便理解,可以把engine視為管理連接池的對象。

sqlalchemy中session和我們平時數據庫里說的session是兩個不同的概念,在平時數據庫中,session的生命周期從連接上數據庫開始,到斷開和數據庫的連接位置。但是sqlalchemy中的session更多的是一種管理連接的對象,它從連接池取出一個連接,使用連接,然后釋放連接,而自身也跟隨著銷毀。sqlalchemy中的Connection對象是管理真正數據庫連接的對象,真正的數據庫連接在sqlalchemy中是DBAPI。

默認地,如果不傳入poolclass,則使用QueuePool(具有一定數量的連接池),如果不指定pool_recycle參數,則默認數據庫連接不會刷新。也就是說連接如果不適用,則一直不去刷新它。但是問題來了,在Mysql中,輸入“show variables like "%timeout%"; ” ,可以看到有一個waittimeout,還有interacttimeout,默認值為28800(8小時),這兩個值代表著,如果8個小時內某個數據庫連接都不和mysql聯系,那么就會斷掉這個連接。所以,8個小時過去了,Mysql把連接斷掉了,但是sqlalchemy客戶端這邊卻還保持著這個連接。當某個時候該連接從連接池被取出使用時,就會拋出“Mysql server has gone away”等連接丟失的信息。

解決這個問題的辦法很簡單,只要傳入pool_recycle參數即可。特別地,在flask-sqlalchemy中不會出現這種問題,因為falsk-sqlalchemy拓展自動地幫我們注入了pool_recycle參數,默認為7200秒。

def apply_driver_hacks(self, app, sa_url, options):

"""This method is called before engine creation and used to inject

driver specific hacks into the options. The `options` parameter is

a dictionary of keyword arguments that will then be used to call

the :func:`sqlalchemy.create_engine` function.

The default implementation provides some saner defaults for things

like pool sizes for MySQL and sqlite. Also it injects the setting of

`SQLALCHEMY_NATIVE_UNICODE`.

"""

if sa_url.drivername.startswith('mysql'):

sa_url.query.setdefault('charset', 'utf8')

if sa_url.drivername != 'mysql+gaerdbms':

options.setdefault('pool_size', 10)

options.setdefault('pool_recycle', 7200)  # 默認7200秒刷新連接

elif sa_url.drivername == 'sqlite':

pool_size = options.get('pool_size')

detected_in_memory = False

if sa_url.database in (None, '', ':memory:'):

detected_in_memory = True

from sqlalchemy.pool import StaticPool

options['poolclass'] = StaticPool

if 'connect_args' not in options:

options['connect_args'] = {}

options['connect_args']['check_same_thread'] = False

# we go to memory and the pool size was explicitly set

# to 0 which is fail. Let the user know that

if pool_size == 0:

raise RuntimeError('SQLite in memory database with an '

'empty queue not possible due to data '

'loss.')

# if pool size is None or explicitly set to 0 we assume the

# user did not want a queue for this sqlite connection and

# hook in the null pool.

elif not pool_size:

from sqlalchemy.pool import NullPool

options['poolclass'] = NullPool

# if it's not an in memory database we make the path absolute.

if not detected_in_memory:

sa_url.database = os.path.join(app.root_path, sa_url.database)

unu = app.config['SQLALCHEMY_NATIVE_UNICODE']

if unu is None:

unu = self.use_native_unicode

if not unu:

options['use_native_unicode'] = False

if app.config['SQLALCHEMY_NATIVE_UNICODE'] is not None:

warnings.warn(

"The 'SQLALCHEMY_NATIVE_UNICODE' config option is deprecated and will be removed in"

" v3.0. Use 'SQLALCHEMY_ENGINE_OPTIONS' instead.",

DeprecationWarning

)

if not self.use_native_unicode:

warnings.warn(

"'use_native_unicode' is deprecated and will be removed in v3.0."

" Use the 'engine_options' parameter instead.",

DeprecationWarning

)

sessionmaker是Session定制方法,我們把engine傳入sessionmaker中,就可以得到一個session工廠,通過工廠來生產真正的session對象。但是這種生產出來的session是線程不安全的,sqlalchemy提供了scoped_session來幫助我們生產線程安全的session,原理類似于Local,就是代理session,通過線程的id來找到真正屬于本線程的session。

flask-sqlalchemy就是使用了scoped_session來保證線程安全,具體的代碼可以在Sqlalchemy中看到,構造session時,使用了scoped_session。

def create_scoped_session(self, options=None):

"""Create a :class:`~sqlalchemy.orm.scoping.scoped_session`

on the factory from :meth:`create_session`.

An extra key ``'scopefunc'`` can be set on the ``options`` dict to

specify a custom scope function. If it's not provided, Flask's app

context stack identity is used. This will ensure that sessions are

created and removed with the request/response cycle, and should be fine

in most cases.

:param options: dict of keyword arguments passed to session class in

``create_session``

"""

if options is None:

options = {}

scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)

options.setdefault('query_cls', self.Query)

return orm.scoped_session(

self.create_session(options), scopefunc=scopefunc

)

def create_session(self, options):

"""Create the session factory used by :meth:`create_scoped_session`.

The factory **must** return an object that SQLAlchemy recognizes as a session,

or registering session events may raise an exception.

Valid factories include a :class:`~sqlalchemy.orm.session.Session`

class or a :class:`~sqlalchemy.orm.session.sessionmaker`.

The default implementation creates a ``sessionmaker`` for :class:`SignallingSession`.

:param options: dict of keyword arguments passed to session class

"""

return orm.sessionmaker(class_=SignallingSession, db=self, **options)

多進程和數據庫連接

多進程環境下,要注意和數據庫連接相關的操作。

說到多進程,python里最常用的就是multiprocessing。multiprocessing在windows下和linux的表現有所區別,在此只討論linux下的表現。linux下多進程通過fork()來派生,要理解我下面說的必須先弄懂fork()是什么東西。粗略地說,每個進程都有自己的一個空間,稱為進程空間,每個進程的進程空間都是獨立的,進程與進程之間互不干擾。fork()的作用,就是將一個進程的進程空間,完完全全地copy一份,copy出來的就是子進程了,所以我們說子進程和父進程有著一模一樣的地址空間。地址空間就是進程運行的空間,這空間里會有進程已經打開的文件描述符,文件描述符會間接地指向進程已經打開的文件。也就是說,fork()之后,父進程,子進程會有相同的文件描述符,指向相同的一個文件。為什么?因為文件是存在硬盤里的,fork()時copy的內存中的進程空間,并沒有把文件也copy一份。這就導致了,父進程,子進程,同時指向同一個文件,他們任意一個都可以對這個文件進行操作。這和本文說的數據庫有啥關系?順著這個思路想,數據庫連接是不是一個TCP連接?TCP連接是不是一個socket?socket在linux下是什么,就是一個文件。所以說,如果父進程在fork()之前打開了數據庫連接,那么子進程也會擁有這個打開的連接。

兩個進程同時寫一個連接會導致數據混亂,所以會出現“Command out of sync”的錯誤,兩個進程同時讀一個連接,會導致一個進程讀到了,另一個沒讀到,就是“No result”。一個進程關閉了連接,另一個進程并不知道,它試圖去操作連接時,就會出現“Lost database connection”的錯誤。

在此討論的場景是,父進程在派生子進程之前,父進程擁有已打開的數據庫連接。派生出子進程之后,子進程也就擁有了相應的連接。如果在fork()之前父進程沒有打開數據庫連接,那么也不用擔心這個問題。比如Celery使用的prefork池,雖然是多進程模型,但是celery在派子進程前時不會打開數據庫連接的,所以不用擔心在celery任務中會出現數據庫連接混亂的問題。

我做的項目里的多進程的場景之一就是使用tornado來跑web應用,在派生多個web應用實例時,確保此前創建的數據庫連接被銷毀。

app = Flask()

db = Sqlalchemy()

db.init_app(app)

...

...

db.get_engine(app=app).dispose()  # 先銷毀已有的engine,確保父進程沒有數據庫連接

...

...

fork() # 派生子進程

# 例如

tornado.start()  # 啟動多個web實例進程

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Flask和mysql多线程_Flask解析(二):Flask-Sqlalchemy与多线程、多进程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。