python分布式定时任务_分布式定时任务框架——python定时任务框架APScheduler扩展...
如果將定時任務(wù)部署在一臺服務(wù)器上,那么這個定時任務(wù)就是整個系統(tǒng)的單點(diǎn),這臺服務(wù)器出現(xiàn)故障的話會影響服務(wù)。對于可以冗余的任務(wù)(重復(fù)運(yùn)行不影響服務(wù)),可以部署在多臺服務(wù)器上,讓他們同時執(zhí)行,這樣就可以很簡單的避免單點(diǎn)。但是如果任務(wù)不允許冗余,最多只能有一臺服務(wù)器執(zhí)行任務(wù),那么前面的方法顯然行不通。本篇文章就向大家介紹如何避免這種互斥任務(wù)的單點(diǎn)問題,最后再介紹一下基于APScheduler的分布式定時任務(wù)框架,這個框架是通過多個項(xiàng)目的實(shí)踐總結(jié)而成的。
對于運(yùn)行在同一臺服務(wù)器上的兩個進(jìn)程,可以通過加鎖實(shí)現(xiàn)互斥執(zhí)行,而對于運(yùn)行在多個服務(wù)器上的任務(wù)仍然可以通過用加鎖實(shí)現(xiàn)互斥,不過這個鎖是分布式鎖。這個分布式鎖并沒有那么神秘,實(shí)際上只要一個提供原子性的數(shù)據(jù)庫即可。比如,在數(shù)據(jù)庫的locks表里有一個記錄(lock record),包含屬性:
name:鎖的名字,互斥的任務(wù)需要用名字相同的鎖。
active_ip:持有鎖的服務(wù)器的ip。
update_time:上次持有鎖的時間,其他非活躍的服務(wù)器通過這個屬性判斷活躍的服務(wù)器是否超時,如果超時,則會爭奪鎖。
一個持有鎖的服務(wù)器通過不斷的發(fā)送心跳,來更新這個記錄,心跳的內(nèi)容就是持有鎖的時間戳(update_time),以及本機(jī)ip。也就是說,通過發(fā)送心跳來保證當(dāng)前的服務(wù)器是活躍的,而其他服務(wù)器通過lock record中的update_time來判斷當(dāng)前活躍的服務(wù)器是否超時,一旦超時,其他的服務(wù)器就會去爭奪鎖,接管任務(wù)的執(zhí)行,并發(fā)送心跳更新active_ip。
通過上面描述,這個框架中最重要的兩個概念就是分布式鎖和心跳。下面看一下分布式定時任務(wù)框架中是如何實(shí)現(xiàn)這兩點(diǎn)的。當(dāng)然,這個框架依賴于APScheduler,所以必須安裝這個模塊,具體APScheduler的介紹見我的另一篇文章,因?yàn)橐蕾嘇PScheduler,所以這個框架很簡單,只有一個類:
from apscheduler.scheduler import Scheduler
import datetime
import time
import socket
import struct
import fcntl
def get_ip(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24])
class MutexScheduler(Scheduler):
def __init__(self, gconfig={}, **options):
Scheduler.__init__(self, gconfig, **options)
self.ip = get_ip('eth0')
def mutex(self, lock = None, heartbeat = None, lock_else = None,
unactive_interval = datetime.timedelta(seconds = 30)):
def mutex_func_gen(func):
def mtx_func():
if lock:
lock_rec = lock()
now = datetime.datetime.now()
# execute mutex job when the server is active, or the other server is timeout.
if not lock_rec or lock_rec['active_ip'] == self.ip or (lock_rec['update_time'] and now - lock_rec['update_time'] >= unactive_interval):
if lock_rec:
del lock_rec['active_ip']
del lock_rec['update_time']
if not lock_rec:
lock_rec = {}
lock_attrs = func(**lock_rec)
if not lock_attrs:
lock_attrs = {}
# send heart beat
heartbeat(self.ip, now, **lock_attrs)
else:
lock_else(lock_rec)
else:
func()
return mtx_func
self.mtx_func_gen = mutex_func_gen
def inner(func):
return func
return inner
def cron_schedule(self, **options):
def inner(func):
if hasattr(self, 'mtx_func_gen'):
func = self.mtx_func_gen(func)
func.job = self.add_cron_job(func, **options)
return func
return inner
mutex方法是核心,通過裝飾器的方式提供互斥功能。在使用時:
@sched.mutex(lock = my_lock, heartbeat = my_heartbeat)
@sched.cron_schedule(second = '*')
def my_job(**attrs):
print 'my_job ticks'
mutex裝飾器必須用在cron_schedule裝飾器之前,mutex主要是組裝job。mutex的參數(shù)有:
lock:函數(shù),用于獲取鎖記錄(lock record),函數(shù)原型:lock()。lock的返回值時dict,就是鎖記錄內(nèi)容。
heartbeat:函數(shù),用于發(fā)出心跳,函數(shù)原型:heartbeat(ip, now, **attrs)。ip是本機(jī)ip;now是當(dāng)前時間戳;attrs是一個dict,用于在鎖記錄中存放一些其他用戶自定義信息。
lock_else:函數(shù),在沒有獲得鎖時執(zhí)行,函數(shù)原型:lock_else(lock_rec)。lock_rec是鎖記錄,包含active_ip,update_time以及用戶自定義的屬性。
unactive_interval:datetime.timedelta類型,超時時間,也就是說當(dāng)前時間減去update_time大于unactive_interval的話,就代表超時。
在使用這個類時,必須實(shí)現(xiàn)自己的lock,heartbeat以及l(fā)ock_else函數(shù)。
job的原型是job(**attrs),attrs就是存放在鎖記錄中的用戶自定義屬性,job可以有dict類型的返回值,這個返回值會存入鎖記錄中。
下面,看一下具體使用的例子,使用的mongodb存放分布式鎖。
import apscheduler.events
import datetime
import time
import pymongo
import sys
sys.path.append('../src/')
import mtxscheduler
sched = mtxscheduler.MutexScheduler()
mongo = pymongo.Connection(host = '127.0.0.1', port = 27017)
lock_store = mongo['lockstore']['locks']
def lock():
print 'lock()'
now = datetime.datetime.now() - datetime.timedelta(seconds = 3)
lck = lock_store.find_one({'name': 't'})
return lck
def hb(ip, now, **attrs):
print 'heartbeat()'
attrs['active_ip'] = ip
attrs['update_time'] = now
lock_store.update({'name': 't'}, {'$set': attrs}, upsert = True)
def le(lock_rec):
if lock_rec:
print 'active ip', lock_rec['active_ip']
else:
print 'lock else'
i = 0
@sched.mutex(lock = lock, heartbeat = hb, lock_else = le)
@sched.cron_schedule(second = '*')
def job(**attr):
global i
i += 1
print i
def err_listener(ev):
if ev.exception:
print sys.exc_info()
sched.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR)
sched.start()
time.sleep(10)
這里用到了mongodb的python driver,可以通過命令安裝:
easy_install pymongo
easy_install的安裝件另一篇文章。
這個任務(wù)很簡單就是定時打印整數(shù)序列。同時在兩臺服務(wù)器上部署運(yùn)行,可以發(fā)現(xiàn)只有一臺服務(wù)器會輸出整數(shù)序列。
使用起來還是很方便的。源代碼見github,其中還有使用redis存儲鎖,已經(jīng)在鎖記錄中存放自定義信息的例子。
總結(jié)
以上是生活随笔為你收集整理的python分布式定时任务_分布式定时任务框架——python定时任务框架APScheduler扩展...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python进阶到高阶大全(强烈推荐)
- 下一篇: python 定时任务 全局变量_APS