I am now able to read Bluetooth advertisements…
# coding=utf-8
import copy
import time
import datetime
import traceback
from mycodo.config_translations import TRANSLATIONS
from mycodo.databases.models import InputChannel
from mycodo.inputs.base_input import AbstractInput
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import add_measurements_influxdb
from mycodo.utils.inputs import parse_measurement
from mycodo.inputs.sensorutils import calculate_dewpoint
from mycodo.inputs.sensorutils import calculate_vapor_pressure_deficit
# Measurements
# Each entry maps a measurement index to the measurement name and unit that
# this input reports; the index is the position in the spec table below.
_MEASUREMENT_SPECS = (
    ('temperature', 'F'),
    ('humidity', 'percent'),
    ('dewpoint', 'F'),
    ('vapor_pressure_deficit', 'Pa'),
    ('battery', 'percent'),
)

measurements_dict = {
    index: {'measurement': measurement, 'unit': unit}
    for index, (measurement, unit) in enumerate(_MEASUREMENT_SPECS)
}

# Channels
# A single default channel (index 0) with no preset options.
channels_dict = dict.fromkeys((0,))
channels_dict[0] = {}
# Input information
# Static metadata describing this input: its identity, the measurement and
# channel tables defined above, its pip dependency and the per-channel options.
INPUT_INFORMATION = {
    # Identity strings for the device.
    'input_name_unique': 'H5101',
    'input_manufacturer': 'Govee',
    'input_name': 'Govee H5101',
    'input_name_short': 'GVH5101',
    'input_library': 'bleson',
    'measurements_name': 'Humidity/temperature',
    # The module-level tables defined earlier in this file.
    'measurements_dict': measurements_dict,
    'channels_dict': channels_dict,
    'measurements_variable_amount': True,
    # NOTE(review): presumably makes Mycodo run InputModule.listener() instead
    # of polling for measurements -- confirm against the input controller.
    'listener': True,
    'url_datasheet': 'https://m.media-amazon.com/images/I/C150u6m4M6S.pdf',
    'url_product_purchase': 'https://www.amazon.com/Govee-temperature-Thermometer-Hygrometer-Greenhouse/dp/B08CGM8DC7/ref=sr_1_1_sspa?crid=CN9AEFJIB9HQ&keywords=govee+5101&qid=1670900819&s=industrial&sprefix=govee+5101%2Cindustrial%2C127&sr=1-1-spons&psc=1&spLa=ZW5jcnlwdGVkUXVhbGlmaWVyPUEyRDVHVTNQOU1NTjhGJmVuY3J5cHRlZElkPUEwMjYxNDM5MVU2TjRTSEJZSVg4OSZlbmNyeXB0ZWRBZElkPUEwMDE3Njk0M0pSTVZJMzBBVDRNWiZ3aWRnZXROYW1lPXNwX2F0ZiZhY3Rpb249Y2xpY2tSZWRpcmVjdCZkb05vdExvZ0NsaWNrPXRydWU=',
    'url_manufacturer': 'https://us.govee.com/',
    # User-facing help text: the channel name must equal the advertised
    # device name.
    # NOTE(review): the text says measurements are stored, but on_advertisement
    # in this file only logs the decoded values -- confirm intended behaviour.
    'message': 'A channel is used for each unique device. You must name the channel the same as the device name as '
    'received in the bluetooth advertisement. Each of the measurements will be stored for the device when a '
    'bluetooth advertisement is received for the device name.',
    'options_enabled': [
        'measurements_select',
    ],
    'options_disabled': ['interface'],
    'interfaces': ['Mycodo'],
    # Installs bleson from PyPI, pinned to version 0.1.8.
    'dependencies_module': [
        ('pip-pypi', 'bleson', 'bleson==0.1.8')
    ],
    # One required free-text option per channel, 'name'; its label and help
    # phrase come from the TRANSLATIONS table.
    'custom_channel_options': [
        {
            'id': 'name',
            'type': 'text',
            'default_value': '',
            'required': True,
            'name': TRANSLATIONS['name']['title'],
            'phrase': TRANSLATIONS['name']['phrase']
        },
    ]
}
class InputModule(AbstractInput):
    """
    A sensor support class that measures the GVH5101's humidity and temperature
    and calculates the dew point and vapor pressure deficit.

    The class listens for Bluetooth LE advertisements with bleson, decodes the
    manufacturer data of devices whose advertised name starts with 'GVH5101'
    and logs the decoded values.

    TODO: the decoded values are currently only written to the log; they are
    not yet stored as measurements.

    Based on Home Assistant tutorial.
    You can find the initial implementation here:
    - https://austinsnerdythings.com/2021/12/27/using-the-govee-bluetooth-thermometer-with-home-assistant-python-and-mqtt/
    """

    def __init__(self, input_dev, testing=False):
        """
        Set up the listener state and, unless testing, initialize the input.

        :param input_dev: the Input database entry this module serves
        :param testing: when True, skip initialization
        """
        super().__init__(input_dev, testing=testing, name=__name__)

        self.adapter = None       # bleson adapter, acquired in listener()
        self.observer = None      # bleson Observer, created in listener()
        self.isListening = False  # True while the observer has been started

        if not testing:
            self.try_initialize()

    def initialize(self):
        """Load this input's channels and their custom channel options."""
        input_channels = db_retrieve_table_daemon(
            InputChannel).filter(InputChannel.input_id == self.input_dev.unique_id).all()
        self.options_channels = self.setup_custom_channel_options_json(
            INPUT_INFORMATION['custom_channel_options'], input_channels)

    def close(self):
        """Stop reading sensor, remove callbacks."""
        # Dropping the references alone would leave a started observer
        # scanning with the callback still bound to this instance.
        self._stop_observer()
        self.observer = None
        self.adapter = None

    def listener(self):
        """Acquire the Bluetooth adapter and start observing advertisements."""
        self.logger.debug("Subscribing to advertisements...")
        self.logger.debug("Aquiring bluetooth adapter...")
        # Imported here because bleson is an optional, per-input dependency.
        from bleson import get_provider, Observer
        self.adapter = get_provider().get_adapter()
        self.logger.debug("Creating observer...")
        self.observer = Observer(self.adapter)
        self.observer.on_advertising_data = self.on_advertisement
        self.observer.start()
        self.isListening = True
        self.logger.debug("Listening for ble advertisements...")

    def stop_input(self):
        """Called when Input is deactivated."""
        self.logger.debug("Un-Subscribing from advertisements...")
        self._stop_observer()

    def _stop_observer(self):
        """Stop the observer if it was started and detach the callback.

        Safe to call when listener() never ran or when the observer was
        already stopped.
        """
        observer = self.observer
        was_listening = self.isListening
        self.isListening = False
        if observer is None:
            return
        try:
            # Only stop an observer that was actually started, so that
            # stop_input() followed by close() does not stop it twice.
            if was_listening:
                observer.stop()
        except Exception:
            self.logger.exception("Failed to stop the bluetooth observer")
        finally:
            # Detach after stopping, as the original shutdown order did.
            observer.on_advertising_data = None

    def celsiusToFahrenheit(self, val):
        """Return ``val`` degrees Celsius in Fahrenheit, rounded to 2 places."""
        self.logger.debug("Start celsiusToFahrenheit.")
        return round(32 + 9*val/5, 2)

    def processAdvertisement(self, input):
        """
        Decode the 3-byte temperature/humidity field of an advertisement.

        The field is a big-endian integer. Its most significant bit flags a
        negative temperature; the remaining 23 bits hold
        ``temperature_in_tenths_C * 1000 + humidity_in_tenths_percent``.

        :param input: the three payload bytes (the parameter name is kept for
            backward compatibility although it shadows the builtin)
        :return: ``(temp_F, humidity_percent, dew_point_F, vpd)`` or ``None``
            when the payload cannot be decoded; the failure is logged
        """
        try:
            self.logger.debug("Start processAdvertisement.")
            if len(input) != 3:
                raise ValueError(
                    "expected 3 payload bytes, got {}".format(len(input)))

            raw = int.from_bytes(input, 'big')
            is_negative = bool(raw & 0x800000)
            magnitude = raw & 0x7FFFFF

            # Split with integer arithmetic so the humidity digits do not leak
            # into the temperature (raw / 10000 would turn 23.4 C at 56.7 %
            # into 23.4567 C).
            temp_tenths, humidity_tenths = divmod(magnitude, 1000)
            tempC = temp_tenths / 10
            if is_negative:
                tempC = -tempC
            humidity = humidity_tenths / 10

            tempF = self.celsiusToFahrenheit(tempC)
            dew_point = self.celsiusToFahrenheit(calculate_dewpoint(tempC, humidity))
            vpd = calculate_vapor_pressure_deficit(tempC, humidity)
            return tempF, humidity, dew_point, vpd
        except Exception:
            # Runs as part of a bluetooth callback: log with traceback and
            # let the caller skip this advertisement instead of raising.
            self.logger.exception("processAdvertisement failed")
            return None

    def on_advertisement(self, advertisement):
        """
        Callback invoked by the observer for every received advertisement.

        Advertisements without manufacturer data, without a name, or whose
        name does not start with 'GVH5101' are ignored. For matching devices
        the battery byte and the decoded readings are logged.

        :param advertisement: the advertisement object passed by bleson; its
            ``name``, ``address`` and ``mfg_data`` attributes are read
        """
        try:
            mfg_data = advertisement.mfg_data
            name = advertisement.name
            if mfg_data is None or name is None or not name.startswith('GVH5101'):
                return

            # Bytes 4-6 carry temperature/humidity and byte 7 the battery.
            if len(mfg_data) < 8:
                self.logger.debug(
                    "Ignoring short advertisement from {}: {} bytes".format(
                        name, len(mfg_data)))
                return

            address = advertisement.address
            self.logger.debug("Processing:\n name: {}\n address: {}".format(
                name, address))
            battery = mfg_data[7]
            self.logger.debug("batt: {}".format(battery))

            readings = self.processAdvertisement(mfg_data[4:7])
            if readings is None:
                # processAdvertisement already logged the failure.
                return
            temp, hum, dew, vpd = readings
            self.logger.info("Device: {} [addr: {}; batt: {}; temp: {}; hum: {}; vpd: {}; dew: {}]".format(
                name, address, battery, temp, hum, vpd, dew))
        except Exception:
            # Never let an exception escape into the scanning thread.
            self.logger.exception("on_advertisement failed")