I just successfully deployed my first custom input module and wanted to share it for feedback, and ask if I should put it on GitHub for others to use. I am a fairly amateur programmer, so I was stoked to get this input up and running. My wife wasn’t as excited about this achievement as I had hoped she would be, so I decided to make a post here, where people will appreciate it more than she does.
I run a 4-site DWC hydroponic system monitored and conditioned by a Pi running Mycodo. The initial design included two float sensors in the “brain” tank: one for the maximum liquid level and one for the minimum liquid level. Mycodo can then top off the system from a fresh-water reservoir when the level is low, but won’t overfill the system.
One drawback to this design is that I can’t tell whether the plants are taking up water until the low-level sensor is triggered, and similarly I can’t monitor the uptake rate and make adjustments to EC or environmental factors based on that data. I found the 10cm Grove Water Level Sensor for sale, and it seemed like it could provide the granular data I was looking for—and it was cheap enough!
The sensor uses 20 capacitive pads (8 in the low section and 12 in the high section) to determine what percentage of the 10cm board is immersed, and is designed to return a percentage in 5% steps.
This week, the sensor arrived, and I set about making my first custom input module. I added custom options to input the low and high level volumes, so that the percentage returned can be translated to the volume of liquid in the system. This should work in any system where the difference between the maximum and minimum liquid level is less than 10cm.
# A mycodo custom input module for the Grove Water Level Sensor (10cm)
# version 1.2
# coding=utf-8
import copy
import time
import smbus2 as smbus
import logging
from mycodo.inputs.base_input import AbstractInput
from flask_babel import lazy_gettext
#Function to check for positive value
def constraints_pass_positive_value(mod_input, value):
    """
    Check that a user-supplied option value is strictly greater than zero.

    :param mod_input: object passed in by the caller; handed back unchanged
    :param value: the value to validate (an int or float is expected)
    :return: tuple ``(all_passed, errors, mod_input)`` where ``all_passed``
        is True only when ``value > 0`` and ``errors`` is a list of
        human-readable messages (empty when the check passes)
    """
    errors = []
    try:
        is_positive = value > 0
    except TypeError:
        # None or a non-numeric value cannot be ordered against 0; report
        # it as a failed check instead of letting the comparison blow up.
        is_positive = False

    if not is_positive:
        errors.append("Must be a positive value")
    return not errors, errors, mod_input
# Measurements
# Channel index -> measurement/unit pair stored for that channel.
measurements_dict = {
    # Channel 0: share of the sensor board that is immersed
    0: {'measurement': 'liquid_level', 'unit': 'percent'},
    # Channel 1: liquid volume derived from the percentage
    1: {'measurement': 'volume', 'unit': 'l'},
}
def constraints_pass_non_negative_value(mod_input, value):
    """
    Check that a user-supplied option value is zero or greater.

    Used for the low-end volume, whose default of 0 must be accepted.

    :param mod_input: object passed in by the caller; handed back unchanged
    :param value: the value to validate (an int or float is expected)
    :return: tuple ``(all_passed, errors, mod_input)``
    """
    errors = []
    all_passed = True
    if value < 0:
        all_passed = False
        errors.append("Must not be a negative value")
    return all_passed, errors, mod_input


