日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python分布式定时任务_分布式定时任务框架——python定时任务框架APScheduler扩展...

發(fā)布時間:2023/12/20 python 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python分布式定时任务_分布式定时任务框架——python定时任务框架APScheduler扩展... 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

如果將定時任務(wù)部署在一臺服務(wù)器上,那么這個定時任務(wù)就是整個系統(tǒng)的單點(diǎn),這臺服務(wù)器出現(xiàn)故障的話會影響服務(wù)。對于可以冗余的任務(wù)(重復(fù)運(yùn)行不影響服務(wù)),可以部署在多臺服務(wù)器上,讓他們同時執(zhí)行,這樣就可以很簡單的避免單點(diǎn)。但是如果任務(wù)不允許冗余,最多只能有一臺服務(wù)器執(zhí)行任務(wù),那么前面的方法顯然行不通。本篇文章就向大家介紹如何避免這種互斥任務(wù)的單點(diǎn)問題,最后再介紹一下基于APScheduler的分布式定時任務(wù)框架,這個框架是通過多個項(xiàng)目的實(shí)踐總結(jié)而成的。

對于運(yùn)行在同一臺服務(wù)器上的兩個進(jìn)程,可以通過加鎖實(shí)現(xiàn)互斥執(zhí)行,而對于運(yùn)行在多個服務(wù)器上的任務(wù)仍然可以通過用加鎖實(shí)現(xiàn)互斥,不過這個鎖是分布式鎖。這個分布式鎖并沒有那么神秘,實(shí)際上只要一個提供原子性的數(shù)據(jù)庫即可。比如,在數(shù)據(jù)庫的locks表里有一個記錄(lock record),包含屬性:

name:鎖的名字,互斥的任務(wù)需要用名字相同的鎖。

active_ip:持有鎖的服務(wù)器的ip。

update_time:上次持有鎖的時間,其他非活躍的服務(wù)器通過這個屬性判斷活躍的服務(wù)器是否超時,如果超時,則會爭奪鎖。

一個持有鎖的服務(wù)器通過不斷的發(fā)送心跳,來更新這個記錄,心跳的內(nèi)容就是持有鎖的時間戳(update_time),以及本機(jī)ip。也就是說,通過發(fā)送心跳來保證當(dāng)前的服務(wù)器是活躍的,而其他服務(wù)器通過lock record中的update_time來判斷當(dāng)前活躍的服務(wù)器是否超時,一旦超時,其他的服務(wù)器就會去爭奪鎖,接管任務(wù)的執(zhí)行,并發(fā)送心跳更新active_ip。

通過上面描述,這個框架中最重要的兩個概念就是分布式鎖和心跳。下面看一下分布式定時任務(wù)框架中是如何實(shí)現(xiàn)這兩點(diǎn)的。當(dāng)然,這個框架依賴于APScheduler,所以必須安裝這個模塊,具體APScheduler的介紹見我的另一篇文章,因?yàn)橐蕾嘇PScheduler,所以這個框架很簡單,只有一個類:

from apscheduler.scheduler import Scheduler

import datetime

import time

import socket

import struct

import fcntl

def get_ip(ifname):

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

return socket.inet_ntoa(fcntl.ioctl(

s.fileno(),

0x8915, # SIOCGIFADDR

struct.pack('256s', ifname[:15])

)[20:24])

class MutexScheduler(Scheduler):

def __init__(self, gconfig={}, **options):

Scheduler.__init__(self, gconfig, **options)

self.ip = get_ip('eth0')

def mutex(self, lock = None, heartbeat = None, lock_else = None,

unactive_interval = datetime.timedelta(seconds = 30)):

def mutex_func_gen(func):

def mtx_func():

if lock:

lock_rec = lock()

now = datetime.datetime.now()

# execute mutex job when the server is active, or the other server is timeout.

if not lock_rec or lock_rec['active_ip'] == self.ip or (lock_rec['update_time'] and now - lock_rec['update_time'] >= unactive_interval):

if lock_rec:

