日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

码农技术炒股之路——任务管理器

發布時間:2023/11/27 生活经验 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 码农技术炒股之路——任务管理器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? ? ? 系統任務和普通任務都是通過任務管理器調度的。它們的區別是:系統任務在程序運行后即不會被修改,而普通任務則會被修改。(轉載請指明出于breaksoftware的csdn博客)

? ? ? ? 為什么要有這樣的設計?因為我希望它是一個可以不用停止服務就可以更新相關配置的系統。比如我們現在要加一個普通任務,我們只要修改下普通任務配置文件即可。再比如我們需要修改數據庫中表結構,我們也不用停止服務修改代碼來保證數據格式的一致性。

? ? ? ? 我們程序需要知道配置文件是否被修改。如何去做?一種方法是借用一些系統方法監聽相應配置文件的修改,一旦文件有變化,馬上通知我們的主程序去處理。另一種則是采用輪詢檢查機制,即定期去生成差異結果。為了不讓這個系統更加復雜,我選擇后者。而它就是我所謂的系統任務。

? ? ? ? 不管是系統任務還是普通任務,實現的類都要繼承于job_base

from abc import ABCMeta,abstractmethod
class job_base:__metaclass__ = ABCMeta@abstractmethoddef run(self):pass

? ? ? ? 有這個限制主要是為了保證每個任務都是run方法。調度框架將執行該方法以完成任務執行。

? ? ? ? 在《碼農技術炒股之路——架構和設計》一文中,介紹了我們將基于APScheduler實現任務調度功能。首先我們需要啟動BackgroundScheduler對象

from apscheduler.schedulers.background import BackgroundScheduler@singleton
class job_center():def __init__(self):self._sched = Noneself._job_conf_path = ""self._job_id_handle = {}self._static_job_id_handle = {}def start(self):self._sched = BackgroundScheduler()self._sched.start()

? ? ? ? 當我們需要加入任務時,則調用下面這個方法

    def add_jobs(self, jobs_info, is_static = False):if None == self._sched:LOG_WARNING("job center must call start() first")returnfor (job_name,job_info) in jobs_info.items():if is_static and job_name in self._static_job_id_handle.keys():continuejob_type = job_info["type"]class_name = job_info["class"]job_handle = self._get_obj(class_name)if is_static:self._static_job_id_handle[job_name] = job_handleelse:self._job_id_handle[job_name] = job_handlecmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"params = self._join_params(job_info)if 0 != len(params):cmd += " , "cmd += paramscmd += ")"#print cmdeval(cmd)

? ? ? ? jobs_info保存的是任務配置文件中任務信息,我們看個樣例

[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai

? ? ? ? 這個配置中除了type和class,其他都是APScheduler框架中add_job方法中的參數。上面配置意思是:以上海時間,從周一到周五,早上9點30分10秒執行一次。

? ? ? ??add_jobs中通過class字段,獲取該class對應的一個對象

    def _get_obj(self, _cls_name):  _packet_name = _cls_name  _module_home = __import__(_packet_name,globals(),locals(),[_cls_name])obj =  getattr(_module_home,_cls_name)  class_obj = obj()return class_obj

? ? ? ? 這兒又要提到我之前特別強調過的單例使用方式。經過測試,《碼農技術炒股之路——配置管理器、日志管理器》中單例的實現可以保證上面這個方法獲取的是同一個對象,而網上其他單例模式則不行。
? ? ? ? 獲取對象后,我們要組裝出要執行的命令。cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"中job_handle就是上面獲取的對象,而run則是每個job都要有的方法。這也是為什么要求每個任務類都要繼承于job_base的原因。

? ? ? ? 之后調用_join_params將配置文件中其他信息組裝成參數拼接出完整命令

    def _join_params(self, job_info):params = ""param = ""job_type = job_info["type"]for key in job_info.keys():if key in conf_keys.job_conf_info_dict[job_type]:if  0 != len(params):params += ' , 'value = job_info[key]if value.isdigit():param = key + " = " + valueelse:param = key + " = '" + value + "'"if 0 != len(param):params += paramreturn params

? ? ? ? 如此我們配置中的任務就會被加入到APScheduler調度隊列中。

? ? ? ? 我們再看下如何刪除一個任務

    def remove_jobs(self, jobs_info):if None == self._sched:LOG_WARNING("job center must call start() first")returnfor job_name in jobs_info.keys():self._sched.remove_job(job_name)self._job_id_handle.pop(job_name)

? ? ? ? 第7行通過任務名稱在APScheduler中把任務刪除。第8行將任務對應的對象從列表中刪除。為什么要使用_job_id_handle去保存這些任務對象呢?因為如果不在一個更大的生命周期內保存它,它就會被認為是一個局部變量,從而被釋放,導致之后APScheduler再也調用不了它。
? ? ? ? 我們看個管理普通任務的系統任務代碼

@singleton
class j_load_job_conf(job_base):def __init__(self):self._pre_jobs_info = {}self._frame_conf_inst = scheduler_frame_conf_inst()self._job_center = job_center()def run(self):section_name = "strategy_job"option_name = "conf_path"if False == self._frame_conf_inst.has_option(section_name, option_name):LOG_WARNING("no %s %s" % (section_name, option_name))returnconf_path = self._frame_conf_inst.get(section_name, option_name)LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))job_conf_parser_obj = job_conf_parser()jobs_info = job_conf_parser_obj.parse(conf_path)self._execute_jobs(jobs_info)def _execute_jobs(self, jobs_info):add_dict = {}remove_dict = {}modify_dict = {}frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)add_jobs_info = dict(add_dict, **modify_dict)remove_jobs_info = {}for item in modify_dict.keys():remove_jobs_info[item] = self._pre_jobs_info[item]LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):returnself._pre_jobs_info = jobs_infoself._job_center.remove_jobs(remove_jobs_info)self._job_center.add_jobs(add_jobs_info)

? ? ? ? run方法將會定期執行。它會從固定目錄讀取普通任務配置文件信息。然后在_execute_jobs方法中,通過和上一次讀取的任務信息對比,生成三個字典:需要刪除的任務、需要新增的任務和需要修改的任務。需要修改的任務將變成先刪除后新增的方式實現修改。所以最后操作的將是兩個字段信息。
? ? ? ? 普通任務本文就不介紹了,之后介紹的每個抓取和離線計算業務都是普通任務。

總結

以上是生活随笔為你收集整理的码农技术炒股之路——任务管理器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。