# Module description: identification, dependencies, I2C settings and the
# user-editable options used to convert the percentage into a volume.
INPUT_INFORMATION = {
    'input_name_unique': 'Grove - Liquid Level Sensor',
    'input_manufacturer': 'Seeedstudio',
    'input_name': 'Liquid Level Sensor',
    'input_library': 'smbus2',
    'measurements_name': 'Solution Level',
    'measurements_dict': measurements_dict,
    'url_manufacturer': 'https://wiki.seeedstudio.com/Grove-Water-Level-Sensor/',
    'url_datasheet': 'https://raw.githubusercontent.com/SeeedDocument/Grove-Water-LEVER-Sensor/master/res/Grove%20-%20Water%20Level%20Sensor%20(10CM)_SCH%26PCB.zip',
    'url_product_purchase': 'https://www.seeedstudio.com/Grove-Water-Level-Sensor-10CM-p-4443.html',

    'dependencies_module': [
        ('pip-pypi', 'smbus2', 'smbus2==0.4.1'),
    ],

    'interfaces': ['I2C'],
    # NOTE(review): InputModule reads from both 0x77 and 0x78 regardless of
    # which entry is selected here - confirm that listing both is intended.
    'i2c_location': ['0x77', '0x78'],
    'i2c_address_editable': False,

    'options_enabled': [
        'i2c_location',
        'period',
        'pre_output'
    ],
    'options_disabled': ['interface'],

    'custom_options': [
        {
            'id': 'volume_calibration',
            'type': 'message',
            'default_value': """The Grove Liquid Level sensor returns a value that is the percentage of the sensor that is below the liquid level. To set channel 1 to return the volume of liquid represented by the percentage value, please enter the volume of liquid present when the sensor reports 0% and 100%."""
        },
        {  # This starts a new line for the next options
            'type': 'new_line'
        },
        {
            'id': 'low_end_volume',
            'type': 'integer',
            'default_value': 0,
            'required': True,
            # The default of 0 has to validate, so only negatives are
            # rejected for the low end.
            'constraints_pass': constraints_pass_non_negative_value,
            'name': "{}: {}".format(lazy_gettext('% to Volume Conversion:'), lazy_gettext('Low-end Volume (L)')),
            'phrase': lazy_gettext('Volume of liquid in Liters at 0%:')
        },
        {  # This starts a new line for the next options
            'type': 'new_line'
        },
        {
            'id': 'hi_end_volume',
            'type': 'integer',
            'default_value': 100,
            'required': True,
            'constraints_pass': constraints_pass_positive_value,
            'name': "{}: {}".format(lazy_gettext('% to Volume Conversion:'), lazy_gettext('High-end Volume (L)')),
            'phrase': lazy_gettext('Volume of liquid in Liters at 100%:')
        }
    ]
}
class InputModule(AbstractInput):
    """
    Input that reads a Grove water level sensor over I2C.

    Channel 0 receives the immersed percentage of the sensor board.
    Channel 1 receives a volume obtained by linear interpolation between
    ``low_end_volume`` (at 0%) and ``hi_end_volume`` (at 100%).
    """

    def __init__(self, input_dev, *args, **kwargs):
        """
        Open the I2C bus, load the custom options and initialize the input.

        :param input_dev: device settings object forwarded to the base class
            and to ``setup_custom_options``
        """
        super().__init__(input_dev, *args, **kwargs)

        # I2C Bus (can be adjusted based on your setup)
        self.i2c_bus = smbus.SMBus(1)

        # The pads are read from two addresses: 8 values from the low
        # address and 12 values from the high address.
        self.low_addr = 0x77
        self.high_addr = 0x78

        # A pad whose raw value exceeds THRESHOLD counts as covered.
        self.THRESHOLD = 100
        self.sensorvalue_min = 250
        self.sensorvalue_max = 255

        # Setup logger for the input module
        self.logger = logging.getLogger(__name__)

        # Setup volume option variables
        self.low_end_volume = None
        self.hi_end_volume = None
        self.sensor = None

        # Set custom option variables to defaults or user-set values
        self.setup_custom_options(INPUT_INFORMATION['custom_options'], input_dev)
        self.logger.info("Variable values: {}, {}".format(
            self.low_end_volume,
            self.hi_end_volume))

        self.initialize_input()

    def initialize_input(self):
        """Mark the sensor as ready; there is no device-specific setup yet."""
        try:
            # Placeholder for any sensor-specific initialization
            self.sensor = True
            self.logger.info("Sensor initialized successfully!")
        except Exception as e:
            self.logger.exception(f"Exception encountered while initializing input: {e}")

    def get_measurement(self):
        """
        Read the sensor and store the level (%) and the derived volume (L).

        :return: the populated measurement dict, or None when the sensor is
            not set up or the reading failed
        """
        self.return_dict = copy.deepcopy(measurements_dict)

        if not self.sensor:
            self.logger.error("Sensor not set up")
            return

        try:
            low_data = self._read_sections(self.low_addr, 8)
            high_data = self._read_sections(self.high_addr, 12)
        except IOError as err:
            # A failed bus read must not be stored as an empty tank: all-zero
            # data would otherwise be recorded as a genuine 0% level.
            self.logger.error(f"I2C read failed, skipping this measurement: {err}")
            return None

        try:
            water_level = self.calculate_water_level(low_data, high_data)

            # Linear interpolation between the volumes configured for 0% and 100%
            volume = (water_level / 100) * (self.hi_end_volume - self.low_end_volume) + self.low_end_volume

            self.value_set(0, water_level)
            self.value_set(1, volume)

            self.logger.debug(f"Value returned from the sensor library: {water_level}%, {volume}L. Saving to database.")
            return self.return_dict
        except Exception as msg:
            self.logger.exception(f"Input read failure: {msg}")
            return None

    def _read_sections(self, address, count):
        """Read ``count`` raw pad values from ``address``; I/O errors propagate."""
        return self.i2c_bus.read_i2c_block_data(address, 0, count)

    def get_low_8_section_value(self):
        """
        Fetch data from the low 8 sections of the sensor.

        :return: list of 8 raw values, or 8 zeros when the read fails
            (get_measurement does not use this fallback)
        """
        try:
            return self._read_sections(self.low_addr, 8)
        except IOError as e:
            self.logger.error(f"Error reading from low section address {hex(self.low_addr)}: {e}")
            return [0] * 8  # Return zeros if error

    def get_high_12_section_value(self):
        """
        Fetch data from the high 12 sections of the sensor.

        :return: list of 12 raw values, or 12 zeros when the read fails
            (get_measurement does not use this fallback)
        """
        try:
            return self._read_sections(self.high_addr, 12)
        except IOError as e:
            self.logger.error(f"Error reading from high section address {hex(self.high_addr)}: {e}")
            return [0] * 12  # Return zeros if error

    def calculate_water_level(self, low_data, high_data):
        """
        Calculate the water level percentage.

        Sections are examined from the first low value up to the last high
        value; counting stops at the first section that is not above
        ``THRESHOLD``, so a covered section above a gap is ignored.

        :param low_data: raw values of the low sections (first 8 are used)
        :param high_data: raw values of the high sections (first 12 are used)
        :return: level in percent, a multiple of 5 between 0 and 100
        """
        covered_sections = 0
        for reading in list(low_data[:8]) + list(high_data[:12]):
            if reading <= self.THRESHOLD:
                break
            covered_sections += 1

        # 20 sections, each representing 5%
        return covered_sections * 5