ESP32 send values to Rpi via BLE

Hi!
First of all, thank you Kyle for Mycodo!

I have a hydroponic system on my balcony. My problem is that my wifi does not reach there.
I’m running Mycodo inside my apartment on a Rpi zero W.
I’m trying to send temperature values from an ESP32 via BLE to my RPI using a modified version of this code:

Would it be possible to add this to a Python3 code input or am I wasting my time trying?

1 Like

Yes, it’s possible. A Listener-type Input should be created, like the MQTT Input, which basically just listens for incoming measurements, then processes them and stores them in the database. The code you’re referencing is already Python, so you should be able to just drop it into an Input module.

I am trying to implement something similar. I have a Govee temp/humid device that broadcasts the data in advertisements. I have a test python3 script using bleson but it requires root or I can use setcap to grant python3 access to the bluetooth device.

How do I grant the mycodo user access to the Bluetooth device?

1 Like

The Mycodo daemon runs as root, so any Inputs that are activated will have root permissions and would be able to access any device.

@KyleGabriel thanks for the info!

I’m not a Python developer, so I’m just trying to cobble together a module based on the Paho MQTT module, as they should both be listeners. To get started, I’m just trying to write the advertisement data to the logs. The code is below. It passes validation and doesn’t throw any exceptions. It just appears that the callback function never gets called from the Bluetooth module. However, the same code works just fine when run as a standalone Python script.

# coding=utf-8
import copy
import time
import datetime

from mycodo.config_translations import TRANSLATIONS
from mycodo.databases.models import InputChannel
from mycodo.inputs.base_input import AbstractInput
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import add_measurements_influxdb
from mycodo.utils.inputs import parse_measurement
from mycodo.inputs.sensorutils import calculate_dewpoint
from mycodo.inputs.sensorutils import calculate_vapor_pressure_deficit


# Measurements: channel index -> measurement name and unit.
# The index of each (measurement, unit) pair in the list below becomes its key.
measurements_dict = {
    index: {'measurement': measurement, 'unit': unit}
    for index, (measurement, unit) in enumerate([
        ('temperature', 'F'),
        ('humidity', 'percent'),
        ('dewpoint', 'F'),
        ('vapor_pressure_deficit', 'Pa'),
        ('battery', 'percent'),
    ])
}

# Channels: a single channel (index 0) with no preset attributes.
channels_dict = {0: {}}

# Input information: module-level metadata describing this Input.
INPUT_INFORMATION = {
    # Identification and naming
    "input_name_unique": "H5101",
    "input_manufacturer": "Govee",
    "input_name": "Govee H5101",
    "input_name_short": "GVH5101",
    "input_library": "bleson",
    "measurements_name": "Humidity/Temperature",

    # Measurement/channel layout and listener flag
    "measurements_dict": measurements_dict,
    "channels_dict": channels_dict,
    "measurements_variable_amount": True,
    "listener": True,

    # Reference links
    "url_datasheet": "https://m.media-amazon.com/images/I/C150u6m4M6S.pdf",
    "url_product_purchase": "https://www.amazon.com/Govee-Temperature-Thermometer-Hygrometer-Greenhouse/dp/B08CGM8DC7/ref=sr_1_1_sspa?crid=CN9AEFJIB9HQ&keywords=govee+5101&qid=1670900819&s=industrial&sprefix=govee+5101%2Cindustrial%2C127&sr=1-1-spons&psc=1&spLa=ZW5jcnlwdGVkUXVhbGlmaWVyPUEyRDVHVTNQOU1NTjhGJmVuY3J5cHRlZElkPUEwMjYxNDM5MVU2TjRTSEJZSVg4OSZlbmNyeXB0ZWRBZElkPUEwMDE3Njk0M0pSTVZJMzBBVDRNWiZ3aWRnZXROYW1lPXNwX2F0ZiZhY3Rpb249Y2xpY2tSZWRpcmVjdCZkb05vdExvZ0NsaWNrPXRydWU=",
    "url_manufacturer": "https://us.govee.com/",

    # Help text (adjacent literals are concatenated into one string)
    "message": (
        "A channel is used for each unique device.  You must name the channel the same as the device name as "
        "received in the bluetooth advertisement.  Each of the measurements will be stored for the device when a "
        "bluetooth advertisement is received for the device name."
    ),

    # Option visibility and interface
    "options_enabled": ["measurements_select"],
    "options_disabled": ["interface"],
    "interfaces": ["Mycodo"],

    # Python package required by this module
    "dependencies_module": [("pip-pypi", "bleson", "bleson==0.1.8")],

    # Per-channel option: a required free-text name
    "custom_channel_options": [
        {
            "id": "name",
            "type": "text",
            "default_value": "",
            "required": True,
            "name": TRANSLATIONS["name"]["title"],
            "phrase": TRANSLATIONS["name"]["phrase"],
        },
    ],
}

