2019-09-19

RabbitMQ/MQTT

RabbitMQ 是實現進階訊息佇列協議 AMQP (Advanced Message Queuing Protocol) 的中間件軟體, 支援的協議版本為 0.9.1,也有 plugin 可以支援 1.0。

官網上有提供安裝檔案可以使用,如果在 openSUSE 15.1,可以使用下列的指令安裝:
sudo zypper in rabbitmq-server rabbitmq-server-plugins

如果要啟動服務,可以使用下列的指令:
sudo service rabbitmq-server start

如果要重啟服務,可以使用下列的指令:
sudo service rabbitmq-server restart

如果知道服務的狀態,可以使用下列的指令:
sudo service rabbitmq-server status

如果要停止服務,可以使用下列的指令:
sudo service rabbitmq-server stop


RabbitMQ 有支援 MQTT 3.1 的 plugin。下面是啟用的方式(啟用後需要重開 server 才會生效):
sudo rabbitmq-plugins enable rabbitmq_mqtt

設定檔案在 /etc/rabbitmq/rabbitmq.conf,因為只是要測試,所以我把 port 改為 1883,啟用預設的 username/password, 並且關閉允許匿名連線:
## ----------------------------------------------------------------------------
## RabbitMQ MQTT Adapter
##
## See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
## for details
## ----------------------------------------------------------------------------

# =======================================
# MQTT section
# =======================================

## TCP listener settings.
##
mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883

## TCP listener options (as per the broker configuration).
##
# mqtt.tcp_listen_options.backlog = 4096
# mqtt.tcp_listen_options.recbuf  = 131072
# mqtt.tcp_listen_options.sndbuf  = 131072
#
# mqtt.tcp_listen_options.keepalive = true
# mqtt.tcp_listen_options.nodelay   = true
#
# mqtt.tcp_listen_options.exit_on_close = true
# mqtt.tcp_listen_options.send_timeout  = 120

## TLS listener settings
## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.
#
# mqtt.listeners.ssl.default = 8883
#
# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem
# ssl_options.certfile   = /path/to/tls/server_certificate.pem
# ssl_options.keyfile    = /path/to/tls/server_key.pem
# ssl_options.verify     = verify_peer
# ssl_options.fail_if_no_peer_cert  = true
#


## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# mqtt.num_acceptors.tcp = 10
# mqtt.num_acceptors.ssl = 10

## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to STOMP clients, other protocols
## like STOMP or AMQP have their own setting to enable proxy protocol.
## See the plugins or broker documentation for more information.
##
# mqtt.proxy_protocol = false

## Set the default user name and password used for anonymous connections (when client
## provides no credentials). Anonymous connections are highly discouraged!
##
mqtt.default_user = guest
mqtt.default_pass = guest

## Enable anonymous connections. If this is set to false, clients MUST provide
## credentials in order to connect. See also the mqtt.default_user/mqtt.default_pass
## keys. Anonymous connections are highly discouraged!
##
mqtt.allow_anonymous = false

## If you have multiple vhosts, specify the one to which the
## adapter connects.
##
mqtt.vhost = /

## Specify the exchange to which messages from MQTT clients are published.
##
mqtt.exchange = amq.topic

## Specify TTL (time to live) to control the lifetime of non-clean sessions.
##
mqtt.subscription_ttl = 1800000

## Set the prefetch count (governing the maximum number of unacknowledged
## messages that will be delivered).
##
mqtt.prefetch = 10

下面是使用 tcl.mqttc 測試 Subscribe 與 Publish 功能的測試程式:
package require Thread
package require mqttc
catch {console show}

set ::gThread [thread::create {thread::wait} ]
set result 0

proc subscribe { } {
    thread::send -async $::gThread {
        package require mqttc

        mqttc client "tcp://localhost:1883" "USERTest1" 1 -cleansession 1 \
                     -username guest -password guest
        client subscribe  "MQTT Examples" 1
        while 1 {
            if {[catch {set result [client  receive]}]} {
                puts "Receive error!!!"
                break
            }
            if {[llength $result] > 0} {
                puts "[lindex $result 0] - [lindex $result 1]"
                if {![string compare -nocase [lindex $result 1] "Exit"]} {
                    break
                }
            }
        }
        client unsubscribe  "MQTT Examples"
        client close
    } ::result
}

subscribe
puts "started test..."

after 250

mqttc client "tcp://localhost:1883" "USERTest2" 1 -timeout 1000 \
             -username guest -password guest
client publishMessage "MQTT Examples" "Hello MQTT!" 1 0
client publishMessage "MQTT Examples" "Exit" 1 0
client close

vwait ::result

2019-09-10

tcl.mqttc v0.6

因為 paho.mqtt.c 有一個新的版本 1.3.1,所以我做了 tcl.mqttc 相關的更新,同時將版本設成 v0.6。

不過我只有測試 tcp 部份,使用 ActiveMQ 測試 MQTT 3.1,以及使用 EMQX 測試 MQTT 3.1 以及 5 的部份。

我忘記是不是有在這個部落格貼過了,下面是 MQTT 3.1 測試 Subscribe 與 Publish 功能的測試程式:
package require Thread
package require mqttc
catch {console show}

set ::gThread [thread::create {thread::wait} ]
set result 0

proc subscribe { } {
    thread::send -async $::gThread {
        package require mqttc

        mqttc client "tcp://localhost:1883" "USERTest1" 1 -cleansession 1
        client subscribe  "MQTT Examples" 1
        while 1 {
            if {[catch {set result [client  receive]}]} {
                puts "Receive error!!!"
                break
            }
            if {[llength $result] > 0} {
                puts "[lindex $result 0] - [lindex $result 1]"
                if {![string compare -nocase [lindex $result 1] "Exit"]} {
                    break
                }
            }
        }
        client unsubscribe  "MQTT Examples"
        client close
    } ::result
}

subscribe
puts "started test..."

after 250

mqttc client "tcp://localhost:1883" "USERTest2" 1 -timeout 1000
client publishMessage "MQTT Examples" "Hello MQTT!" 1 0
client publishMessage "MQTT Examples" "Exit" 1 0
client close

vwait ::result