需求描述
給mqtt的topic發送信息
對應的topic接收請求后,執行發送短信指令
信息內容
SMS,10086,102
lua的mqtt里面,截取判斷即可
–>可以實現
=====
定時任務,
每月開機一次
發短信?
或者使用開機通知?
定時消費方式,
定時執行
cron表達式,每隔2個月執行一次
cron定時表達式–每兩月執行一次_corn 每月執行一次-CSDN博客
https://blog.csdn.net/weixin_44234800/article/details/129949900
0 0 2 1 /2 ?
0 0 1 1/2 * ?
# -*- coding: utf-8 -*-
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from datetime import datetime
# topic='led001'
# 用戶私鑰
uid='7801e4ba1bf7406593d47250797860fd'
# 定義中文日期時間格式
chinese_format = "%Y年%m月%d日 %H時%M分%S秒"
# 存放結果信息
msg_success =datetime.now().month+'月份消費情況\n生成時間:'+datetime.now().strftime(chinese_format)+'\n========\n'msg_failed=''
phone_list=[13123456701,13123456702,13123456703,13123456704]
# 根據手機號獲取發送的短信內容
def getSmsContent(phone_num):# 獲取當前日期和時間now = datetime.now()# 格式化日期和時間formatted_date = now.strftime("%Y-%m-%d %H:%M:%S")print("當前日期和時間:", formatted_date)sms_content= '{}\t{}\t執行消費'.format(formatted_date,phone_num)print("sms_content:",sms_content)return sms_content
# 請求mqtt
def post_mqtt(phone_num):content= getSmsContent(phone_num)msg= 'SMS,{},{}'.format(phone_num,content)print("msg:",msg)url ='https://apis.bemfa.com/va/postJsonMsg'# print("url:",url)json_text ={"uid": uid,"topic": topic,"type": 1,"msg": msg}headers = {'Content-Type': 'application/json;charset=utf-8'}response_str = requests.post(url, json.dumps(json_text), headers=headers)print('response_str:',response_str)response_dict = json.loads(response_str) if response_dict['code'] == 0 and response_dict['message'] == 'ok':print("請求成功")msg_success+ phone_num+ '\t發短信成功\t消費成功\n========\n'else:print("請求失敗")msg_failed+phone_num+ '\t發短信失敗\t消費失敗\n========\n'# 遍歷list取出號碼
def do_work():for index, item in enumerate(phone_list):print(f"Index: {index}, Item: {item}")post_mqtt(item)# 等待20stime.sleep(20)print("==============")if __name__ == '__main__':print("111111")# 打印返回結果# print("result:\n",result)do_work()