celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
組件
任務(wù)Tasks:用戶(hù)需要實(shí)現的功能。分為異步任務(wù)和定時(shí)任務(wù)。
中間人Broker:任務(wù)隊列,存放任務(wù)的地方,worker執行單元獲取任務(wù)的地方。中間人采用Redis或者是RabbitMQ。
執行者Worker:監聽(tīng)任務(wù)隊列里面是否有任務(wù),有就處理任務(wù)
存儲Backend:把執行Tasks返回的結果進(jìn)行存儲
異步任務(wù):將耗時(shí)操作任務(wù)提交給Celery去異步執行,比如發(fā)送短信/郵件、消息推送、音視頻處理等
定時(shí)任務(wù):定時(shí)執行某件事情
導入模塊
pip install celery配置django
在項目根目錄下面創(chuàng )建一個(gè)包celery_tasks
在創(chuàng )建的celery_tasks包里面創(chuàng )建一個(gè)config.py用來(lái)專(zhuān)門(mén)配置我們需要的信息或者是在settings里面配置,然后在調用。使用redis來(lái)作為中間人
broker_url = "redis://127.0.0.1/15" # 任務(wù)隊列存儲的地方 result_backend = "redis://127.0.0.1/14" # 任務(wù)執行結果存儲的地方在 celery_tasks 包里面創(chuàng )建一個(gè)文件夾用來(lái)放需要執行的任務(wù):里面放任務(wù)的處理流程
在celery_tasks包里面創(chuàng )建一個(gè)main.py文件,用來(lái)作為celery的client啟動(dòng)文件
import osfrom celery import Celery# 為celery使用django配置文件進(jìn)行設置if not os.getenv('DJANGO_SETTINGS_MODULE'): os.environ['DJANGO_SETTINGS_MODULE'] = '項目名.settings'# 創(chuàng )建celery實(shí)例app = Celery('任務(wù)名') # 自己創(chuàng )建一個(gè)# 導入celery配置app.config_from_object('celery_tasks.config')# 自動(dòng)注冊celery任務(wù)app.autodiscover_tasks(['celery_tasks.任務(wù)文件名'])使用 celery 開(kāi)啟服務(wù)
celery -A celery_tasks.main worker -l info # -A 選項指定 celery 實(shí)例 app 的位置 # -l 選項指定日志級別, -l 是 --loglevel 的縮略形式# -*- coding: utf-8 -*-from celery import Celery# 為celery使用django配置文件進(jìn)行設置import osif not os.getenv('DJANGO_SETTINGS_MODULE'): os.environ['DJANGO_SETTINGS_MODULE'] = 'alyBlog.settings'# 創(chuàng )建celery應用/實(shí)例app = Celery('aly-blog')# 導入celery配置app.config_from_object('celery_tasks.config')# 自動(dòng)注冊celery任務(wù)app.autodiscover_tasks(['celery_tasks.sms'])# -*- coding: utf-8 -*-broker_url = "redis://127.0.0.1/15"result_backend = "redis://127.0.0.1/14"import loggingfrom celery_tasks.main import appfrom utils.yuntongxun.sms import CCPlogger = logging.getLogger("django")@app.task(name='send_sms_code')def send_sms_code(mobile, sms_num, expires, temp_id): """ 發(fā)送短信驗證碼 :param mobile: 手機號 :param sms_num: 驗證碼 :param expires: 有效期 :return: None """ try: result = CCP().send_Template_sms(mobile, [sms_num, expires], temp_id) except Exception as e: logger.error("發(fā)送驗證碼短信[異常][ mobile: %s, message: %s ]" % (mobile, e)) else: if result == 0: logger.info("發(fā)送驗證碼短信[正常][ mobile: %s sms_code: %s]" % (mobile, sms_num)) else: logger.warning("發(fā)送驗證碼短信[失敗][ mobile: %s ]" % mobile)from celery_tasks.sms.tasks import send_sms_codeclass SmsCodeView(View): """ # 1. 創(chuàng )建一個(gè)SmsCodeView類(lèi) param: mobile、image_text、image_code_id """ # 2. 創(chuàng )建一個(gè)post方法用來(lái)處理邏輯 def post(self, request): # 3. 獲取前端傳來(lái)的數據 json_data = request.body # 4. 將數據轉化為字典 dict_data = json.loads(json_data) # 5. 將數據傳遞給SmsCodeForm表單進(jìn)行校驗 form = SmsCodeForm(data=dict_data) # 6. 校驗成功處理方式 if form.is_valid(): # 7. 獲取校驗后的數據 mobile = form.cleaned_data.get("mobile") # 8. 生成短信驗證碼 sms_text = "%06d" % random.randint(0, 999999) # 9. 將短信驗證碼和和過(guò)期時(shí)間保存到redis中 redis_obj = get_redis_connection("verify_code") sms_text_key = "sms_code_{}".format(mobile).encode("utf8") sms_repeat_key = "sms_sixty_{}".format(mobile).encode("utf8") redis_obj.setex(sms_text_key, contains.SMS_CODE_EXPIRE_TIME, sms_text) # key, expire_time, value redis_obj.setex(sms_repeat_key, contains.SMS_CODE_EXPIRE_TIME, contains.SMS_REPEAT_EXPIRE_TIME) # logger.info("發(fā)送短信正常[mobile:%s sms_num:%s]" % (mobile, sms_text)) # 調試代碼時(shí)候用,在控制臺顯示 # print(sms_text) # return to_json_data(errmsg="短信發(fā)送成功") # 短信調試 # # 9. 使用用通訊插件發(fā)送短信 # try: # result = CCP().send_Template_sms(mobile, [sms_text, contains.SMS_CCP_EXPIRE_TIME], contains.SMS_TEMPLATE) # except Exception as e: # logger.error("短信發(fā)送異常[mobile:{},error:{}]".format(mobile, e)) # return to_json_data(errno=Code.SMSERROR, errmsg=error_map[Code.SMSERROR]) # 短信發(fā)送異常 # else: # if result == 0: # 發(fā)送成功 # logger.info("短信發(fā)送成功[mobile:{},sms_code:{}]".format(mobile, sms_text)) # return to_json_data(errmsg="短信發(fā)送正常") # else: # 發(fā)送失敗 # logger.warning("短信發(fā)送失敗[mobile:{}]".format(mobile)) # return to_json_data(errno=Code.SMSFAIL, errmsg=error_map[Code.SMSFAIL]) # 使用celery異步處理短信發(fā)動(dòng)任務(wù) 修改的就這里一處 expires = 300 send_sms_code.delay(mobile,sms_text,expires,contains.SMS_TEMPLATE) return to_json_data(errno=Code.OK,errmsg="短信驗證碼發(fā)送成功") # 校驗未通過(guò) else: err_msg_list = [] for item in form.errors.values(): err_msg_list.append(item[0]) err_info = '/'.join(err_msg_list) return to_json_data(errno=Code.PARAMERR, errmsg=err_info)這里采用網(wǎng)易云163郵箱進(jìn)行發(fā)送
main任務(wù)執行方法和config配置方法不變
# 配置郵箱EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'EMAIL_USE_SSL = TrueEMAIL_HOST = 'smtp.163.com' # SMTP服務(wù)器: smtp.163.comEMAIL_PORT = 465 # 端口EMAIL_FROM = '郵箱驗證<優(yōu)先>' # 收件人看到的發(fā)件人# -*- coding: utf-8 -*-"""@Time : 2020/3/23 21:21@Author : 半紙梁@File : tasks.py"""from celery_tasks.main import appfrom django.core.mail import send_mailfrom alyBlog import settingsfrom itsdangerous import TimedJSONWebSignatureSerializer as Serializerfrom utils.user_config import user_config@app.task(name="send_verify_email")def send_verify_email(username, to_email, rank): # send_mail(subject, message, from_email, recipient_list, # fail_silently=False, auth_user=None, auth_password=None, # connection=None, html_message=None): auth_user = user_config.EMAIL_HOST_USER auth_password = user_config.EMAIL_HOST_PASSWORD # 創(chuàng )建一個(gè)JWT的對象,以SECRET_KEY為加密的參數,過(guò)期時(shí)間為1h serializer_obj = Serializer(secret_key=settings.SECRET_KEY, expires_in=3600) user_info = {"username": username} # 加密的數據dict user_info = serializer_obj.dumps(user_info) # 加密 token = user_info.decode() # 轉碼為字符串 subject = '新用戶(hù)激活郵箱驗證' message = '測試信息' from_email = settings.EMAIL_FROM recipient_list = [to_email] host = settings.SERVER_DOMAIN html_message = '<h1>歡迎成為博客的第{}位讀者</h1>請點(diǎn)擊下面鏈接激活您的賬戶(hù):<br/> <a href="{}/users/active/{}">{}/users/active/{}</a>'.format(rank, host, token, host, token) send_mail(subject, message, from_email, recipient_list, auth_user=auth_user, auth_password=auth_password, html_message=html_message, fail_silently=False)class EmailVerifyView(View): """ 用戶(hù)郵箱驗證 """ def get(self,request,token): serializer_obj = Serializer(settings.SECRET_KEY,3600) try: username = serializer_obj.loads(token) except SignatureExpired: return HttpResponse("鏈接已過(guò)期") username = username.decode() user = models.Users.objects.only("is_active").filter(username=username).first() user.is_active = True user.save(update_fields=["is_active"]) return render(request,'users/login.html')def test(request): """測試email激活使用""" username = "xxx" to_email = "接收者郵箱" send_verify_email.delay(username, to_email, 1) return HttpResponse("郵件已發(fā)送")# -*- coding: utf-8 -*-"""@Time : 2020/3/23 18:00@Author : 半紙梁@File : tasks.py"""from celery_tasks.main import appfrom utils.fast.fdfs import clientfrom utils.upload_image.bd_upload_image import BdUploadImage@app.task(name='upload_server_images')def upload_server_images(data, file_ext_name): """ 圖片上傳到服務(wù)器 :param data: 圖片二進(jìn)制數據 :param file_ext_name: 圖片后綴名 :return: """ client.upload_by_buffer(data, file_ext_name)@app.task(name="upload_bos_images")def upload_bos_images(bos_host, ak, sk, bucket, image_name, data): """ :param bos_host: bos域名 :param ak: 百度云access_key_id :param sk: 百度云secret_access_key :param bucket: 百度云總圖片存儲庫名 :param image_name: 文件名 :param data: 圖片二進(jìn)制數據 :return: """ upload_image = BdUploadImage(bos_host, ak, sk) res = upload_image.upload(bucket, image_name, data) return resbd_upload_image.py
百度云上傳圖片封裝函數,有疑問(wèn)可以查看百度云BOS存儲圖片SDK
# -*- coding: utf-8 -*-"""@Time : 2020/3/17 17:29@Author : 半紙梁@File : bd_upload_image.py"""from baidubce.services.bos.bos_client import BosClientfrom baidubce.auth.bce_credentials import BceCredentialsfrom baidubce.bce_client_configuration import BceClientConfigurationfrom user_config import user_configclass BdUploadImage(object): """ 百度云BOS存儲圖片插件 """ def __init__(self, bos_host, access_key_id, secret_access_key): self.bos_host = bos_host self.config = BceClientConfiguration(credentials=BceCredentials(access_key_id, secret_access_key),endpoint=bos_host) self.client = BosClient(config=self.config) def upload(self, bucket, key, data): """ 圖片存儲庫名、文件名、圖片二進(jìn)制數據 :param bucket:圖片存儲庫名 :param key:文件名 :param data:圖片二進(jìn)制數據 :return: """ res = self.client.append_object_from_string(bucket_name=bucket, key=key, data=data) return resif __name__ == '__main__': """測試內容""" bos_host = 'https://bj.bcebos.com' image_name = "測試" bucket = "banzhiliang" with open("2018.png", "rb") as f: data = f.read() upload_image = BdUploadImage(bos_host, user_config.access_key_id, user_config.secret_access_key) upload_image.upload(bucket, image_name, data)
聯(lián)系客服