Flask爱家租房--celery(发送验证短信)
生活随笔
收集整理的這篇文章主要介紹了
Flask爱家租房--celery(发送验证短信)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
0.配置文件
# coding:utf-8BROKER_URL = "redis://127.0.0.1:6379/1" CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'1.啟動文件
# coding:utf-8from celery import Celery from ihome.tasks import config# 定義celery對象 celery_app = Celery("ihome")# 引入配置信息 celery_app.config_from_object(config)# 自動搜尋異步任務 celery_app.autodiscover_tasks(["ihome.tasks.sms"])2.發送短信輔助類
# coding=utf-8from CCPRestSDK import REST# 主帳號 accountSid = '8aaf0708568d4143015697b0f4960888'# 主帳號Token accountToken = '42d3191f0e6745d6a9ddc6c795da0bed'# 應用Id appId = '8aaf0708568d4143015697b0f56e088f'# 請求地址,格式如下,不需要寫http:// serverIP = 'app.cloopen.com'# 請求端口 serverPort = '8883'# REST版本號 softVersion = '2013-12-26'# 發送模板短信# @param to 手機號碼# @param datas 內容數據 格式為列表 例如:['12','34'],如不需替換請填 ''# @param $tempId 模板Idclass CCP(object):"""自己封裝的發送短信的輔助類"""# 用來保存對象的類屬性instance = Nonedef __new__(cls):# 判斷CCP類有沒有已經創建好的對象,如果沒有,創建一個對象,并且保存# 如果有,則將保存的對象直接返回if cls.instance is None:obj = super(CCP, cls).__new__(cls)# 初始化REST SDKobj.rest = REST(serverIP, serverPort, softVersion)obj.rest.setAccount(accountSid, accountToken)obj.rest.setAppId(appId)cls.instance = objreturn cls.instancedef send_template_sms(self, to, datas, temp_id):""""""result = self.rest.sendTemplateSMS(to, datas, temp_id)# for k, v in result.iteritems():## if k == 'templateSMS':# for k, s in v.iteritems():# print '%s:%s' % (k, s)# else:# print '%s:%s' % (k, v)# smsMessageSid:ff75e0f84f05445ba08efdd0787ad7d0# dateCreated:20171125124726# statusCode:000000status_code = result.get("statusCode")if status_code == "000000":# 表示發送短信成功return 0else:# 發送失敗return -13.發送短信后端邏輯
# GET /api/v1.0/sms_codes/<mobile>?image_code=xxxx&image_code_id=xxxx @api.route("/sms_codes/<re(r'1[34578]\d{9}'):mobile>") def get_sms_code(mobile):"""獲取短信驗證碼"""# 獲取參數image_code = request.args.get("image_code")image_code_id = request.args.get("image_code_id")# 校驗參數if not all([image_code_id, image_code]):# 表示參數不完整return jsonify(errno=RET.PARAMERR, errmsg="參數不完整")# 業務邏輯處理# 從redis中取出真實的圖片驗證碼try:real_image_code = redis_store.get("image_code_%s" % image_code_id)except Exception as e:current_app.logger.error(e)return jsonify(errno=RET.DBERR, errmsg="redis數據庫異常")# 判斷圖片驗證碼是否過期if real_image_code is None:# 表示圖片驗證碼沒有或者過期return jsonify(errno=RET.NODATA, errmsg="圖片驗證碼失效")# 刪除redis中的圖片驗證碼,防止用戶使用同一個圖片驗證碼驗證多次try:redis_store.delete("image_code_%s" % image_code_id)except Exception as e:current_app.logger.error(e)# 與用戶填寫的值進行對比if real_image_code.lower() != image_code.lower():# 表示用戶填寫錯誤return jsonify(errno=RET.DATAERR, errmsg="圖片驗證碼錯誤")# 判斷對于這個手機號的操作,在60秒內有沒有之前的記錄,如果有,則認為用戶操作頻繁,不接受處理try:send_flag = redis_store.get("send_sms_code_%s" % mobile)except Exception as e:current_app.logger.error(e)else:if send_flag is not None:# 表示在60秒內之前有過發送的記錄return jsonify(errno=RET.REQERR, errmsg="請求過于頻繁,請60秒后重試")# 判斷手機號是否存在try:user = User.query.filter_by(mobile=mobile).first()except Exception as e:current_app.logger.error(e)else:if user is not None:# 表示手機號已存在return jsonify(errno=RET.DATAEXIST, errmsg="手機號已存在")# 如果手機號不存在,則生成短信驗證碼sms_code = "%06d" % random.randint(0, 999999)# 保存真實的短信驗證碼try:redis_store.setex("sms_code_%s" % mobile, constants.SMS_CODE_REDIS_EXPIRES, sms_code)# 保存發送給這個手機號的記錄,防止用戶在60s內再次出發發送短信的操作redis_store.setex("send_sms_code_%s" % mobile, constants.SEND_SMS_CODE_INTERVAL, 1)except Exception as e:current_app.logger.error(e)return jsonify(errno=RET.DBERR, errmsg="保存短信驗證碼異常")# 發送短信# 使用celery異步發送短信, delay函數調用后立即返回(非阻塞)#send_sms.delay(mobile, [sms_code, int(constants.SMS_CODE_REDIS_EXPIRES/60)], 1)# 返回異步任務的對象result_obj = send_sms.delay(mobile, [sms_code, int(constants.SMS_CODE_REDIS_EXPIRES/60)], 1)print(result_obj.id)# 通過異步任務對象的get方法獲取異步任務的結果, 默認get方法是阻塞的ret = result_obj.get()print("ret=%s" % ret)# 返回值# 發送成功return jsonify(errno=RET.OK, errmsg="發送成功")4.client相關代碼
# coding:utf-8from celery import Celery from ihome.libs.yuntongxun.sms import CCP# 定義celery對象 celery_app = Celery("ihome", broker="redis://127.0.0.1:6379/1")@celery_app.task def send_sms(to, datas, temp_id):"""發送短信的異步任務"""ccp = CCP()ccp.send_template_sms(to, datas, temp_id)# celery開啟的命令 # celery -A ihome.tasks.task_sms worker -l info5.worker相關代碼
# coding:utf-8from ihome.tasks.main import celery_app from ihome.libs.yuntongxun.sms import CCP# # @celery_app.task # def send_sms(to, datas, temp_id): # """發送短信的異步任務""" # pass@celery_app.task def send_sms(to, datas, temp_id):"""發送短信的異步任務"""ccp = CCP()try:result = ccp.send_template_sms(to, datas, temp_id)except Exception as e:result = -2return result總結
以上是生活随笔為你收集整理的Flask爱家租房--celery(发送验证短信)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 递归和非递归分别实现求n的阶乘
- 下一篇: android fmod,Android