0.配置文件
BROKER_URL = "redis://127.0.0.1:6379/1"
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
1.啟動文件
from celery import Celery
from ihome. tasks import config
celery_app = Celery( "ihome" )
celery_app. config_from_object( config)
celery_app. autodiscover_tasks( [ "ihome.tasks.sms" ] )
2.發送短信輔助類
from CCPRestSDK import REST
accountSid = '8aaf0708568d4143015697b0f4960888'
accountToken = '42d3191f0e6745d6a9ddc6c795da0bed'
appId = '8aaf0708568d4143015697b0f56e088f'
serverIP = 'app.cloopen.com'
serverPort = '8883'
softVersion = '2013-12-26' class CCP ( object ) : """自己封裝的發送短信的輔助類""" instance = None def __new__ ( cls) : if cls. instance is None : obj = super ( CCP, cls) . __new__( cls) obj. rest = REST( serverIP, serverPort, softVersion) obj. rest. setAccount( accountSid, accountToken) obj. rest. setAppId( appId) cls. instance = objreturn cls. instancedef send_template_sms ( self, to, datas, temp_id) : "" "" "" result = self. rest. sendTemplateSMS( to, datas, temp_id) status_code = result. get( "statusCode" ) if status_code == "000000" : return 0 else : return - 1
3.發送短信后端邏輯
@api. route( "/sms_codes/<re(r'1[34578]\d{9}'):mobile>" )
def get_sms_code ( mobile) : """獲取短信驗證碼""" image_code = request. args. get( "image_code" ) image_code_id = request. args. get( "image_code_id" ) if not all ( [ image_code_id, image_code] ) : return jsonify( errno= RET. PARAMERR, errmsg= "參數不完整" ) try : real_image_code = redis_store. get( "image_code_%s" % image_code_id) except Exception as e: current_app. logger. error( e) return jsonify( errno= RET. DBERR, errmsg= "redis數據庫異常" ) if real_image_code is None : return jsonify( errno= RET. NODATA, errmsg= "圖片驗證碼失效" ) try : redis_store. delete( "image_code_%s" % image_code_id) except Exception as e: current_app. logger. error( e) if real_image_code. lower( ) != image_code. lower( ) : return jsonify( errno= RET. DATAERR, errmsg= "圖片驗證碼錯誤" ) try : send_flag = redis_store. get( "send_sms_code_%s" % mobile) except Exception as e: current_app. logger. error( e) else : if send_flag is not None : return jsonify( errno= RET. REQERR, errmsg= "請求過于頻繁,請60秒后重試" ) try : user = User. query. filter_by( mobile= mobile) . first( ) except Exception as e: current_app. logger. error( e) else : if user is not None : return jsonify( errno= RET. DATAEXIST, errmsg= "手機號已存在" ) sms_code = "%06d" % random. randint( 0 , 999999 ) try : redis_store. setex( "sms_code_%s" % mobile, constants. SMS_CODE_REDIS_EXPIRES, sms_code) redis_store. setex( "send_sms_code_%s" % mobile, constants. SEND_SMS_CODE_INTERVAL, 1 ) except Exception as e: current_app. logger. error( e) return jsonify( errno= RET. DBERR, errmsg= "保存短信驗證碼異常" ) result_obj = send_sms. delay( mobile, [ sms_code, int ( constants. SMS_CODE_REDIS_EXPIRES/ 60 ) ] , 1 ) print ( result_obj. id ) ret = result_obj. get( ) print ( "ret=%s" % ret) return jsonify( errno= RET. OK, errmsg= "發送成功" )
4.client相關代碼
from celery import Celery
from ihome. libs. yuntongxun. sms import CCP
celery_app = Celery( "ihome" , broker= "redis://127.0.0.1:6379/1" ) @celery_app. task
def send_sms ( to, datas, temp_id) : """發送短信的異步任務""" ccp = CCP( ) ccp. send_template_sms( to, datas, temp_id)
5.worker相關代碼
from ihome. tasks. main import celery_app
from ihome. libs. yuntongxun. sms import CCP
@celery_app. task
def send_sms ( to, datas, temp_id) : """發送短信的異步任務""" ccp = CCP( ) try : result = ccp. send_template_sms( to, datas, temp_id) except Exception as e: result = - 2 return result