Reading sensor data and sending your categorized results via Service Bus to multiple users

Imagine you have a sensor that needs to be monitored by many people, maybe from different locations.
Using ‘queues’ is a simple way to send messages to a single receiver, but what if you want more receivers to process your data? Furthermore, what if you want different receivers to get only the data that is relevant to them?

Taking our Geiger counter example a bit further, we can also monitor its readings and have certain users receive only the critical data.
The first example of the “Geiger counter data publisher using Azure Service Bus” showed how to interface a Geiger counter to a BeagleBoard xm and publish the readings to a message queue using Azure Service Bus.
Now we want not only to read the data but also to inform certain groups of people about it.

One group of people (or computers) may be building a long-term log of all the data generated, and therefore they need ‘All Messages’. This is pretty much the same as a queue.

Another group may be interested in off-the-chart values, so that they only receive messages with content indicating radiation above a certain level. They may then check out what is causing the surplus of radiation.

A third group may want to receive only the messages indicating that dangerous radiation levels were detected. For example, this group could be connected to the evacuation alarm: if the radiation level rises above X, the alarm sounds and everybody must evacuate…

Service Bus Topics and Subscriptions are a great way to filter data and publish it to the right users.

Setup

The same initial procedure to set up an Azure Service Bus account is required:

Follow this guide: http://azure.microsoft.com/en-us/documentation/articles/service-bus-python-how-to-use-topics-subscriptions/

I recommend creating three separate files. The first creates the Service Bus settings, the second runs on your BeagleBoard, and the third is used by the readers to receive the messages.

 

First File

Settings for TOPICS and SUBSCRIPTIONS.
You can create and run this file on any PC; you don’t need your BeagleBoard yet.

Import the libraries for Service Bus.
Then create a TOPIC, in this case called ‘GeigerCounter’

from azure.servicebus import *


bus_service = ServiceBusService(service_namespace='sensornetwork', account_key='s8S0GXhTNLOHrfsEdww/QE0+UYZIVxss8TK7CQkg29Y=', issuer='owner')
topicname = 'GeigerCounter'
bus_service.create_topic(topicname)

print "Topic called " + topicname + " created"

In this example I will publish my sensor data to three different groups.
We will now create a SUBSCRIPTION for the TOPIC.
This means that anyone who requests data from a subscription will receive only the messages whose properties pass that subscription’s filter.
The first group will receive ALL the messages, regardless of their value. No rules or filters are required yet:

#first group:
bus_service.create_subscription(topicname, 'AllMessages')
print "Subscription called " + "AllMessages" + " created"

The second group will receive only the values above a certain radiation level. I want this to be an indicator of an alarm, therefore this subscription is called ‘Alarm’:

#second group:
bus_service.create_subscription(topicname, 'Alarm')
rule = Rule()
rule.filter_type = 'SqlFilter'
rule.filter_expression = 'radiationlevel = 2'
bus_service.create_rule(topicname, 'Alarm', 'AlarmFilter', rule)
bus_service.delete_rule(topicname, 'Alarm', DEFAULT_RULE_NAME)
print "Subscription called " + "Alarm" + " created"

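I created a rule that lets through only the messages labelled with ‘radiationlevel = 2’ and filters out everything else.
Deleting the default rule matters: it matches every message, so leaving it in place would let all readings through to this subscription.
I chose the number 2 because it is intended for the second group, but you can use any value.
Don’t worry about what ‘radiationlevel = 2’ means yet; it is explained in the section for the second Python file (the one on your BeagleBoard).
According to the Azure documentation, SqlFilter is the most flexible filter type supported by subscriptions.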

Let’s add the settings for the third group:

#third group:
bus_service.create_subscription(topicname, 'Evacuate')
rule = Rule()
rule.filter_type = 'SqlFilter'
rule.filter_expression = 'radiationlevel = 3'
bus_service.create_rule(topicname, 'Evacuate', 'EvacuateFilter', rule)
bus_service.delete_rule(topicname, 'Evacuate', DEFAULT_RULE_NAME)
print "Subscription called " + "Evacuate" + " created"

This is similar to the second group, but the subscription is called ‘Evacuate’ and the filter expression is ‘radiationlevel = 3’.
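To see how the three subscriptions behave together, here is a rough pure-Python model of the filtering. It makes no Azure calls: the dictionary and the function name matching_subscriptions are made up for illustration, and the lambdas only stand in for the SQL filter expressions that Service Bus evaluates on its servers.

```python
# Pure-Python model of topic/subscription filtering; no Azure calls are made.
# Subscription names mirror the ones created above. The predicates are
# stand-ins (assumptions) for the SQL filter expressions on the server.
SUBSCRIPTION_FILTERS = {
    "AllMessages": lambda props: True,                           # default rule matches everything
    "Alarm": lambda props: props.get("radiationlevel") == 2,     # 'radiationlevel = 2'
    "Evacuate": lambda props: props.get("radiationlevel") == 3,  # 'radiationlevel = 3'
}


def matching_subscriptions(custom_properties):
    """Return the sorted names of the subscriptions that would get a copy."""
    return sorted(
        name
        for name, accepts in SUBSCRIPTION_FILTERS.items()
        if accepts(custom_properties)
    )


for level in (1, 2, 3):
    print(level, matching_subscriptions({"radiationlevel": level}))
```

Note that ‘AllMessages’ keeps its default rule, so in this model it also receives the copies labelled 2 and 3.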

Save the file and run it once.
This file will tell Azure servers what rules and filters you want to create.
You only need to run this file once; the filters remain in place until you delete the topics and subscriptions manually.
You should get this:

Topic called GeigerCounter created
Subscription called AllMessages created
Subscription called Alarm created
Subscription called Evacuate created

Congratulations, you have finished creating the TOPIC and SUBSCRIPTIONS.

Second file:

This is the file that will run on your BeagleBoard. It reads the sensor data, interprets it and classifies it for the different subscriptions:

Create a file called geigertotopic.py (Geiger to topic).
Go to the location where you want to create your file. If you want a bit more detail on how to do this, check the previous tutorial for queues.

sudo touch geigertotopic.py

Open it to add content:

sudo nano geigertotopic.py

Default login on the BeagleBoard xm:
User: ubuntu
Password: temppwd

Import libraries:

import serial
import time
import datetime
from azure.servicebus import *

Select serial port and settings:

#select serial port, baudrate (default = 9600), timeout (default = 1)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)                    #for BeagleBoard xm
#ser = serial.Serial('/dev/tty.usbserial-A9VB6QRM', 9600, timeout=1)      #for Macbook

Make sure you know which ttyUSB port you want.
For any details on how to detect and configure your serial connection, refer to the previous tutorial.

Add the details of your Service Bus namespace and key

bus_service = ServiceBusService(service_namespace='sensornetwork', account_key='s8S0GXhTNLOHrfsEdww/QE0+UYZIVxss8TK7CQkg29Y=', issuer='owner')
topicname='GeigerCounter'
queuename = 'taskqueue'

Add the code to read the Geiger Counter data:

a=b=c=z=minutes=0
d=uSv=0.0
while True:
    timeout = time.time() + 60
    while True:
        x = ser.read()  # read one byte
        if x == '0':    # compare by value; 'is' only tests object identity
            d=d+1
            #print '.'
        elif x == '1':
            d=d+1
            #print '.'
        if time.time() > timeout:
            minutes = minutes + 1
            break
    z=z+d
    print "CPM = " ,d, " total= ", z, " minutes: ", minutes
    if d > 0:
        now = datetime.datetime.now()   # the module is imported as 'datetime', so the class is datetime.datetime
        uSv = round((d/108),3)

Every minute I count the detections of the Geiger tube, which gives counts per minute (CPM), and translate them to microSieverts per hour (uSv/hr).
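The one-minute counting window can also be written as a small function. This is only a sketch: count_pulses, read_byte and clock are hypothetical names, introduced so the loop can be tried without a Geiger counter attached. On the board, read_byte would be ser.read.

```python
import time


def count_pulses(read_byte, window_seconds=60, clock=time.time):
    """Count '0'/'1' bytes returned by read_byte() until the window elapses.

    read_byte and clock are hypothetical hooks (assumptions): pass ser.read
    and time.time on the board, or fakes when experimenting on a PC.
    """
    deadline = clock() + window_seconds
    count = 0
    while clock() <= deadline:
        # Accept both str (Python 2 pyserial) and bytes (Python 3 pyserial)
        if read_byte() in ("0", "1", b"0", b"1"):
            count += 1
    return count
```

An empty read (the serial timeout expiring with no data) simply does not count, which matches the behaviour of the loop above.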
The factor used in the last line depends on your Geiger tube, so make sure you use the correct value. Refer to the previous tutorial for more details.
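As a quick check of the conversion, here is a minimal sketch. The function name cpm_to_usv_per_hour is made up for this example, and 108.0 is simply the factor used in the script above, so replace it with the value for your own tube.

```python
def cpm_to_usv_per_hour(cpm, cpm_per_usv=108.0):
    """Convert counts per minute to microSieverts per hour, rounded to 3 places.

    108.0 is the factor used in this tutorial's script; it depends on the tube.
    """
    return round(cpm / cpm_per_usv, 3)


for cpm in (5, 9, 12, 17):
    print(cpm, "CPM ->", cpm_to_usv_per_hour(cpm), "uSv/hr")
```

With this factor, 9 CPM comes out as 0.083 uSv/hr and 17 CPM as 0.157 uSv/hr.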

Now it is time to add a property so we can filter the results:

        #send to group 1
        msg = Message("CPM Geiger: " + str(d) + " equivalent to: "+ str(uSv)+" uSv/hr" + "  date and time: "+ str(now) [:19], custom_properties={'radiationlevel':1})
        bus_service.send_topic_message(topicname, msg)
        #print 'Message sent to the topic:'+ topicname + ' ' + msg.body
        #print 'Body:', msg.body

This is the default group and it receives all the readings.

        #send to group 2
        if uSv > 0.06:
            msg = Message("CPM Geiger: " + str(d) + " equivalent to: "+ str(uSv)+" uSv/hr" + "  date and time: "+ str(now) [:19], custom_properties={'radiationlevel':2})
            bus_service.send_topic_message(topicname, msg)
            print 'ALARM! Message sent to the topic: '+ topicname + ' labelled: ' + 'radiationlevel = 2 '+ msg.body

For the second group we want to discard every reading that is not above 0.06 uSv/hr.

When a reading is above this value, a second copy of the message is sent, labelled with ‘radiationlevel = 2’.
A reading at or below 0.06 uSv/hr is sent only once, with the label ‘radiationlevel = 1’.

Next is the filter for group 3, with readings above 0.1 uSv/hr

        #send to group 3
        if uSv > 0.1:
            msg = Message("CPM Geiger: " + str(d) + " equivalent to: "+ str(uSv)+" uSv/hr" + "  date and time: "+ str(now) [:19], custom_properties={'radiationlevel':3})
            bus_service.send_topic_message(topicname, msg)
            print 'EVACUATE! Message sent to the topic: '+ topicname + ' filtered to: ' + 'radiationlevel = 3 ' + msg.body

This sends a copy of any reading higher than 0.1 uSv/hr labelled with ‘radiationlevel = 3’.

    d=uSv=0.0

The last line resets the counts so the next minute starts from zero. Don’t forget it!
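The labelling logic of the loop can be summarised in one small helper. This is a sketch, not part of the original script: radiation_levels and the two constant names are hypothetical, and the thresholds are the ones used in the checks above.

```python
ALARM_THRESHOLD = 0.06      # uSv/hr, from the group 2 check
EVACUATE_THRESHOLD = 0.1    # uSv/hr, from the group 3 check


def radiation_levels(usv):
    """Return the 'radiationlevel' label of every message sent for one reading."""
    levels = [1]                      # every reading is published once with label 1
    if usv > ALARM_THRESHOLD:
        levels.append(2)              # extra copy for the Alarm subscription
    if usv > EVACUATE_THRESHOLD:
        levels.append(3)              # extra copy for the Evacuate subscription
    return levels


for reading in (0.046, 0.083, 0.111):
    print(reading, radiation_levels(reading))
```

So one reading above 0.1 uSv/hr results in three messages on the topic, one per label.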

Press CTRL+X, then Y and Enter, to save the file and exit nano.
Run the file from your BeagleBoard:

python geigertotopic.py

After 15 minutes with my counter I got this:

CPM =  5.0  total=  5.0  minutes:  1
CPM =  6.0  total=  11.0  minutes:  2
CPM =  9.0  total=  20.0  minutes:  3
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 9.0 equivalent to: 0.083 uSv/hr  date and time: 2014-08-27 17:47:48
CPM =  7.0  total=  27.0  minutes:  4
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 7.0 equivalent to: 0.065 uSv/hr  date and time: 2014-08-27 17:48:58
CPM =  9.0  total=  36.0  minutes:  5
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 9.0 equivalent to: 0.083 uSv/hr  date and time: 2014-08-27 17:49:58
CPM =  5.0  total=  41.0  minutes:  6
CPM =  12.0  total=  53.0  minutes:  7
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 12.0 equivalent to: 0.111 uSv/hr  date and time: 2014-08-27 17:52:00
EVACUATE! Message sent to the topic: GeigerCounter filtered to: radiationlevel = 3 CPM Geiger: 12.0 equivalent to: 0.111 uSv/hr  date and time: 2014-08-27 17:52:00
CPM =  10.0  total=  63.0  minutes:  8
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 10.0 equivalent to: 0.093 uSv/hr  date and time: 2014-08-27 17:53:02
CPM =  17.0  total=  80.0  minutes:  9
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 17.0 equivalent to: 0.157 uSv/hr  date and time: 2014-08-27 17:54:03
EVACUATE! Message sent to the topic: GeigerCounter filtered to: radiationlevel = 3 CPM Geiger: 17.0 equivalent to: 0.157 uSv/hr  date and time: 2014-08-27 17:54:03
CPM =  8.0  total=  88.0  minutes:  10
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 8.0 equivalent to: 0.074 uSv/hr  date and time: 2014-08-27 17:55:04
CPM =  17.0  total=  105.0  minutes:  11
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 17.0 equivalent to: 0.157 uSv/hr  date and time: 2014-08-27 17:56:05
EVACUATE! Message sent to the topic: GeigerCounter filtered to: radiationlevel = 3 CPM Geiger: 17.0 equivalent to: 0.157 uSv/hr  date and time: 2014-08-27 17:56:05
CPM =  7.0  total=  112.0  minutes:  12
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 7.0 equivalent to: 0.065 uSv/hr  date and time: 2014-08-27 17:57:06
CPM =  6.0  total=  118.0  minutes:  13
CPM =  9.0  total=  127.0  minutes:  14
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 9.0 equivalent to: 0.083 uSv/hr  date and time: 2014-08-27 17:59:08
CPM =  8.0  total=  135.0  minutes:  15
ALARM! Message sent to the topic: GeigerCounter labelled: radiationlevel = 2 CPM Geiger: 8.0 equivalent to: 0.074 uSv/hr  date and time: 2014-08-27 18:00:09

Whenever I got a high reading, both the ALARM and the EVACUATE messages were sent as expected.

Let’s read the data we just sent and check if it is actually filtered by the Service Bus:

Third file:

Create a new file on your computer and name it recvsubsmsg.py.
This “receive subscription message” Python file requests from the Service Bus only the messages that match the filters configured in the first file:

from azure.servicebus import *
import sys

We want this file to be as generic as possible, so the topic and subscription are passed on the command line; that is why I added import sys.

args = sys.argv

# Specify Service Bus address. Change this to your required address.
bus_service = ServiceBusService(service_namespace='sensornetwork', account_key='s8S0GXhTNLOHrfsEdww/QE0+UYZIVxss8TK7CQkg29Y=', issuer='owner')

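while True:
    # Receive the next message from the specified topic and subscription.
    # peek_lock=False removes the message from the subscription as it is read.
    msg = bus_service.receive_subscription_message(args[1], args[2], peek_lock=False)
    # msg.body is None when no message arrived before the receive timed out
    if msg.body is not None:
        print(msg.body)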

args is used to read parameters from the command line.
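Since the script indexes args[1] and args[2] directly, starting it without both parameters fails with an IndexError. A minimal sketch of a friendlier check follows. parse_args is a hypothetical helper name, not part of the original script or of the Azure library.

```python
def parse_args(argv):
    """Return (topic, subscription) from an argv-style list.

    Raises ValueError with a usage hint when the two names are missing.
    """
    if len(argv) != 3:
        raise ValueError(
            "usage: python recvsubsmsg.py <nameoftopic> <nameofsubscription>"
        )
    return argv[1], argv[2]


topic, subscription = parse_args(["recvsubsmsg.py", "GeigerCounter", "Alarm"])
print(topic, subscription)
```

In the real script you would call parse_args(sys.argv) and pass the two names on to receive_subscription_message.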
Save the file.
In order to run it and receive data follow this generic syntax:

python recvsubsmsg.py <nameoftopic> <nameofsubscription>

For group 2, run:

python recvsubsmsg.py GeigerCounter Alarm

or for group 3:

python recvsubsmsg.py GeigerCounter Evacuate

You will only get the filtered messages. For Evacuate I get:

CPM Geiger: 12.0 equivalent to: 0.111 uSv/hr  date and time: 2014-08-27 17:52:00
CPM Geiger: 17.0 equivalent to: 0.157 uSv/hr  date and time: 2014-08-27 17:54:03
CPM Geiger: 17.0 equivalent to: 0.157 uSv/hr  date and time: 2014-08-27 17:56:05

That’s only three times I have to evacuate the building! Not bad.
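If a receiver needs the numbers rather than the text, the message body can be taken apart again. The sketch below assumes the body is a text string in exactly the format built by the sender above. parse_reading and BODY_PATTERN are names made up for this example.

```python
import re

# Matches the body format produced by the sender script above (an assumption:
# if you change the message text there, this pattern must change with it).
BODY_PATTERN = re.compile(
    r"CPM Geiger: (?P<cpm>[\d.]+) equivalent to: (?P<usv>[\d.]+) uSv/hr"
    r"\s+date and time: (?P<stamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
)


def parse_reading(body):
    """Return (cpm, usv_per_hour, timestamp_text), or None if the body does not match."""
    match = BODY_PATTERN.search(body)
    if match is None:
        return None
    return float(match.group("cpm")), float(match.group("usv")), match.group("stamp")


print(parse_reading(
    "CPM Geiger: 12.0 equivalent to: 0.111 uSv/hr  date and time: 2014-08-27 17:52:00"
))
```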

NOTE: These values and labels are just examples. I am only reading harmless background radiation. The values of uSv/hr or CPM would have to be much, much higher before you actually need to worry about them.

That’s it!
You can now create TOPICS and SUBSCRIPTIONS using Azure Service Bus with python.

 

by Martin Garcia (plusmartin.com)
