Having learned how to publish messages to a topic in the previous article, it's now time to learn how to subscribe to a topic and process application messages.
When calling CreateSubscriptionBuilder, a topic filter must be specified; the server forwards every message published to a matching topic to the subscribing client. The subscription works without a message callback, but usually we want to process application messages in a callback added with WithMessageCallback:
    client.CreateSubscriptionBuilder("best_mqtt/test_topic")
        .WithMessageCallback(OnMessage)
        .BeginSubscribe();

    private void OnMessage(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
    {
        // Convert the raw payload to a string
        var payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);

        Debug.Log($"Content-Type: '{message.ContentType}' Payload: '{payload}'");
    }
In this example we subscribe to the concrete topic 'best_mqtt/test_topic' without using a wildcard. The server forwards any message published to this topic to the subscribing client.
The OnMessage callback has the following parameters:
| Type | Name | Description |
| --- | --- | --- |
| MQTTClient | client | The MQTTClient instance that we created the subscription with. |
| SubscriptionTopic | topic | A SubscriptionTopic instance that contains the original topic filter that the subscription subscribed to. |
| string | topicName | The topic name that matched the topic filter. Because the topic filter can contain wildcards, topicName can differ from the topic filter the subscription was created with. |
| ApplicationMessage | message | The application message sent by the server. Among other fields, it has the Payload and ContentType fields we used in the previous article. |
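To illustrate why topicName can differ from the topic filter, here is a minimal sketch of MQTT topic filter matching as described in the MQTT specification ('+' matches exactly one level, '#' matches all remaining levels). TopicFilterSketch and Matches are hypothetical names used only for illustration; this is not the plugin's own matching code, and the matching itself is done by the server.

```csharp
using System;

// Hypothetical helper, not part of Best MQTT: a simplified model of how
// a server decides whether a topic name matches a topic filter.
public static class TopicFilterSketch
{
    public static bool Matches(string filter, string topicName)
    {
        string[] filterLevels = filter.Split('/');
        string[] topicLevels = topicName.Split('/');

        // Topic names starting with '$' (for example $SYS/...) are not matched
        // by filters whose first level is a wildcard.
        if (topicName.StartsWith("$", StringComparison.Ordinal) &&
            (filterLevels[0] == "#" || filterLevels[0] == "+"))
            return false;

        for (int i = 0; i < filterLevels.Length; i++)
        {
            // '#' is only valid as the last level; it matches every remaining
            // level, including none ("a/#" also matches "a").
            if (filterLevels[i] == "#")
                return i == filterLevels.Length - 1;

            // The filter has more levels than the topic name.
            if (i >= topicLevels.Length)
                return false;

            // '+' matches any single level; anything else must match exactly.
            if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
                return false;
        }

        // Without a trailing '#', both must have the same number of levels.
        return filterLevels.Length == topicLevels.Length;
    }
}
```

Under this sketch, a subscription created with the filter 'best_mqtt/+' would receive a message published to 'best_mqtt/test_topic', and the callback's topicName would be 'best_mqtt/test_topic' while the original filter remains 'best_mqtt/+'.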
Do not keep a reference to the message's payload; it will be recycled after the event handler returns!
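If the payload bytes are needed after the callback returns, one option is to copy them into an array of our own while still inside the callback. The sketch below assumes only what the earlier example shows, namely that the payload exposes a byte array, an offset and a count; PayloadCopySketch and CopyPayload are hypothetical names, not part of the plugin.

```csharp
using System;

// Hypothetical helper: copies the relevant slice of a (possibly pooled)
// buffer into a new array that the caller owns.
public static class PayloadCopySketch
{
    public static byte[] CopyPayload(byte[] data, int offset, int count)
    {
        var copy = new byte[count];

        // Copy 'count' bytes starting at 'offset' in the source buffer.
        Buffer.BlockCopy(data, offset, copy, 0, count);

        return copy;
    }
}
```

Inside OnMessage this could be called as CopyPayload(message.Payload.Data, message.Payload.Offset, message.Payload.Count). Converting the payload to a string inside the callback, as the example above does, has the same effect because the string no longer refers to the original buffer.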
Because MQTT packets (subscriptions, application messages, etc.) are sent and processed in order, when we publish right after subscribing, the server receives the subscription request first and the application message right after it. The application message's topic matches the subscription's topic filter, so the server sends the application message back to our client.
Each subscription can define the maximum QoS level at which the server may deliver application messages to it. This is the highest level the client wants to support for that subscription, but the server can grant a lower maximum.
In some cases we might want to know whether the subscribe operation succeeded and what QoS level the server granted. For these cases we can add an acknowledgement callback:
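The effect of the maximum QoS can be pictured with a small model. In MQTT, a message is delivered to a subscriber with the lower of the QoS it was published with and the maximum QoS granted for the subscription. The sketch below is a simplified illustration with QoS levels as plain integers (0, 1, 2); QoSSketch, its methods and the serverMax parameter are hypothetical and not part of the plugin.

```csharp
using System;

// Hypothetical model of QoS negotiation, for illustration only.
public static class QoSSketch
{
    // The server may lower, but never raise, the maximum QoS the client asked for.
    // 'serverMax' stands for whatever limit the server chooses to apply.
    public static int GrantedQoS(int requestedMax, int serverMax)
        => Math.Min(requestedMax, serverMax);

    // A matching message reaches the subscriber with the lower of its
    // published QoS and the QoS granted for the subscription.
    public static int DeliveredQoS(int publishedQoS, int grantedQoS)
        => Math.Min(publishedQoS, grantedQoS);
}
```

For example, a message published with QoS 2 to a subscription that was granted QoS 1 would be delivered with QoS 1 under this model.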
    client.CreateSubscriptionBuilder("best_mqtt/test_topic")
        // ...
        .WithAcknowledgementCallback(OnSubscriptionAcknowledged)
        .BeginSubscribe();

    private void OnSubscriptionAcknowledged(MQTTClient client, SubscriptionTopic topic, SubscribeAckReasonCodes reasonCode)
    {
        if (reasonCode <= SubscribeAckReasonCodes.GrantedQoS2)
            Debug.Log($"Successfully subscribed with topic filter '{topic.Filter.OriginalFilter}'. QoS granted by the server: {reasonCode}");
        else
            Debug.Log($"Could not subscribe with topic filter '{topic.Filter.OriginalFilter}'! Error code: {reasonCode}");
    }
reasonCode is a success/error code. If it's less than or equal to SubscribeAckReasonCodes.GrantedQoS2 the client successfully subscribed with the given topic filter. Otherwise reasonCode is an error code describing why the subscription attempt failed.
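The comparison works because of how the MQTT v5 specification numbers SUBACK reason codes: the three success codes are 0x00 to 0x02 and every failure code is 0x80 or above. The sketch below defines its own enum with the values from the specification to show this; SubAckReasonSketch and SubAckSketch are hypothetical names, and the member names of the plugin's SubscribeAckReasonCodes enum (other than GrantedQoS2, used above) are not assumed here.

```csharp
// Hypothetical enum mirroring the SUBACK reason code values of the
// MQTT v5 specification; not the plugin's SubscribeAckReasonCodes type.
public enum SubAckReasonSketch : byte
{
    GrantedQoS0 = 0x00,
    GrantedQoS1 = 0x01,
    GrantedQoS2 = 0x02,
    UnspecifiedError = 0x80,
    ImplementationSpecificError = 0x83,
    NotAuthorized = 0x87,
    TopicFilterInvalid = 0x8F,
    PacketIdentifierInUse = 0x91,
    QuotaExceeded = 0x97,
    SharedSubscriptionsNotSupported = 0x9E,
    SubscriptionIdentifiersNotSupported = 0xA1,
    WildcardSubscriptionsNotSupported = 0xA2
}

public static class SubAckSketch
{
    // Success codes are the three lowest values, so one comparison is enough.
    public static bool IsSuccess(SubAckReasonSketch code)
        => code <= SubAckReasonSketch.GrantedQoS2;

    // For success codes the numeric value is the granted QoS level;
    // returns null when the subscription failed.
    public static int? GrantedQoS(SubAckReasonSketch code)
        => IsSuccess(code) ? (int)code : (int?)null;
}
```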
It's possible to subscribe to multiple topics in one go. With MQTTClient's CreateBulkSubscriptionBuilder, WithTopic can be called multiple times and the client sends the subscribe request in a single packet. SubscribeTopicBuilder offers the same options as the builder returned by CreateSubscriptionBuilder.
To unsubscribe from a topic filter, use the CreateUnsubscribePacketBuilder/CreateBulkUnsubscribePacketBuilder functions, which work similarly to their subscription counterparts:
    client.CreateUnsubscribePacketBuilder("best_mqtt/test_topic")
        .WithAcknowledgementCallback((client, topicFilter, reasonCode) => Debug.Log($"Unsubscribe request to topic filter '{topicFilter}' returned with code: {reasonCode}"))
        .BeginUnsubscribe();
The complete code, with the above added to the OnMessage callback so that the client unsubscribes from the topic after a message is received:
    using System;
    using System.Text;
    using System.Collections;
    using System.Collections.Generic;

    using UnityEngine;

    using Best.MQTT;
    using Best.MQTT.Packets.Builders;
    using Best.MQTT.Packets;

    public class MQTT : MonoBehaviour
    {
        MQTTClient client;

        // Start is called before the first frame update
        void Start()
        {
            client = new MQTTClientBuilder()
    #if !UNITY_WEBGL || UNITY_EDITOR
                .WithOptions(new ConnectionOptionsBuilder().WithTCP("broker.emqx.io", 1883))
    #else
                .WithOptions(new ConnectionOptionsBuilder().WithWebSocket("broker.emqx.io", 8084).WithTLS())
    #endif
                .WithEventHandler(OnConnected)
                .WithEventHandler(OnDisconnected)
                .WithEventHandler(OnStateChanged)
                .WithEventHandler(OnError)
                .CreateClient();

            client.BeginConnect(ConnectPacketBuilderCallback);
        }

        private void OnConnected(MQTTClient client)
        {
            client.AddTopicAlias("best_mqtt/test_topic");

            client.CreateSubscriptionBuilder("best_mqtt/test_topic")
                .WithMessageCallback(OnMessage)
                .WithAcknowledgementCallback(OnSubscriptionAcknowledged)
                .WithMaximumQoS(QoSLevels.ExactlyOnceDelivery)
                .BeginSubscribe();

            client.CreateApplicationMessageBuilder("best_mqtt/test_topic")
                .WithPayload("Hello MQTT World!")
                .WithQoS(QoSLevels.ExactlyOnceDelivery)
                .WithContentType("text/plain; charset=UTF-8")
                .BeginPublish();
        }

        private void OnMessage(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
        {
            // Convert the raw payload to a string
            var payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);

            Debug.Log($"Content-Type: '{message.ContentType}' Payload: '{payload}'");

            client.CreateUnsubscribePacketBuilder("best_mqtt/test_topic")
                .WithAcknowledgementCallback((client, topicFilter, reasonCode) => Debug.Log($"Unsubscribe request to topic filter '{topicFilter}' returned with code: {reasonCode}"))
                .BeginUnsubscribe();
        }

        private void OnSubscriptionAcknowledged(MQTTClient client, SubscriptionTopic topic, SubscribeAckReasonCodes reasonCode)
        {
            if (reasonCode <= SubscribeAckReasonCodes.GrantedQoS2)
                Debug.Log($"Successfully subscribed with topic filter '{topic.Filter.OriginalFilter}'. QoS granted by the server: {reasonCode}");
            else
                Debug.Log($"Could not subscribe with topic filter '{topic.Filter.OriginalFilter}'! Error code: {reasonCode}");
        }

        private void OnDestroy()
        {
            client?.CreateDisconnectPacketBuilder()
                .WithReasonCode(DisconnectReasonCodes.NormalDisconnection)
                .WithReasonString("Bye")
                .BeginDisconnect();
        }

        private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder)
        {
            return builder;
        }

        private void OnStateChanged(MQTTClient client, ClientStates oldState, ClientStates newState)
        {
            Debug.Log($"{oldState} => {newState}");
        }

        private void OnDisconnected(MQTTClient client, DisconnectReasonCodes code, string reason)
        {
            Debug.Log($"OnDisconnected - code: {code}, reason: '{reason}'");
        }

        private void OnError(MQTTClient client, string reason)
        {
            Debug.Log($"OnError reason: '{reason}'");
        }
    }