del lock_rec['active_ip']

del lock_rec['update_time']

if not lock_rec:

lock_rec = {}

lock_attrs = func(**lock_rec)

if not lock_attrs:

lock_attrs = {}

# send heart beat

heartbeat(self.ip, now, **lock_attrs)

else:

lock_else(lock_rec)

else:

func()

return mtx_func

self.mtx_func_gen = mutex_func_gen

def inner(func):

return func

return inner

def cron_schedule(self, **options):

def inner(func):

if hasattr(self, 'mtx_func_gen'):

func = self.mtx_func_gen(func)

func.job = self.add_cron_job(func, **options)

return func

return inner

mutex方法是核心,通過裝飾器的方式提供互斥功能。在使用時:

@sched.mutex(lock = my_lock, heartbeat = my_heartbeat)

@sched.cron_schedule(second = '*')

def my_job(**attrs):

print 'my_job ticks'

mutex裝飾器必須用在cron_schedule裝飾器之前,mutex主要是組裝job。mutex的參數(shù)有:

lock:函數(shù),用于獲取鎖記錄(lock record),函數(shù)原型:lock()。lock的返回值時dict,就是鎖記錄內(nèi)容。

heartbeat:函數(shù),用于發(fā)出心跳,函數(shù)原型:heartbeat(ip, now, **attrs)。ip是本機(jī)ip;now是當(dāng)前時間戳;attrs是一個dict,用于在鎖記錄中存放一些其他用戶自定義信息。

lock_else:函數(shù),在沒有獲得鎖時執(zhí)行,函數(shù)原型:lock_else(lock_rec)。lock_rec是鎖記錄,包含active_ip,update_time以及用戶自定義的屬性。

unactive_interval:datetime.timedelta類型,超時時間,也就是說當(dāng)前時間減去update_time大于unactive_interval的話,就代表超時。

在使用這個類時,必須實(shí)現(xiàn)自己的lock,heartbeat以及l(fā)ock_else函數(shù)。

job的原型是job(**attrs),attrs就是存放在鎖記錄中的用戶自定義屬性,job可以有dict類型的返回值,這個返回值會存入鎖記錄中。

下面,看一下具體使用的例子,使用的mongodb存放分布式鎖。

import apscheduler.events

import datetime

import time

import pymongo

import sys

sys.path.append('../src/')

import mtxscheduler

sched = mtxscheduler.MutexScheduler()

mongo = pymongo.Connection(host = '127.0.0.1', port = 27017)

lock_store = mongo['lockstore']['locks']

def lock():

print 'lock()'

now = datetime.datetime.now() - datetime.timedelta(seconds = 3)

lck = lock_store.find_one({'name': 't'})

return lck

def hb(ip, now, **attrs):

print 'heartbeat()'

attrs['active_ip'] = ip

attrs['update_time'] = now

lock_store.update({'name': 't'}, {'$set': attrs}, upsert = True)

def le(lock_rec):

if lock_rec:

print 'active ip', lock_rec['active_ip']

else:

print 'lock else'

i = 0

@sched.mutex(lock = lock, heartbeat = hb, lock_else = le)

@sched.cron_schedule(second = '*')

def job(**attr):

global i

i += 1

print i

def err_listener(ev):

if ev.exception:

print sys.exc_info()

sched.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR)

sched.start()

time.sleep(10)

這里用到了mongodb的python driver,可以通過命令安裝:

easy_install pymongo

easy_install的安裝件另一篇文章。

這個任務(wù)很簡單就是定時打印整數(shù)序列。同時在兩臺服務(wù)器上部署運(yùn)行,可以發(fā)現(xiàn)只有一臺服務(wù)器會輸出整數(shù)序列。

使用起來還是很方便的。源代碼見github,其中還有使用redis存儲鎖,已經(jīng)在鎖記錄中存放自定義信息的例子。

總結(jié)

以上是生活随笔為你收集整理的python分布式定时任务_分布式定时任务框架——python定时任务框架APScheduler扩展...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。