class InputModule(AbstractInput):
    """
    Listener Input for the Govee GVH5101 humidity/temperature sensor.

    Subscribes to Bluetooth advertisements through bleson and, for devices
    whose advertised name starts with 'GVH5101', decodes the manufacturer data
    and logs the values. Nothing is stored in the database by this class.

    Based on Home Assistant tutorial.
    You can find the initial implementation here:
    - https://austinsnerdythings.com/2021/12/27/using-the-govee-bluetooth-thermometer-with-home-assistant-python-and-mqtt/

    NOTE(review): c2f, temp_hum and on_advertisement below are declared
    without a `self` parameter although they are used as instance methods;
    see the notes on each of them.
    """
    def __init__(self, input_dev, testing=False):
        """
        Set up empty adapter/observer state and, unless testing, initialize.

        :param input_dev: Input object handed to the AbstractInput base class
        :param testing: when True, try_initialize() is skipped
        """
        super().__init__(input_dev, testing=testing, name=__name__)

        # Populated by listener(); None until the observer has been created.
        self.adapter =None
        self.observer = None
        # Keeps the wait loop in listener() alive while True.
        self.isListening = False

        if not testing:
            self.try_initialize()

    def initialize(self):
        """Load this Input's channel rows and build self.options_channels."""
        input_channels = db_retrieve_table_daemon(
            InputChannel).filter(InputChannel.input_id == self.input_dev.unique_id).all()
        self.options_channels = self.setup_custom_channel_options_json(
            INPUT_INFORMATION['custom_channel_options'], input_channels)


    def close(self):
        """Drop the observer and adapter references (stop() is not called here)."""
        self.observer = None
        self.adapter = None

    def listener(self):
        """
        Start observing Bluetooth advertisements, then block while listening.

        Registers self.on_advertisement as the observer's on_advertising_data
        callback. After setup, loops in 60-second sleeps until isListening is
        cleared.
        """
        self.logger.debug("Subscribing to advertisements...")
        try:
            self.logger.debug("Aquiring bluetooth adapter...")
            # Imported here rather than at module level.
            from bleson import get_provider, Observer
            self.adapter = get_provider().get_adapter()
            self.logger.debug("Creating observer...")
            self.observer = Observer(self.adapter)
            self.observer.on_advertising_data = self.on_advertisement
            self.observer.start()
            self.isListening = True
        except:
            # NOTE(review): bare except; the actual exception and traceback
            # are discarded, only this generic message is logged.
            self.logger.error("Unable to subscribe to ble advertisements.")

        from time import sleep
        # If setup failed, isListening is still False and this loop is skipped.
        while self.isListening:
            sleep(60)

    def stop_input(self):
        """Called when Input is deactivated."""
        self.logger.debug("Un-Subscribing from advertisements...")
        # The wait loop in listener() only re-checks this flag after its
        # current 60-second sleep finishes.
        self.isListening = False
        # NOTE(review): raises AttributeError if observer is still None
        # (listener() never ran or failed before creating it).
        self.observer.stop()
        self.observer.on_advertising_data = None

    # NOTE(review): no `self` parameter. Called through an instance, the
    # instance itself would be bound to `val`.
    def c2f(val):
        """Convert Celsius to Fahrenheit, rounded to two decimals."""
        return round(32 + 9*val/5, 2)

    # NOTE(review): no `self` parameter, yet the body uses `self`, and the
    # call site self.temp_hum(a, b, c) passes four positional arguments to a
    # three-parameter function, which raises TypeError.
    def temp_hum(values, battery, address):
        """
        Decode a big-endian byte payload into temperature/humidity and log it.

        :param values: bytes interpreted as one big-endian unsigned integer
        :param battery: battery value taken from the advertisement
        :param address: device address, used as the key into `last`
        """
        # NOTE(review): `last` is not defined anywhere in the visible module
        # and is never assigned here, so `address not in last` raises
        # NameError unless it is defined elsewhere.
        global last
        values = int.from_bytes(values, 'big')
        if address not in last or last[address] != values:
            temp = float(values / 10000)
            hum = float((values % 1000) / 10)
            # NOTE(review): bare `c2f` is not resolvable from inside a method
            # (class scope is not searched); this needs self.c2f(...) or a
            # module-level function.
            temperature = c2f(temp)
            humidity = hum
            battery = battery
            # NOTE(review): these use self.temp_temperature/self.temp_humidity
            # (reset to 0 in on_advertisement), not the temp/hum decoded above.
            dew_point = calculate_dewpoint(self.temp_temperature, self.temp_humidity)
            vpd = calculate_vapor_pressure_deficit(self.temp_temperature, self.temp_humidity)
            # The "`n" / "'n" sequences are emitted literally; Python does not
            # treat them as newlines.
            self.logger.debug("Calculated Values:`n temp: {} 'n hum: {} `n batt: {} 'n dew: {} 'n vpd: {}".format(
                temperature, humidity, battery, dew_point, vpd))

    # NOTE(review): no `self` parameter. It is registered as the bound method
    # self.on_advertisement, so a call with one advertisement argument passes
    # two positional arguments and raises TypeError; presumably this is why
    # the callback never appears to run.
    def on_advertisement(advertisement):
        """
        Callback for a received advertisement.

        Resets the temp_* attributes, then for advertisements that carry
        manufacturer data and a name starting with 'GVH5101', passes bytes
        4-6 and byte 7 of the manufacturer data to temp_hum().
        """
        self.logger.debug("Received advertisement...")
        self.name = None
        self.temp_temperature = 0
        self.temp_humidity = 0
        self.temp_dew_point = None
        self.temp_vpd = None
        self.temp_battery = 0

        mfg_data = advertisement.mfg_data
        if mfg_data is not None:
            if advertisement.name is not None and advertisement.name.startswith('GVH5101'):
                name = advertisement.name
                address = advertisement.address
                self.logger.debug("Processing:`n name: {} 'n address: {}".format(
                    name,address))
                # Bytes 4..6 are the combined reading, byte 7 is passed as the
                # battery value; assumes the Govee payload layout from the
                # linked tutorial (TODO confirm).
                self.temp_hum(mfg_data[4:7], mfg_data[7], address)

