Visualize MQTT Data with InfluxDB and Grafana

This tutorial is the follow-up to a first article and completes our DIY Smart Home indoor weather station.

In this second article we save the published MQTT messages in an InfluxDB database and create a beautiful dashboard with Grafana.

This step by step tutorial is divided into 3 parts:

  1. Set up an InfluxDB database
  2. Create an MQTT bridge
  3. Set up Grafana and create the dashboard

Let us quickly summarize what we did in the first article in this series.

In the first article we built a basic MQTT setup with an ESP8266 NodeMCU as publisher that collects temperature and humidity readings. The publisher sends each reading as a payload on its own topic via MQTT to the MQTT broker on a Raspberry Pi. The last step was a subscriber, written as a basic Python script, that shows the latest entries of the topics.

Arduino 2 Raspberry Pi WiFi Communication
NodeMCU Raspberry Pi MQTT

Now we extend our application with an InfluxDB database to store all the sensor data and with the visualization tool Grafana to show us nice charts on a dashboard. The following picture shows the communication between all parts of this application.

InfluxDB Grafana Overview

The subscriber is a little different in this article because we want to create an MQTT InfluxDB bridge which stores the MQTT data in the database. We start with the installation and configuration of InfluxDB.

I use the following parts for this tutorial:

  • A Raspberry Pi

You can find links to all parts on the Components and Parts page.

InfluxDB Setup

In our example we use InfluxDB to store the data because it is optimized for time series data. Of course it would also be possible to work with other databases like MariaDB or MongoDB, but InfluxDB works right out of the box with Grafana, which we use to visualize the data.

If you are new to InfluxDB it makes sense to gain some knowledge about the database system itself. You can read the getting started guide of InfluxDB for example.

The first thing we have to do is to install InfluxDB on the Raspberry Pi. Because the Raspberry Pi is a Linux system the installation is pretty easy. Execute the following 2 commands in the Raspberry Pi terminal.

Install InfluxDB

sudo apt install influxdb

Install InfluxDB Clients

sudo apt install influxdb-client

After the successful installation you can start InfluxDB and check its current status with the following commands.

Start InfluxDB Service

sudo service influxdb start

Get the current status of InfluxDB

sudo service influxdb status

The status should show active (running), which means that InfluxDB is up.

Influxdb Status

After the installation of InfluxDB we have to make one change in the configuration to enable the HTTP endpoint. This is necessary because the MQTT subscriber writes its data to the database over HTTP. To change the configuration, use the following command.

Open the InfluxDB configuration

sudo nano /etc/influxdb/influxdb.conf

In the configuration file, scroll down with the arrow keys to the [http] section. Uncomment the first setting, the one below the comment “Determines whether HTTP endpoint is enabled.”, by deleting the leading #, so that the line reads enabled = true. Press Ctrl + X, then Y to confirm saving, and hit Enter to write to the existing file.

Influxdb configuration

After every change in the configuration, InfluxDB has to be restarted so that the changes become active. Restart InfluxDB with the following command.

Restart InfluxDB Service

sudo service influxdb restart
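
If you want to check from a script that the HTTP endpoint really answers after the restart, a minimal sketch could look like the following. It assumes InfluxDB 1.x, whose /ping endpoint replies with status 204; the function name influxdb_is_up is my own and not part of any library.

```python
import urllib.error
import urllib.request


def influxdb_is_up(base_url, timeout=3.0):
    """Return True if GET <base_url>/ping answers with HTTP 204 (InfluxDB 1.x)."""
    # The Pi sits on the local network, so any system-wide proxy is bypassed.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(base_url.rstrip('/') + '/ping', timeout=timeout) as response:
            return response.status == 204
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or an HTTP error status.
        return False
```

On the Raspberry Pi itself you would call it as influxdb_is_up('http://localhost:8086'). A result of False usually means the service is not running or the [http] section is still disabled.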

Now the configuration is finished. Next we create a database where all MQTT data is stored and a user that writes the MQTT data to the database.
First open the InfluxDB shell with the following command in the Raspberry Pi terminal.

Start InfluxDB

influx

Now we create a new database with whatever name you like. I chose “weather_stations” as the name. Type in the following command:

CREATE DATABASE weather_stations

The database is created. Now we create a new user and give this user the rights to access the database we created before. I chose mqtt as both username and password. Feel free to pick a better or safer username and password.

CREATE USER mqtt WITH PASSWORD 'mqtt'
GRANT ALL ON weather_stations TO mqtt
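
The same three steps can also be scripted. The following is only a sketch under assumptions: it expects a client object that offers create_database, create_user and grant_privilege the way the InfluxDBClient from the influxdb Python package does, and the function name provision is made up for this example.

```python
def provision(client, database, user, password):
    """Create the database and the user, then grant the user all rights on it.

    client is assumed to behave like influxdb.InfluxDBClient,
    connected with an account that is allowed to create users.
    """
    client.create_database(database)
    client.create_user(user, password, admin=False)
    client.grant_privilege('all', database, user)


# Hypothetical usage on the Raspberry Pi (needs the influxdb package):
#   from influxdb import InfluxDBClient
#   provision(InfluxDBClient('localhost', 8086), 'weather_stations', 'mqtt', 'mqtt')
```

Passing the client in as a parameter keeps the function easy to try out with a stand-in object before you run it against the real database.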

That was it for InfluxDB: we installed InfluxDB, adjusted the configuration and created a database and a user for the database.

You exit InfluxDB from the command line with exit.

The next step is to make sure the database is filled with MQTT messages.

Subscriber Setup (MQTT InfluxDB Bridge)

The job of the subscriber is to listen to all MQTT messages, pick out the ones we want to save and write them in the right format to InfluxDB. The bridge between the MQTT broker and InfluxDB is a Python 3 script, so we have to make sure that the needed packages are installed for Python 3. If you have installed Raspberry Pi OS on your Raspberry Pi, Python 3 is already installed. If you still have to install Raspberry Pi OS, you find a tutorial in this article. With the following commands you install the MQTT and InfluxDB packages for Python 3.

Install Python3 MQTT language bindings

sudo pip3 install paho-mqtt

Install Python3 InfluxDB package

sudo pip3 install influxdb
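
To confirm that both packages are visible to python3, a small check like the following sketch can help. The mapping of module names to pip names and the function name missing_packages are my own choices for this example.

```python
import importlib.util

# Top-level module name -> name of the pip package that provides it.
REQUIRED = {'paho': 'paho-mqtt', 'influxdb': 'influxdb'}


def missing_packages(required=REQUIRED):
    """Return the pip names of all modules that cannot be found."""
    return [pip_name for module, pip_name in required.items()
            if importlib.util.find_spec(module) is None]


missing = missing_packages()
print('Missing packages:', ', '.join(missing) if missing else 'none')
```

If a package is listed as missing although pip3 reported success, pip3 and python3 probably belong to different Python installations.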

Now all python3 packages are installed and we create a new python file called MQTTInfluxDBBridge.

Create the python file for the MQTT subscriber

sudo nano MQTTInfluxDBBridge.py

The following part of this article describes the Python bridge script step by step. At the end of this section you find a download button to get the whole Python file as a zip folder, because I cannot upload Python files directly.

import re
from typing import NamedTuple

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

INFLUXDB_ADDRESS = '192.168.0.8'
INFLUXDB_USER = 'mqtt'
INFLUXDB_PASSWORD = 'mqtt'
INFLUXDB_DATABASE = 'weather_stations'

MQTT_ADDRESS = '192.168.0.8'
MQTT_USER = 'cdavid'
MQTT_PASSWORD = 'cdavid'
MQTT_TOPIC = 'home/+/+'
MQTT_REGEX = 'home/([^/]+)/([^/]+)'
MQTT_CLIENT_ID = 'MQTTInfluxDBBridge'

influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, None)

At the start of the script we load all packages we use:

  • re: Match the MQTT topic against a pattern
  • NamedTuple: Create named tuples with typed fields
  • paho.mqtt.client: Access the MQTT broker
  • InfluxDBClient: Access the InfluxDB database

After all packages are loaded, the InfluxDB variables are defined. The address is the IP of the Raspberry Pi. The username and password are the ones we defined during the InfluxDB setup, and the database name is the database where we want to store our data.

The MQTT variables are partly the same as in the first article of this series. It is important to match the topic structure with wildcards after the home level, where each + stands for exactly one topic level. I use the following structure:

  • home
    • livingroom
      • temperature
      • humidity
    • bathroom
      • temperature

Therefore I need a total depth of 3 in my topic structure. So for your publisher you have to define the topics like this:

const char* humidity_topic = "home/livingroom/humidity";
const char* temperature_topic = "home/livingroom/temperature";
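
To get a feeling for which topics the subscription home/+/+ covers, here is a simplified sketch of MQTT topic filter matching in plain Python. The function topic_matches is written for this example only and ignores special cases such as topics that start with $. The paho library ships its own helper for this, topic_matches_sub, which is the better choice in real code.

```python
def topic_matches(topic_filter, topic):
    """Simplified MQTT matching: + is one level, a trailing # is the rest."""
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')
    for index, level in enumerate(filter_levels):
        if level == '#':
            return True                      # multi-level wildcard
        if index >= len(topic_levels):
            return False                     # topic is shorter than the filter
        if level != '+' and level != topic_levels[index]:
            return False                     # literal level differs
    return len(filter_levels) == len(topic_levels)


for topic in ('home/livingroom/humidity', 'home/livingroom', 'home/a/b/c'):
    print(topic, topic_matches('home/+/+', topic))
```

Only the first topic matches, because home/+/+ requires exactly three levels.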

If you want to download the Arduino script, use the following download button to get the complete script.

The variable MQTT_REGEX together with the Python package “re” makes it possible to read the values behind the two wildcards (location and measurement) from the topic.
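
A short sketch of how the pattern splits a topic. The helper split_topic is hypothetical and uses re.fullmatch, which is a bit stricter than the re.match call in the bridge script: re.match only anchors the start of the string, so a deeper topic would still match there.

```python
import re

MQTT_REGEX = 'home/([^/]+)/([^/]+)'


def split_topic(topic):
    """Return (location, measurement) or None if the topic has another shape."""
    match = re.fullmatch(MQTT_REGEX, topic)
    return match.groups() if match else None


print(split_topic('home/livingroom/temperature'))  # ('livingroom', 'temperature')
print(split_topic('garden/shed/temperature'))      # None
print(split_topic('home/a/b/c'))                   # None, because of fullmatch
print(re.match(MQTT_REGEX, 'home/a/b/c').groups()) # ('a', 'b'), match is looser
```

Because the bridge subscribes to home/+/+, the broker only delivers three-level topics anyway, so both variants behave the same in this setup.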

class SensorData(NamedTuple):
    location: str
    measurement: str
    value: float

I create a class called SensorData that bundles the location, measurement and value as named fields with the intended dtypes:

Variable    | Definition                                                            | Dtype
location    | Where the sensor is placed (for example: livingroom)                  | string
measurement | What is measured (for example: temperature)                           | string
value       | The actual sensor value (for example: 20.12 degrees Celsius)          | float
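
One detail that is easy to miss: the type annotations of a NamedTuple document the intended dtypes but are not checked when the script runs. The following sketch shows this with example values of my own.

```python
from typing import NamedTuple


class SensorData(NamedTuple):
    location: str
    measurement: str
    value: float


# Converting explicitly gives a real float.
reading = SensorData('livingroom', 'temperature', float('20.12'))
print(reading.measurement, reading.value)
print(reading._asdict())

# Without the conversion the string is stored unchanged.
unchecked = SensorData('livingroom', 'temperature', '20.12')
print(type(unchecked.value).__name__)  # str
```

This is why the bridge script calls float(payload) itself before it builds the SensorData object.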

def on_connect(client, userdata, flags, rc):
    """ The callback for when the client receives a CONNACK response from the server."""
    print('Connected with result code ' + str(rc))
    client.subscribe(MQTT_TOPIC)

The on_connect function handles what happens when the MQTT client connects to the broker. When our client connects, we print a message with the result code and subscribe to all topics that match home/+/+. The variable rc holds an error code if the connection is not successful, which makes debugging easier.
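
The numeric result code is easier to read with a lookup table. The sketch below lists the CONNACK return codes of MQTT 3.1.1 and assumes that rc arrives as a plain integer, as in the callback signature shown above. The helper name describe_rc is my own.

```python
# CONNACK return codes as defined by MQTT 3.1.1.
CONNACK_CODES = {
    0: 'connection accepted',
    1: 'refused: unacceptable protocol version',
    2: 'refused: client identifier rejected',
    3: 'refused: server unavailable',
    4: 'refused: bad user name or password',
    5: 'refused: not authorized',
}


def describe_rc(rc):
    """Translate an integer result code into a readable message."""
    return CONNACK_CODES.get(rc, 'unknown result code %d' % rc)


print(describe_rc(0))
print(describe_rc(4))
```

Inside on_connect you could print describe_rc(rc) instead of the bare number and only subscribe when rc is 0.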

def _parse_mqtt_message(topic, payload):
    match = re.match(MQTT_REGEX, topic)
    if match:
        location = match.group(1)
        measurement = match.group(2)
        if measurement == 'status':
            return None
        return SensorData(location, measurement, float(payload))
    else:
        return None

The function _parse_mqtt_message reads the location and the measurement from the topic and converts the payload to a float. Messages on a status topic and topics that do not match the pattern are skipped by returning None. Otherwise the function returns the values as a SensorData NamedTuple.
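
The function above raises a ValueError as soon as a payload is not a number, for example a text message. A more defensive variant could look like the following sketch. It is my own suggestion, not the script from the download, and it also drops nan and inf because they are rarely useful readings.

```python
import math
import re
from typing import NamedTuple, Optional

MQTT_REGEX = 'home/([^/]+)/([^/]+)'


class SensorData(NamedTuple):
    location: str
    measurement: str
    value: float


def parse_mqtt_message(topic: str, payload: str) -> Optional[SensorData]:
    """Return SensorData, or None for anything that is not a usable reading."""
    match = re.match(MQTT_REGEX, topic)
    if match is None:
        return None
    location, measurement = match.groups()
    if measurement == 'status':
        return None
    try:
        value = float(payload)
    except ValueError:
        return None                      # payload is not a number
    if not math.isfinite(value):
        return None                      # skip nan and inf
    return SensorData(location, measurement, value)


print(parse_mqtt_message('home/livingroom/temperature', '20.12'))
print(parse_mqtt_message('home/livingroom/temperature', 'sensor error'))
```

Returning None keeps the rest of the bridge unchanged, because on_message already ignores None results.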

def _send_sensor_data_to_influxdb(sensor_data):
    json_body = [
        {
            'measurement': sensor_data.measurement,
            'tags': {
                'location': sensor_data.location
            },
            'fields': {
                'value': sensor_data.value
            }
        }
    ]
    influxdb_client.write_points(json_body)

The function _send_sensor_data_to_influxdb takes the NamedTuple as input and creates a JSON structure that later fits the query in Grafana. The measurement is stored directly, the location is stored as a tag and the value as a field named value. In the last step the JSON structure is written to the InfluxDB database.
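
It can help to see how such a point looks in the InfluxDB line protocol, the text format that InfluxDB 1.x accepts for writes. The sketch below builds that line by hand for our single tag and single field. The function to_line_protocol is written only for illustration and covers just the basic escaping of commas, equals signs and spaces.

```python
def to_line_protocol(measurement, location, value):
    """Build 'measurement,location=<tag> value=<float>' for one reading."""
    # In the measurement name, commas and spaces have to be escaped.
    name = measurement.replace(',', '\\,').replace(' ', '\\ ')
    # In tag values, commas, equals signs and spaces have to be escaped.
    tag = location
    for char in ',= ':
        tag = tag.replace(char, '\\' + char)
    return '%s,location=%s value=%s' % (name, tag, repr(float(value)))


print(to_line_protocol('temperature', 'livingroom', 20.12))
# temperature,location=livingroom value=20.12
print(to_line_protocol('humidity', 'living room', 45))
```

No timestamp is given here and none is set in the JSON structure of the bridge either, so InfluxDB assigns the time of arrival to the point.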

def on_message(client, userdata, msg):
    """The callback for when a PUBLISH message is received from the server."""
    print(msg.topic + ' ' + str(msg.payload))
    sensor_data = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
    if sensor_data is not None:
        _send_sensor_data_to_influxdb(sensor_data)

The on_message function is called every time a message arrives on one of the subscribed topics. The topic and the message payload are printed to the terminal. Then we use the function _parse_mqtt_message to get the location, measurement and value as NamedTuple. If the result is not None, we use the function _send_sensor_data_to_influxdb to store the sensor data in the InfluxDB database.
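
The payload of an MQTT message arrives as bytes, which is why the script decodes it before parsing. The following dry run shows this without a broker. The stand-in message object and the function on_message_dry_run are assumptions made for this sketch; only the attributes topic and payload are imitated.

```python
from types import SimpleNamespace


def on_message_dry_run(msg, sink):
    """Mimic on_message: decode the bytes payload and pass the reading on."""
    text = msg.payload.decode('utf-8')   # bytes -> str
    print(msg.topic + ' ' + text)
    sink.append((msg.topic, float(text)))


# Stand-in for the message object that the MQTT client would deliver.
fake_msg = SimpleNamespace(topic='home/livingroom/humidity', payload=b'45.5')
collected = []
on_message_dry_run(fake_msg, collected)
print(collected)
print(str(fake_msg.payload))             # b'45.5', the undecoded form
```

The last line explains why the original print statement shows the payload with a leading b and quotes in the terminal.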

def _init_influxdb_database():
    databases = influxdb_client.get_list_database()
    if len(list(filter(lambda x: x['name'] == INFLUXDB_DATABASE, databases))) == 0:
        influxdb_client.create_database(INFLUXDB_DATABASE)
    influxdb_client.switch_database(INFLUXDB_DATABASE)

The function _init_influxdb_database() initializes the InfluxDB database. If the database does not exist, it will be created through the function.
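
The existence check with filter and lambda can be written more directly with any(). The sketch below works on a list of dictionaries with a name key, which is the shape the script expects from get_list_database(). The sample list and the function name database_exists are made up for this example.

```python
def database_exists(databases, name):
    """Return True if a database with this name is in the list."""
    return any(entry['name'] == name for entry in databases)


# Example list in the shape the bridge script expects.
sample = [{'name': '_internal'}, {'name': 'weather_stations'}]
print(database_exists(sample, 'weather_stations'))  # True
print(database_exists(sample, 'greenhouse'))        # False
```

any() stops at the first hit and reads closer to the intention "is there a database with this name".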

def main():
    _init_influxdb_database()

    mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message

    mqtt_client.connect(MQTT_ADDRESS, 1883)
    mqtt_client.loop_forever()


if __name__ == '__main__':
    print('MQTT to InfluxDB bridge')
    main()

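This is the main part of the Python script, where all defined functions are executed. First the database is initialized. Then we create a client object and set the username and password for the MQTT client. We tell the client which functions to run on connecting and on receiving a message.
Once everything has been set up, we connect to the broker with the broker IP and port 1883. The keep alive interval is not passed, so the default of the library is used.
After the client has been told to connect, loop_forever keeps it running and handles reconnects and incoming messages. Note that the constructor call mqtt.Client(MQTT_CLIENT_ID) matches paho-mqtt 1.x. As far as I know, paho-mqtt 2.x expects a callback API version as first argument, so the call has to be adapted on a newer installation.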

Press Ctrl + X, then Y to confirm saving, and hit Enter to write the file. It is also possible to download the whole script with the following download button and transfer the Python script to your Raspberry Pi over SSH. You can use FileZilla for this task, for example.

Test MQTT InfluxDB Bridge

Now we test if everything is running correctly. Run python3 MQTTInfluxDBBridge.py and watch the terminal to see if you receive data. You should see the temperature and humidity messages in the terminal, as in the following picture.

Before we continue the tutorial, we have to make sure that the values from the weather station are saved in InfluxDB. For this we open the InfluxDB shell with the known command: influx

To select the database we created before, use the following command: USE weather_stations

The first thing we want to make sure of is that all variables are stored. In our example the variables are the temperature and the humidity. In InfluxDB we use the following command: SHOW MEASUREMENTS

Influxdb show measurements

We see that the measurements humidity and temperature are stored in the weather_stations database.

Now I also want to make sure that the location and the value are stored. To check this, we query the last 10 entries of the humidity measurement.

Influxdb last entries of measurement
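
The original statement is only visible in the screenshot, so the exact wording is an assumption on my side. A query of the following form returns the newest entries of a measurement in InfluxQL. The sketch builds the statement as a string; the helper name last_entries_query is hypothetical.

```python
def last_entries_query(measurement, limit=10):
    """Build an InfluxQL query for the newest entries of one measurement."""
    if not isinstance(limit, int) or limit < 1:
        raise ValueError('limit must be a positive integer')
    # Identifiers are wrapped in double quotes; inner quotes are escaped.
    quoted = '"' + measurement.replace('"', '\\"') + '"'
    return 'SELECT * FROM %s ORDER BY time DESC LIMIT %d' % (quoted, limit)


print(last_entries_query('humidity'))
# SELECT * FROM "humidity" ORDER BY time DESC LIMIT 10
```

You can paste the printed statement into the influx shell after USE weather_stations. The result should contain the columns time, location and value.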

Run MQTT InfluxDB Bridge at Startup

At this point of the tutorial we know that our script is running correctly, but we want it to run even when we are not logged in to our Raspberry Pi. Now I show you how to start the Python script automatically when the Raspberry Pi boots. For this we use a shell script which is executed as a cron job at startup.

First we create the shell script with the following command.

Create shell script

nano launcher.sh

The following script waits for 60 seconds first, because in my case the connection to the database was not yet possible during startup. Since I included this short pause, the script runs without any problems.
After the pause we change into the folder where the Python script is stored and execute the script with python3.
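
The fixed pause of 60 seconds works, but it is a guess. As an alternative, the Python script itself could wait until the database port accepts connections. The following sketch shows the idea with the standard library only. The function wait_for_port is my own suggestion and not part of the downloadable script.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Poll until a TCP connection to host:port succeeds or timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True              # something is listening
        except OSError:
            if time.monotonic() >= deadline:
                return False             # gave up after the timeout
            time.sleep(interval)
```

Called at the start of main() as wait_for_port('localhost', 8086), it returns as soon as InfluxDB is reachable. The same call with port 1883 would wait for the MQTT broker.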

#!/bin/sh
# launcher.sh

sleep 60

cd /
cd home/pi
sudo python3 MQTTInfluxDBBridge.py
cd /

Press Ctrl + X, then Y and Enter to save. Now we have to make the script executable. Use the following command.

Makes the shell script executable

chmod 755 launcher.sh

If errors occur, we want to keep the error logs for debugging the application. We create a new directory and save the logs in this separate folder.

Create new folder to save error logs

mkdir logs

The last step is to add the shell script to the crontab. The crontab is the table of jobs that the cron background service executes at defined times. Type in the following command to open the crontab.

Open crontab

sudo crontab -e

Add the following line to execute your shell script:
@reboot sh /home/pi/launcher.sh >/home/pi/logs/cronlog 2>&1

Press Ctrl + X, then Y and Enter to save. Now we can reboot the Raspberry Pi. There should be no errors, but if there are, you can open the log file with the following commands.

Open error logs from crontab

cd logs
cat cronlog

The last part of this tutorial is the Grafana setup to create beautiful charts and dashboards from your saved MQTT data.

Grafana Setup

To install Grafana, first look up the latest version: https://github.com/grafana/grafana/releases

In my case version 6.5.1 is the newest version. To install Grafana, use the following commands.

Install Grafana on the Raspberry Pi with version 6.5.1

wget https://dl.grafana.com/oss/release/grafana_6.5.1_armhf.deb

sudo dpkg -i grafana_6.5.1_armhf.deb

sudo apt-get update

sudo apt-get install grafana

After the installation you can start the Grafana server.

Start Grafana

sudo service grafana-server start

You can access the Grafana GUI (graphical user interface) on port 3000. Type the IP of your Raspberry Pi followed by the port into your browser. In my case it is

Access Grafana GUI with your browser

192.168.0.8:3000

Grafana signin

Use the following combination for your first login:

  • Default username: admin
  • Default password: admin

During your first login you have to change the password. Use a password you like and continue.

Now you choose your default data source. Of course we use InfluxDB. Then we have to enter the settings for the connection to InfluxDB:

 

Grafana Database Settings
  • URL: http://localhost:8086
    Use localhost because InfluxDB is installed on the same Raspberry Pi. InfluxDB can be accessed on port 8086.
    If you get a gateway error, try to use the IP of your Raspberry Pi instead of localhost. In my example, the URL would be http://192.168.0.8:8086
  • Database: weather_stations
    Use the name of the database where the MQTT data is stored. In my case I named the database “weather_stations”.
  • User: mqtt
    Here we use the user we created during the setup of the InfluxDB.
  • Password: mqtt
    Here we use the password we set for this user during the setup of the InfluxDB.

At the bottom of the page you can save and test the connection. You should get a message that the data source is working.

On the left menu panel you can choose the “Explore” button, where you define queries (in InfluxQL, the SQL-like query language of InfluxDB) to get an overview of your data.

Grafana explore 1

As measurement we can select the temperature, for example. Use GROUP BY time to get a curve instead of dots. I chose to group by a time of 1 minute. I also group by the tag (location), so that I get not one curve in total but one curve for every weather station I have in my apartment.
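
To make clear what this grouping does with the data, here is a small sketch in plain Python. It averages readings per location and per 1 minute bucket, which is roughly what a query along the lines of SELECT mean("value") FROM "temperature" GROUP BY time(1m), "location" computes. The sample readings and the function name mean_per_bucket are invented for this example.

```python
from collections import defaultdict


def mean_per_bucket(points, bucket_seconds=60):
    """Average values per (location, time bucket).

    points: iterable of (unix_time_in_seconds, location, value).
    """
    buckets = defaultdict(list)
    for timestamp, location, value in points:
        bucket_start = int(timestamp // bucket_seconds) * bucket_seconds
        buckets[(location, bucket_start)].append(value)
    return {key: sum(values) / len(values) for key, values in buckets.items()}


readings = [
    (0, 'livingroom', 20.0),
    (30, 'livingroom', 21.0),
    (70, 'livingroom', 22.0),
    (10, 'bathroom', 24.0),
]
for key, mean in sorted(mean_per_bucket(readings).items()):
    print(key, mean)
```

The two living room readings of the first minute end up as one averaged point, and each location keeps its own series, which gives one curve per sensor in the chart.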

The following picture shows you the visualization of the temperature of my four temperature sensors.

Grafana explore 2

If you want to create visualizations for your dashboard, click the add panel button at the top (see the following picture) and define your custom query. It would be too much to explain every single detail of Grafana in this article, but you should activate auto refresh with the last button at the top. I chose to refresh the dashboard every minute.

Feel free to explore Grafana for yourself. If you struggle at some point or have any questions, use the comment section below and I will answer your questions as soon as possible.
