python多线程怎么写日志_Python日志记录在多进程下的使用
1、 問題描述
項目中,使用RotatingFileHandler根據日志文件大小來切分日志。設置文件的MaxBytes為1GB, backupCount大小為5。
經查看,發現日志文件的大小均小于10MB,且每個回滾日志文件的寫入時間也都比較接近。
2、 分析
日志文件過小,猜測是代碼有問題,或者是文件內容有丟失;日志寫入時間接近猜測是同時寫入的問題。
經檢查,代碼沒有問題,排除此原因??紤]當前使用gunicorn的多進程啟動程序,多半是多個進程同時寫入當個文件造成日志文件丟失。
logging模塊是線程安全的,但并不是進程安全的。
如何解決此問題呢?首先先過一遍Python的logging模塊在處理日志回滾的具體實現方法。
2.1 logging模塊實現日志回滾
logging中RotatingFileHandler類和TimedRotatingFileHandler類分別實現按照日志文件大小和日志文件時間來切分文件,均繼承自BaseRotatingHandler類。
BaseRotatingHandler類中實現了文件切分的觸發和執行,具體過程如下:
def emit(self, record):
"""
Emit a record.
Output the record to the file, catering for rollover as described
in doRollover().
"""
try:
if self.shouldRollover(record):
self.doRollover()
logging.FileHandler.emit(self, record)
except Exception:
self.handleError(record)
具體的執行過程shouldRollover(record)和doRollover()函數則在RotatingFileHandler類和TimedRotatingFileHandler類中實現。
以RotatingFileHandler類為例,doRollover()函數流程如下:
def doRollover(self):
if self.stream:
self.stream.close()
self.stream = None
if self.backupCount > 0:
for i in range(self.backupCount - 1, 0, -1): # 從backupCount,依次到1
sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
dfn = self.rotation_filename("%s.%d" % (self.baseFilename, i + 1))
if os.path.exists(sfn):
if os.path.exists(dfn):
os.remove(dfn)
os.rename(sfn, dfn) # 實現將xx.log.i->xx.log.i+1
dfn = self.rotation_filename(self.baseFilename + ".1")
# ---------start-----------
if os.path.exists(dfn): # 判斷如果xx.log.1存在,則刪除xx.log.1
os.remove(dfn)
self.rotate(self.baseFilename, dfn) # 將xx.log->xx.log.1
# ----------end------------
if not self.delay:
self.stream = self._open() # 執行新的xx.log
分析如上過程,整個步驟是:
當前正在處理的日志文件名為self.baseFilename,該值self.baseFilename = os.path.abspath(filename)是設置的日志文件的絕對路徑,假設baseFilename為error.log。
當進行文件回滾的時候,會依次將error.log.i重命名為error.log.i+1。
判斷error.log.1是否存在,若存在,則刪除,將當前日志文件error.log重命名為error.log.1。
self.stream重新指向新建error.log文件。
當程序啟動多進程時,每個進程都會執行doRollover過程,若有多個進程進入臨界區,則會導致dfn被刪除多次等多種混亂操作。
2.2 多進程日志安全輸出到同一文件方案
相應的解決方法:
將日志發送到同一個進程中,由該進程負責輸出到文件中(使用Queue和QueueHandler將所有日志事件發送至一個進程中)
對日志輸出加鎖,每個進程在執行日志輸出時先獲得鎖(用多處理模塊中的Lock類來序列化對進程的文件訪問)
讓所有進程都將日志記錄至一個SocketHandler,然后用一個實現了套接字服務器的單獨進程一邊從套接字中讀取一邊將日志記錄至文件(Python手冊中提供)
3、解決方案
3.1 使用ConcurrentRotatingFileHandler包
該方法就屬于加鎖方案。
ConcurrentLogHandler 可以在多進程環境下安全的將日志寫入到同一個文件,并且可以在日志文件達到特定大小時,分割日志文件(支持按文件大小分割)。但ConcurrentLogHandler 不支持按時間分割日志文件的方式。
ConcurrentLogHandler 模塊使用文件鎖定,以便多個進程同時記錄到單個文件,而不會破壞日志事件。該模塊提供與RotatingFileHandler類似的文件循環方案。
該模塊的首要任務是保留您的日志記錄,這意味著日志文件將大于指定的最大大小(RotatingFileHandler是嚴格遵守最大文件大小),如果有多個腳本的實例同時運行并寫入同一個日志文件,那么所有腳本都應該使用ConcurrentLogHandler,不應該混合和匹配這這個類。
并發訪問通過使用文件鎖來處理,該文件鎖應確保日志消息不會被丟棄或破壞。這意味著將為寫入磁盤的每個日志消息獲取并釋放文件鎖。(在Windows上,您可能還會遇到臨時情況,必須為每個日志消息打開和關閉日志文件。)這可能會影響性能。在我的測試中,性能綽綽有余,但是如果您需要大容量或低延遲的解決方案,建議您將其放在其他地方。
這個包捆綁了portalocker來處理文件鎖定。由于使用了portalocker模塊,該模塊當前僅支持“nt”和“posix”平臺。
安裝:
pip install ConcurrentLogHandler
該模塊支持Python2.6及以后版本。
ConcurrentLogHandler的使用方法與其他handler類一致,如與RotatingFileHandler的使用方法一樣。
初始化函數及參數:
class ConcurrentRotatingFileHandler(BaseRotatingHandler):
"""
Handler for logging to a set of files, which switches from one file to the
next when the current file reaches a certain size. Multiple processes can
write to the log file concurrently, but this may mean that the file will
exceed the given size.
"""
def __init__(self, filename, mode='a', maxBytes=0, backupCount=0,
encoding=None, debug=True, delay=0):
參數含義同Python內置RotatingFileHandler類相同,具體可參考上一篇博文Python logging日志管理 - 簡書。同樣繼承自BaseRotatingHandler類。
簡單的示例:
import logging
from cloghandler import ConcurrentRotatingFileHandler
?
logger = logging.getLogger()
rotateHandler = ConcurrentRotatingFileHandler('./logs/my_logfile.log', "a", 1024*1024, 5)
logger.addHandler(rotateHandler)
logger.setLevel(logging.DEBUG)
?
logger.info('This is a info message.')
為了適應沒有ConcurrentRotatingFileHandler包的情況,增加回退使用RotatingFileHandler的代碼:
try:
from cloghandler import ConcurrentRotatingFileHandler as RFHandler
except ImportError:
from warning import warn
warn('ConcurrentRotatingFileHandler package not installed, Using builtin log handler')
from logging.handlers import RotatingFileHandler as RFHandler
運行后可以發現,會自動創建一個.lock文件,通過鎖的方式來安全的寫日志文件。
3.2 對日志輸出加鎖
TimedRotatingFileHandler類doRollover函數的主要部分如下:
def doRollover(self):
....
dfn = self.rotation_filename(self.baseFilename + "." +
time.strftime(self.suffix, timeTuple))
# -------begin-------
if os.path.exists(dfn): # 判斷如果存在dfn,則刪除
os.remove(dfn)
self.rotate(self.baseFilename, dfn) # 將當前日志文件重命名為dfn
# --------end--------
if self.backupCount > 0:
for s in self.getFilesToDelete():
os.remove(s)
if not self.delay:
self.stream = self._open()
....
修改思路:
判斷dfn文件是否已經存在,如果存在,表示已經被rename過了;如果不存在,則只允許一個進程去rename,其他進程需等待。
新建一個類繼承自TimeRotatingFileHandler,修改doRollover函數,只需處理上面代碼的注釋部分即可。如下:
class MPTimeRotatingFileHandler(TimeRotatingFileHandler):
def doRollover(self):
....
dfn = self.rotation_filename(self.baseFilename + "." +
time.strftime(self.suffix, timeTuple))
# ----modify start----
if not os.path.exists(dfn):
f = open(self.baseFilename, 'a')
fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
if os.path.exists(self.baseFilename): # 判斷baseFilename是否存在
self.rotate(self.baseFilename, dfn)
# ----modify end-----
if self.backupCount > 0:
for s in self.getFilesToDelete():
os.remove(s)
....
3.3 重寫FileHandler類
logging.handlers.py中各類的繼承關系如下圖所示:
繼承關系
TimeRotatingFileHandler類就是繼承自該類,在FileHandler類中增加一些處理。
具體可參考以下博文:
在Python官方手冊中,提供了多進程中日志記錄至單個文件的方法。
logging是線程安全的,將單個進程中的多個線程日志記錄至單個文件也是支持的。但將多個進程中的日志記錄至單個文件中則不支持,因為在Python中并沒有在多個進程中實現對單個文件訪問的序列化的標準方案。
將多個進程中日志記錄至單個文件中,有以下幾個方案:
讓所有進程都將日志記錄至一個 SocketHandler,然后用一個實現了套接字服務器的單獨進程一邊從套接字中讀取一邊將日志記錄至文件。
使用 Queue 和 QueueHandler 將所有的日志事件發送至你的多進程應用的一個進程中。
3.4 單獨進程負責日志事件
一個單獨監聽進程負責監聽其他進程的日志事件,并根據自己的配置記錄。
示例:
import logging
import logging.handlers
import multiprocessing
?
from random import choice, random
import time
?
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('test.log', 'a', 300,10) # rotate file設置的很小,以便于查看結果
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
def listenser_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
trackback.print_exc(file=sys.stderr)
?
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
?
LOGGERS = ['a.b.c', 'd.e.f']
?
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
?
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue)
root = logging.getLogger()
root.addHandler(h)
root.setLevel(logging.DEBUG)
# 該循環僅記錄10個事件,這些事件具有隨機的介入延遲,然后終止
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started:%s'%name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
# 創建隊列,創建并啟動監聽器,創建十個工作進程并啟動它們,等待它們完成,然后將None發送到隊列以通知監聽器完成
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, listener_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
使用主進程中一個單獨的線程記錄日志
下面這段代碼展示了如何使用特定的日志記錄配置,例如foo記錄器使用了特殊的處理程序,將foo子系統中所有的事件記錄至一個文件mplog-foo.log。在主進程(即使是在工作進程中產生的日志事件)的日志記錄機制中將直接使用恰當的配置。
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
?
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz', 'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lv1=l = random.choice(levles)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
?
for __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d'%(i+1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
for wp in workers:
wp.join()
q.put(None)
lp.join()
3.5 logging.SocketHandler的方案
具體實現參考如下博客:
4、參考文獻
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的python多线程怎么写日志_Python日志记录在多进程下的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无线网能连但是没网络连接不了怎么办 无线
- 下一篇: websocket python爬虫_p