Looks like I needed to pass self to the on_advertisement function.

I am now able to read bluetooth advertisements…

# coding=utf-8
import copy
import time
import datetime
import traceback

from mycodo.config_translations import TRANSLATIONS
from mycodo.databases.models import InputChannel
from mycodo.inputs.base_input import AbstractInput
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import add_measurements_influxdb
from mycodo.utils.inputs import parse_measurement
from mycodo.inputs.sensorutils import calculate_dewpoint
from mycodo.inputs.sensorutils import calculate_vapor_pressure_deficit


# Measurements: one entry per stored value, keyed by its position in the tuple.
measurements_dict = dict(enumerate((
    {'measurement': 'temperature', 'unit': 'F'},
    {'measurement': 'humidity', 'unit': 'percent'},
    {'measurement': 'dewpoint', 'unit': 'F'},
    {'measurement': 'vapor_pressure_deficit', 'unit': 'Pa'},
    {'measurement': 'battery', 'unit': 'percent'},
)))

# Channels: only channel 0 is declared, with an empty attribute mapping.
channels_dict = {0: dict()}

# Input information: module-level metadata describing this Input.
INPUT_INFORMATION = {
    'input_name_unique': 'H5101',
    'input_manufacturer': 'Govee',
    'input_name': 'Govee H5101',
    'input_name_short': 'GVH5101',
    'input_library': 'bleson',
    # Display label; capitalization restored ("Temperature") after an
    # accidental lower-casing.
    'measurements_name': 'Humidity/Temperature',
    'measurements_dict': measurements_dict,
    'channels_dict': channels_dict,
    'measurements_variable_amount': True,
    # Marks this module as a listener-type Input.
    'listener': True,
    'url_datasheet': 'https://m.media-amazon.com/images/I/C150u6m4M6S.pdf',
    # Product URL slug restored to its original capitalization.
    'url_product_purchase': 'https://www.amazon.com/Govee-Temperature-Thermometer-Hygrometer-Greenhouse/dp/B08CGM8DC7/ref=sr_1_1_sspa?crid=CN9AEFJIB9HQ&keywords=govee+5101&qid=1670900819&s=industrial&sprefix=govee+5101%2Cindustrial%2C127&sr=1-1-spons&psc=1&spLa=ZW5jcnlwdGVkUXVhbGlmaWVyPUEyRDVHVTNQOU1NTjhGJmVuY3J5cHRlZElkPUEwMjYxNDM5MVU2TjRTSEJZSVg4OSZlbmNyeXB0ZWRBZElkPUEwMDE3Njk0M0pSTVZJMzBBVDRNWiZ3aWRnZXROYW1lPXNwX2F0ZiZhY3Rpb249Y2xpY2tSZWRpcmVjdCZkb05vdExvZ0NsaWNrPXRydWU=',
    'url_manufacturer': 'https://us.govee.com/',

    # Help text (adjacent literals are concatenated into one string).
    'message': 'A channel is used for each unique device.  You must name the channel the same as the device name as '
    'received in the bluetooth advertisement.  Each of the measurements will be stored for the device when a '
    'bluetooth advertisement is received for the device name.',

    'options_enabled': [
        'measurements_select',
    ],

    'options_disabled': ['interface'],

    'interfaces': ['Mycodo'],

    # Python package required by this module.
    'dependencies_module': [
        ('pip-pypi', 'bleson', 'bleson==0.1.8')
    ],

    # Per-channel option: a required free-text name.
    'custom_channel_options': [
        {
            'id': 'name',
            'type': 'text',
            'default_value': '',
            'required': True,
            'name': TRANSLATIONS['name']['title'],
            'phrase': TRANSLATIONS['name']['phrase']
        },
    ]
}

