提供一個可接入亞馬遜 Iot 平臺的客戶端,用于管理亞馬遜 MQTT 連接和影子設備。
初始化客戶端?
?Aws
class Aws(client_id,server,port,keep_alive,ssl,ssl_params)
參數:
client_id?
(str) - 客戶端唯一標識。server?
(str) - 亞馬遜 Iot 平臺服務器地址。port?
(int) - MQTT 服務端口(默認端口為 1883,TLS 默認端口 為 8883)。keep_alive?
(int) - 客戶端 keep-alive 超時值,單位:秒(默認 60 秒)。ssl?
(bool) - 是否使能 SSL/TLS(默認:False。如果設置為True,則必須設置?ssl_params?
參數)。ssl_params?
(dict) - SSL/TLS 連接參數。
示例:
>>> # Create Aws object
>>> import aws
>>> aws = aws.Aws(client_id, server, port, keep_alive=60,ssl=True,ssl_params={"cert": certificate_content,"key": private_content})
MQTT 接入?
使用?umqtt?
模塊接口來建立 AWS 云平臺連接。
?aws.connect?
connect()
該方法用于建立 AWS 平臺連接。
返回值:
None
?aws.disconnect?
disconnect()
改方法用于斷開 AWS 平臺連接。
返回值:
None
?aws.subscribe?
subscribe(topic)
該方法用于訂閱 mqtt 主題。
參數:
topic?
(str) - 訂閱主題。
返回值:
None
?aws.publish?
publish(topic, payload)
該方法用于發布消息到對應topic。
參數:
topic?
(str) - 發布主題。payload?
(dict) - 需要發送的數據。
返回值:
None
影子設備管理?
?aws.create_shadow?
create_shadow(shadow_name="", state="")
該方法用于創建影子設備。
參數:
shadow_name?
(str) - 影子設備名稱(可選)。state?
(dict) - 影子設備的初始狀態參數(可選)。
返回值:
None
?aws.update_shadow?
update_shadow(shadow_name="", state="")
該方法用于更新影子設備名稱和狀態。
參數:
shadow_name?
(str) - 影子設備名稱。state?
(dict) - 影子設備的狀態參數(可選)。
返回值:
None
?aws.get_shadow?
get_shadow(shadow_name="")
該方法用于獲取影子設備狀態。
參數:
shadow_name?
(str) - 影子設備名稱。
返回值:
當前影子設備的狀態數據。
?aws.delete_shadow?
delete_shadow(shadow_name="")
該方法用于刪除一個影子設備。
參數:
shadow_name?
(str) - 影子設備的名稱。
返回值:
None
?aws.connect_shadow?
connect_shadow(shadow_name="", topics=None)
該方法用于連接影子設備。
參數:
shadow_name?
(str) - 影子設備名稱(可選)。topics?
(list) - 關聯 影子設備的 MQTT 主題列表(可選)。
返回值:
None
?aws.set_callback?
set_callback(topic_name, callback)
參數:
-
topic_name?
(str) - MQTT 主題。 -
callback?
(function) -?topic_name?
主題對應的回調函數。 函數原型:callback_function(msg)
-
回調函數參數:
msg?
: 字典類型,接收到的消息。
-
返回值:
None
Python Example?
import usr.aws as aws
import modem
import ujson
import sim # Check if PIN verification is needed for your SIM card
import net# AWS IoT credentials
certificate_content = """
-----BEGIN CERTIFICATE----------END CERTIFICATE-----
"""private_content = """
-----BEGIN RSA PRIVATE KEY----------END RSA PRIVATE KEY-----
"""client_id = 'qpthing'
server = 'abgka7vzgjoa0-ats.iot.eu-west-3.amazonaws.com'
port = 8883def aws_callback(data):print("HELLO from 1234 topic callback")def shadow_callback_get(data):print("HELLO from get accepted callback")def shadow_callback_update(data):print("HELLO from update accepted callback")def shadow_callback_delta(data):print("HELLO from update delta callback")# Create AWS object
aws_obj = aws.Aws(client_id, server, port, keep_alive=60, ssl=True,ssl_params={"cert": certificate_content, "key": private_content})
print("Created AWS object")# Connect to AWS IoT
aws_obj.connect()
print("Connected to AWS IoT")# Subscribe and publish
aws_obj.set_callback("1234", aws_callback)
aws_obj.subscribe("1234")
aws_obj.publish("7777", "Hello from QuecPython")
aws_obj.start()# Shadow operations
aws_obj.create_shadow()
aws_obj.connect_shadow()
aws_obj.set_callback("$aws/things/qpthing/shadow/get/accepted", shadow_callback_get)
aws_obj.set_callback("$aws/things/qpthing/shadow/update/accepted", shadow_callback_update)
aws_obj.set_callback("$aws/things/qpthing/shadow/update/delta", shadow_callback_delta)
aws_obj.get_shadow()
aws_obj.update_shadow(state={"state": {"reported": {"wel