My first custom input: the Grove Water Level Sensor

I just successfully deployed my first custom input module and wanted to share it for feedback, and ask if I should put it on GitHub for others to use. I am a fairly amateur programmer, so I was stoked to get this input up and running. My wife wasn’t as excited about this achievement as I had hoped she would be, so I decided to make a post here, where people will appreciate it more than she does.

I run a 4-site DWC hydroponic system monitored and conditioned by a Pi running Mycodo. The initial design included two float sensors in the “brain” tank: one for the maximum liquid level and one for the minimum liquid level. Mycodo can then top off the system from a fresh water reservoir when the level is low, without overfilling it.

One drawback to this design is that I can’t tell if the plants are taking up water until the low level sensor is triggered, so I can’t monitor the uptake rate or adjust EC and environmental factors based on that data. I found the 10cm Grove Water Level Sensor for sale and it seemed like it could provide the granular data I was looking for, and it was cheap enough!

The sensor uses 20 capacitive pads (8 in the low section and 12 in the high section) to determine what percentage of the 10cm board is immersed, and is designed to return a percentage in 5% steps.

This week, the sensor arrived, and I set about making my first custom input module. I added custom options to input the low and high level volumes, so that the percentage returned can be translated to the volume of liquid in the system. This should work in any system where the difference between the maximum and minimum liquid level is less than 10cm.
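
To make the percent-to-volume conversion concrete, here is a minimal standalone sketch. The function name `percent_to_volume` and the 40 L / 50 L calibration figures are made up for illustration, and it assumes the tank has straight sides so that volume scales linearly with level.

```python
def percent_to_volume(percent, low_end_volume, hi_end_volume):
    """Linearly map a 0-100 % sensor reading onto the calibrated volume range."""
    if not 0 <= percent <= 100:
        raise ValueError("percent must be between 0 and 100")
    span = hi_end_volume - low_end_volume
    return low_end_volume + (percent / 100) * span


# Hypothetical calibration: 40 L in the system at 0 %, 50 L at 100 %.
for reading in (0, 25, 100):
    print(reading, "% ->", percent_to_volume(reading, 40, 50), "L")
```

With that example calibration a 25% reading works out to 42.5 L, and because the sensor reports in 5% steps, each step corresponds to 0.5 L.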

# coding=utf-8
# A mycodo custom input module for the Grove Water Level Sensor (10cm)
# version 1.2
import copy
import time
import smbus2 as smbus
import logging
from mycodo.inputs.base_input import AbstractInput
from flask_babel import lazy_gettext

# Constraint check for the volume options.
# Zero is allowed so that the default low-end volume of 0 passes validation.
def constraints_pass_positive_value(mod_input, value):
    """Check if the user input is acceptable"""
    errors = []
    all_passed = True
    if value < 0:  # Ensure value is not negative
        all_passed = False
        errors.append("Must not be a negative value")
    return all_passed, errors, mod_input

# Measurements
measurements_dict = {
    0: {
        'measurement': 'liquid_level',
        'unit': 'percent'
    },
    1: {
        'measurement': 'volume',
        'unit': 'l'
    }
}

INPUT_INFORMATION = {
    'input_name_unique': 'Grove - Liquid Level Sensor',
    'input_manufacturer': 'Seeedstudio',
    'input_name': 'Liquid Level Sensor',
    'input_library': 'smbus2',
    'measurements_name': 'Solution Level',
    'measurements_dict': measurements_dict,
    'url_manufacturer': 'https://wiki.seeedstudio.com/Grove-Water-Level-Sensor/',
    'url_datasheet': 'https://raw.githubusercontent.com/SeeedDocument/Grove-Water-LEVER-Sensor/master/res/Grove%20-%20Water%20Level%20Sensor%20(10CM)_SCH%26PCB.zip',
    'url_product_purchase': 'https://www.seeedstudio.com/Grove-Water-Level-Sensor-10CM-p-4443.html',

    'dependencies_module': [
        ('pip-pypi', 'smbus2', 'smbus2==0.4.1'),
    ],

    'interfaces': ['I2C'],
    'i2c_location': ['0x77', '0x78'],
    'i2c_address_editable': False,

    'options_enabled': [
        'i2c_location',
        'period',
        'pre_output'
    ],
    'options_disabled': ['interface'],
    
    'custom_options': [
        {   'id': 'volume_calibration',
            'type': 'message',
            'default_value': """The Grove Liquid Level sensor returns a value that is the percentage of the sensor that is below the liquid level. To set channel 1 to return the volume of liquid represented by the percentage value, please enter the volume of liquid present when the sensor reports 0% and 100%."""
        },
        {  # This starts a new line for the next options
            'type': 'new_line'
        },
        {
            'id': 'low_end_volume',
            'type': 'integer',
            'default_value': 0,
            'required': True,
            'constraints_pass': constraints_pass_positive_value,
            'name': "{}: {}".format(lazy_gettext('% to Volume Conversion:'), lazy_gettext('Low-end Volume (L)')),
            'phrase': lazy_gettext('Volume of liquid in Liters at 0%:')
        },
        {  # This starts a new line for the next options
            'type': 'new_line'
        },
        {
            'id': 'hi_end_volume',
            'type': 'integer',
            'default_value': 100,
            'required': True,
            'constraints_pass': constraints_pass_positive_value,
            'name': "{}: {}".format(lazy_gettext('% to Volume Conversion:'), lazy_gettext('High-end Volume (L)')),
            'phrase': lazy_gettext('Volume of liquid in Liters at 100%:')
        }
    ]
}

class InputModule(AbstractInput):
    """ A sensor support class that uses a Grove sensor to monitor the water level """
    def __init__(self, input_dev, *args, **kwargs):
        super().__init__(input_dev, *args, **kwargs)
        
        # I2C Bus (can be adjusted based on your setup)
        self.i2c_bus = smbus.SMBus(1)

        # I2C addresses
        self.low_addr = 0x77
        self.high_addr = 0x78

        # Threshold and sensor range
        self.THRESHOLD = 100
        self.sensorvalue_min = 250
        self.sensorvalue_max = 255

        # Setup logger for the input module
        self.logger = logging.getLogger(__name__)
        
        # Setup volume option variables
        self.low_end_volume = None
        self.hi_end_volume = None
        
        self.sensor = None
        
        # Set custom option variables to defaults or user-set values
        self.setup_custom_options(INPUT_INFORMATION['custom_options'], input_dev)
        
        self.logger.info("Variable values: {}, {}".format(
            self.low_end_volume,
            self.hi_end_volume))

        self.initialize_input()

    def initialize_input(self):
        """ Initialize the Grove water level sensor class """
        try:
            # Assuming sensor setup can involve reading from I2C
            self.sensor = True  # Placeholder for any sensor-specific initialization
            self.logger.info("Sensor initialized successfully!")
        except Exception as e:
            self.logger.exception(f"Exception encountered while initializing input: {e}")

    def get_measurement(self):
        """ Gets the Grove Sensor's water level in % """
        self.return_dict = copy.deepcopy(measurements_dict)
        if not self.sensor:
            self.logger.error("Sensor not set up")
            return
        try:
            low_data = self.get_low_8_section_value()
            high_data = self.get_high_12_section_value()
            water_level = self.calculate_water_level(low_data, high_data)
            
            # Calculate the volume from the percent and user parameters
            volume = (water_level / 100) * (self.hi_end_volume - self.low_end_volume) + self.low_end_volume
            self.value_set(0, water_level)
            self.value_set(1, volume)
            self.logger.debug(f"Value returned from the sensor library: {water_level}%, {volume}L. Saving to database.")
            return self.return_dict
        except Exception as msg:
            self.logger.exception(f"Input read failure: {msg}")
        return None
        
    def get_low_8_section_value(self):
        """Fetch data from the low 8 sections of the sensor."""
        try:
            low_data = self.i2c_bus.read_i2c_block_data(self.low_addr, 0, 8)
            return low_data
        except IOError as e:
            self.logger.error(f"Error reading from low section address {hex(self.low_addr)}: {e}")
            raise  # Let get_measurement() skip this reading rather than report a false 0%

    def get_high_12_section_value(self):
        """Fetch data from the high 12 sections of the sensor."""
        try:
            high_data = self.i2c_bus.read_i2c_block_data(self.high_addr, 0, 12)
            return high_data
        except IOError as e:
            self.logger.error(f"Error reading from high section address {hex(self.high_addr)}: {e}")
            raise  # Let get_measurement() skip this reading rather than report a falsely low level

    def calculate_water_level(self, low_data, high_data):
        """Calculate the water level percentage."""
        touch_val = 0

        # Process low section data
        for i in range(8):
            if low_data[i] > self.THRESHOLD:
                touch_val |= 1 << i

        # Process high section data
        for i in range(12):
            if high_data[i] > self.THRESHOLD:
                touch_val |= (1 << (8 + i))

        trig_section = 0
        while touch_val & 0x01:
            trig_section += 1
            touch_val >>= 1

        water_level = trig_section * 5  # 20 sections, each representing 5%
        return water_level

This is great. Thanks for sharing the code. I’ll include it in the next release. I can think of some interesting features we could add to this, such as converting each step to a volume of liquid that can be stored in addition to the percentage. This would simply need the user to set an option that specifies the volume of liquid between steps.

Actually, I should have looked closer, because it appears you already added volume!

Volume rate could be another useful metric. It sounds like your initial goal was to calculate that, but having the input do it would save even more time.
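
A minimal sketch of how such a rate could be derived from two stored readings. This is not an existing Mycodo feature: `uptake_rate_l_per_hour` and the sample readings are hypothetical, and it assumes no top-off happened between the two readings.

```python
from datetime import datetime


def uptake_rate_l_per_hour(earlier, later):
    """Return litres consumed per hour between two (timestamp, volume_l) readings.

    A positive result means the volume dropped (the plants took up water).
    """
    (t0, v0), (t1, v1) = earlier, later
    hours = (t1 - t0).total_seconds() / 3600
    if hours <= 0:
        raise ValueError("readings must be in chronological order")
    return (v0 - v1) / hours


# Hypothetical readings taken six hours apart.
first = (datetime(2024, 1, 1, 8, 0), 48.0)
second = (datetime(2024, 1, 1, 14, 0), 46.5)
print(uptake_rate_l_per_hour(first, second))
```

Since the sensor only resolves the level in 5% steps, a rate like this is only meaningful over an interval long enough for the level to cross at least one step.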

Thanks Kyle! I appreciate all the time you’ve put into this.

Yeah, after I got the module running, I went back and added those custom options. It is really slick how the custom inputs work and how much the user can customize the input interface.

After I get the sensor installed in my system and run it for a month or so, I’ll add any updates/insight.
