Getting Started
Here is a very simple example that subscribes to the broker $SYS topic tree and prints out the resulting messages:
import paho.mqtt.client as mqtt # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe("$SYS/#") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("iot.eclipse.org", 1883, 60) # Blocking call that processes network traffic, dispatches callbacks and # handles reconnecting. # Other loop*() functions are available that give a threaded interface and a # manual interface. client.loop_forever()
Client
你可以把Client 类当做一个实例/类/子类 来用,一般用法如下:
创建一个client 实例
用connect*()函数来连接中间人
调用一个loop*()函数来保持与中间人的网络通信
用subscribe()来订阅主题并接收信息
用publish()来发布消息给中间人
用disconnect()来断开与中间人的连接
回调函数可以使得应用及时的处理事件。回调函数描述如下。
Constructor / reinitialise
Client()
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311)
The Client() constructor takes the following arguments:
client_id : 唯一的client id 字符串,当连接中间人时使用。如果client_id 长度为0,则会自动随机产生一个client_id。这种情况下 clean_session 参数必须为 True.
clean_session : 决定client 类型的bool 类型 参数。如果为True,当client 失去连接时中间人将删除所有关于这个client 的信息。 如果为 False,则说明这是一个持久的client,当失去连接时发布信息和队列中的消息将会被保持。注意,当失去连接时client 永远不会抛弃自己需要发出的信息,当调用connect()/reconnect() 后将会重新发送这些信息。 调用 reinitialise() 重置client到它初始化时的状态。
userdata:用户定义的任何类型的数据将被传输为 userdata 参数到回调函数。后面可以使用user_data_set()函数来更新这个参数。
protocol:client 使用的mqtt 版本。可以是 MQTTv31 或 MQTTv311
Example
import paho.mqtt.client as mqtt mqttc = mqtt.Client()
reinitialise()
reinitialise(client_id="", clean_session=True, userdata=None)
如果client 已经被创建了,reinitialise() 函数重置 client 到它起始的状态. 它与Client()构造函数有相同的参数。
Example
mqttc.reinitialise()
Option functions
这些函数可以设置client 以改变它的行为。大多数情况下必须在连接中间人之前调用。
max_inflight_messages_set()
max_inflight_messages_set(self, inflight)
在QoS>0的情况下,设置最大数量的消息 ,that can be part way through their network flow at once.
默认是20。增加这个值将会增加内存的开销但是可以提高吞吐量。
message_retry_set()
message_retry_set(retry)
设置重试时间,当中间人没有相应且QoS>0 时需要重试.
默认是5s,一般不需要修改。
Connect / reconnect / disconnect
connect()
connect(host, port=1883, keepalive=60, bind_address="")
connect() 函数使得client 连接到中间人(broker)。这是一个阻塞的函数,它有以下参数:
host:远程中间人的主机名或IP
port:服务端的端口。 默认是 1883。
keepalive:与中间人交流允许的最大时间段。如果没有信息交流,这控制client发送ping 消息到中间人的频率。
bind_address:本地网络IP
Callback
当client 从中间人接收到CONNACK信息,将触发on_connect()回调函数。
Example
mqttc.connect("iot.eclipse.org")
connect_async()
connect_async(host, port=1883, keepalive=60, bind_address="")
与 connect()相同,但是非阻塞。连接不会完成直到一个loop*()函数被调用。
reconnect()
reconnect()
用先前的信息重新连接中间人,在此之前必须调用过connect*()函数。
Callback
当client 从中间人接收到CONNACK信息,将触发on_connect()回调函数。
disconnect()
disconnect()
从中间人失去连接,中间人会抛弃该client 的信息。调用disconnect()将会导致将要被发送的消息丢失。
Callback
当client发送了disconnect 消息,它导致on_disconnect()被调用。
Network loop
这些函数是client 背后的驱动力量。如果它们没有被调用,进入网路的数据将不会被处理,走出网络的数据也不会被及时的发送。 实现网络循环有4种方法,这里介绍3种
loop()
loop(timeout=1.0, max_packets=1)
通常会调用这个函数来处理网络事件。 这个调用利用select()等待直到网络sokect 可被读或者写,然后处理进来或者出去的数据。这个函数最多阻塞timeout 秒,timeout 不能超过client 的 keepalive 值,否则你的client 将会定期的被中间人disconnected。
max_packets 参数已经被废弃,不用设置。
Example
run = True while run: mqttc.loop()
loop_start() / loop_stop()
loop_start() loop_stop(force=False)
这些函数为网络循环实现了线程化的接口。在connect*()之前或之后调用一次 loop_start(),后台会启动一个线程用来自动调用loop()。这解放了主线程,主线程可以做其它需要阻塞的事情。这也会处理对中间人的重连接(reconnect)。调用loop_stop()来停止后台线程。force 参数现在已经被忽略了。
Example
mqttc.connect("iot.eclipse.org") mqttc.loop_start() while True: temperature = sensor.blocking_read() mqttc.publish("paho/temperature", temperature)
loop_forever()
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)
这是一个阻塞形式的网络循环,直到client调用disconnect()才会返回。自动处理重连接。
Publishing
从client 给中间人发送消息。
publish()
publish(topic, payload=None, qos=0, retain=False)
这会使得一个消息被发送到中间人,随后从中间人到任何订阅了这个主题的client 。参数如下:
topic:主题,这个消息被发送到的主题。
payload:实际被发送的消息。如果未给出该参数,或者设置为None,那么系统会自动定义为空字符串。如果给出的是int或float,则会被转成字符串。如果你希望发送一个int/float,用struct.pack()来创建这个参数。
qos:质量,服务级别
retain:如果设置为True,这个消息将被设置为这个主题的‘last known good’ 消息。
返回一个元组(result,mid),result == MQTT_ERR_SUCCESS 表示成功,result == MQTT_ERR_NO_CONN 表示client 当前未连接。mid 是publish 请求的消息ID,mid 的值可以被用来追踪publish请求。
如果主题为空,或者qos not in (0,1,2),或者payload 的长度大于268435455字节,则会触发ValueError错误。
Callback
当消息已经被发送给中间人,on_publish()回调将会被触发。
Subscribe / Unsubscribe
subscribe()
subscribe(topic, qos=0)
client 订阅一个或多个主题(topic)
Simple string and integer
e.g. subscribe("my/topic", 2)
topic:指定需要订阅的主题
qos:订阅要求的服务级别,默认为0。
String and integer tuple
e.g. subscribe(("my/topic", 1))
topic:一个(topic,qos)元组,都必须提供。
qos:未被使用。
List of string and integer tuples
e.g. subscribe([("my/topic", 0), ("another/topic", 2)])
这允许一个订阅命令可以订阅多个主题。
这个函数返回元组(result,mid),result == MQTT_ERR_SUCCESS 表示成功,result == MQTT_ERR_NO_CONN 表示client 当前未连接。mid 是publish 请求的消息ID,mid 的值可以被用来追踪publish请求。
Callback
当中间人识别了这个订阅,on_subscribe()回调将被调用。
unsubscribe()
unsubscribe(topic)
取消client 对一个或多个主题的订阅。
topic:可以是字符串,也可以是字符串的列表,表示需要取消订阅的主题。
返回值是元组(result,mid)。
Callback
当中间人识别了unsubscribe,则on_unsubscibe()回调将会被触发。
Callbacks
on_connect()
on_connect(client, userdata, flags, rc)
当中间人回应我们的连接请求时被调用。
client:这个回调的client 实例。
userdata:私人数据,设置在Client()或userdata_set()
flags:中间人发送的返回标志。
rc:连接结果。
flags 是一个中间人返回的包含返回标志的字典:
-
flags[‘session present’] - 这个标志仅对clean session设置为0的client 有效,如果一个client 设置了clean session = 0,当重新连接中间人时,这个标志表示中间人是否任然有这个client 的session 信息。如果是1,则session 信息依然存在。
rc 的值表示成功与否:
0: 连接成功1: 连接被拒绝- 不正确的协议版本2: 连接被拒绝 - 非法的client 标志3: 连接被拒绝 - 服务端不可用4: 连接被拒绝 - 错误的用户名或密码5: 连接被拒绝 - 未被授权
Example
def on_connect(client, userdata, flags, rc): print("Connection returned result: "+connack_string(rc)) mqttc.on_connect = on_connect ...
on_disconnect()
on_disconnect(client, userdata, rc)
当client 失去连接时被调用。
client:回调函数的实例。
userdata:私人数据,设置在Client()或userdata_set()
rc:失去连接的结果。
rc 参数表示失去连接的状态。如果是MQTT_ERR_SUCCESS(0),这该回调函数是为了响应中间人的disconnect(),如果是其它值,则说明是其它事件引起的失去连接,比如网络问题。(意思是,该参数说明了是谁触发了该回调函数,即谁导致了失去连接)
Example
def on_disconnect(client, userdata, rc): if rc != 0: print("Unexpected disconnection.") mqttc.on_disconnect = on_disconnect ...
on_message()
on_message(client, userdata, message)
当有信息被一个订阅的主题接收,则会触发该回调函数。message_callback_add() 可以为指定的主题定义多个回调函数。
client:这个回调函数的client 实例。
userdata:私人数据,设置在Client()或userdata_set()
message:一个MQTTMessage实例。MQTTMessage类有topic,payload,qos,retain成员变量。
Example
def on_message(client, userdata, message): print("Received message '" + str(message.payload) + "' on topic '" + message.topic + "' with QoS " + str(message.qos)) mqttc.on_message = on_message ...
message_callback_add()
这个函数允许你为指定的订阅定义回调函数来处理消息,可以包含通配符。这允许你订阅诸如sensors/#,并且有一个回调函数来处理sensors/temperature 和另一个回调函数来处理sensors/humidity
message_callback_add(sub, callback)
sub:订阅过滤器,用来匹配回调。对每个sub 仅可以定义一个回调函数
callback:回调函数,月on_message()回调函数形式相同。
如果同时使用message_callback_add()和on_message() ,只有当message_callback_add()中没有匹配到时才会传给on_message()回调函数。
message_callback_remove()
Remove a topic/subscription specific callback previously registered using message_callback_add().
message_callback_remove(sub)
sub
the subscription filter to remove
on_publish()
on_publish(client, userdata, mid)
当一个信息被publish()发送给中间人后,会调用on_publish()。对于QoS为1和2的消息(message),这意味着适当的握手都已经完成了。对于QoS为0的消息,这仅仅意味着消息离开了client。mid 变量匹配publish()返回的mid变量,使得出去的消息可以被追踪。
这个回调函数很重要,因为即使publish()成功返回,也不一定总是表示消息被成功发送。
on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)
当中间人响应订阅请求时调用。mid变量匹配subscribe()调用的mid 变量。granted_qos 变量是一个整数列表,为每个中间人授予的不同的订阅请求给出QoS级别。
on_unsubscribe()
on_unsubscribe(client, userdata, mid)
当中间人响应一个取消订阅(unsubscribe)的请求时调用。mid 变量匹配unsubscribe()的mid变量。
原文档:https://pypi.python.org/pypi/paho-mqtt