class InputModule(AbstractInput):
    """
    Listener Input for the Govee GVH5101 humidity/temperature sensor.

    Subscribes to Bluetooth advertisements through bleson. For devices whose
    advertised name starts with 'GVH5101' it decodes temperature and humidity
    from the manufacturer data, derives dew point and vapor pressure deficit,
    and logs the result. Values are only logged; nothing is written to the
    measurement database by this class.

    Based on Home Assistant tutorial.
    You can find the initial implementation here:
    - https://austinsnerdythings.com/2021/12/27/using-the-govee-bluetooth-thermometer-with-home-assistant-python-and-mqtt/

    """
    def __init__(self, input_dev, testing=False):
        """
        Set up empty adapter/observer state and, unless testing, initialize.

        :param input_dev: Input object handed to the AbstractInput base class
        :param testing: when True, try_initialize() is skipped
        """
        super().__init__(input_dev, testing=testing, name=__name__)

        # Populated by listener(); None until the observer has been created.
        self.adapter = None
        self.observer = None
        self.isListening = False

        if not testing:
            self.try_initialize()

    def initialize(self):
        """Load this Input's channel rows and build self.options_channels."""
        input_channels = db_retrieve_table_daemon(
            InputChannel).filter(InputChannel.input_id == self.input_dev.unique_id).all()
        self.options_channels = self.setup_custom_channel_options_json(
            INPUT_INFORMATION['custom_channel_options'], input_channels)

    def close(self):
        """Drop the observer and adapter references (stop() is not called here)."""
        self.observer = None
        self.adapter = None

    def listener(self):
        """
        Create a bleson Observer, register the callback and start observing.

        Returns once the observer has been started. Errors raised while
        acquiring the adapter or starting the observer propagate to the caller.
        """
        self.logger.debug("Subscribing to advertisements...")
        self.logger.debug("Aquiring bluetooth adapter...")
        # Imported here rather than at module level.
        from bleson import get_provider, Observer
        self.adapter = get_provider().get_adapter()
        self.logger.debug("Creating observer...")
        self.observer = Observer(self.adapter)
        self.observer.on_advertising_data = self.on_advertisement
        self.observer.start()
        self.isListening = True
        self.logger.debug("Listening for ble advertisements...")

    def stop_input(self):
        """Called when Input is deactivated."""
        self.logger.debug("Un-Subscribing from advertisements...")
        self.isListening = False
        # listener() may never have run (or may have failed before creating
        # the observer); there is nothing to stop in that case.
        if self.observer is None:
            return
        self.observer.stop()
        self.observer.on_advertising_data = None

    def celsiusToFahrenheit(self, val):
        """
        Convert a Celsius value to Fahrenheit.

        :param val: temperature in degrees Celsius
        :return: temperature in degrees Fahrenheit, rounded to two decimals
        """
        self.logger.debug("Start celsiusToFahrenheit.")
        return round(32 + 9*val/5, 2)

    def processAdvertisement(self, input):
        """
        Decode the temperature/humidity bytes of an advertisement.

        The bytes are read as one big-endian unsigned integer; temperature is
        that integer divided by 10000 and humidity is its last three decimal
        digits divided by 10.

        NOTE(review): there is no handling of a sign flag here; confirm how
        the device encodes sub-zero temperatures.
        NOTE(review): the parameter name shadows the builtin `input`; it is
        kept so existing callers are unaffected.

        :param input: bytes-like payload slice holding the combined reading
        :return: (temperature_F, humidity_percent, dew_point_F, vpd) on
            success, or None if decoding failed (the error is logged)
        """
        try:
            self.logger.debug("Start processAdvertisement.")
            values = int.from_bytes(input, 'big')
            tempC = float(values / 10000)
            tempF = self.celsiusToFahrenheit(tempC)
            humidity = float((values % 1000) / 10)
            # Dew point is computed from the Celsius reading and converted to
            # Fahrenheit afterwards; presumably calculate_dewpoint works in
            # Celsius (verify against mycodo.inputs.sensorutils).
            dew_point = self.celsiusToFahrenheit(calculate_dewpoint(tempC, humidity))
            vpd = calculate_vapor_pressure_deficit(tempC, humidity)
            return tempF, humidity, dew_point, vpd
        except Exception:
            # Log the traceback through the Input's logger instead of
            # building a string that is thrown away.
            self.logger.exception("processAdvertisement failed")
            return None

    def on_advertisement(self, advertisement):
        """
        Observer callback: decode and log readings from GVH5101 devices.

        Advertisements without manufacturer data, without a name, or whose
        name does not start with 'GVH5101' are ignored. Exceptions are logged
        and never propagate into the observer.

        :param advertisement: object exposing mfg_data, name and address
        """
        try:
            mfg_data = advertisement.mfg_data
            name = advertisement.name
            if mfg_data is None or name is None or not name.startswith('GVH5101'):
                return

            # Bytes 4..6 hold the combined reading and byte 7 the battery
            # level, so at least 8 bytes are required.
            if len(mfg_data) < 8:
                self.logger.debug("Ignoring short advertisement from {}: {} bytes".format(
                    name, len(mfg_data)))
                return

            address = advertisement.address
            self.logger.debug("Processing: name: {}; address: {}".format(
                name, address))
            battery = mfg_data[7]
            self.logger.debug("batt: {}".format(battery))

            decoded = self.processAdvertisement(mfg_data[4:7])
            if decoded is None:
                # Decoding failed and was already logged.
                return
            temp, hum, dew, vpd = decoded
            self.logger.info("Device: {} [addr: {}; batt: {}; temp: {}; hum: {}; vpd: {}; dew: {}]".format(
                name, address, battery, temp, hum, vpd, dew))
        except Exception:
            self.logger.exception("on_advertisement failed")