Edge detection with I2C buttons using PCF8575

Hi.

I have 22 buttons attached to the GPIO pins on a Raspberry Pi. Using the “edge detection” input and the “trigger edge” function, I can successfully turn LEDs and relays, which are connected via I2C, on and off. The problem is that I get false triggers when quickly inserting and removing an empty extension power cord from the same socket that powers the Raspberry Pi.
Things I tried that didn’t help: changing from a pull-down to a pull-up configuration, reducing the resistor value from 10k to 1k, installing an EMI filter, installing an isolation transformer, using a UPS, using shorter wires, setting the bounce time to 3000 ms in Mycodo, and detecting the rising edge, the falling edge, or both.
I now think I can’t get rid of these false triggers and hope to solve this by connecting the buttons to I2C via some sort of IC, since I2C doesn’t seem to be affected by this issue.
Could someone please suggest which integrated circuit I could use to connect those 22 buttons via I2C to Mycodo? Or is there another way to get rid of those false triggers? Maybe by modifying the code to ignore trigger edges shorter than a specified amount of time?
I’ve attached a picture showing my setup.
Please let me know if you need more information.

Thank you very much.

Have you connected an oscilloscope to understand what’s happening?

Thanks for replying. Unfortunately I don’t have an oscilloscope…

Hi,
Is there any way to solve this issue without an oscilloscope?
Can this be solved in software by waiting for 5 ms and then reading the input again?
Thank you

You can copy the input, edit it to suit your needs, then import it on the Input Import page.
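
As a rough illustration of the "wait and read again" idea asked about above, here is a minimal sketch in plain Python. It is not Mycodo code: `confirmed_level`, `on_edge`, and the `read_pin` callable are hypothetical names, and the 5 ms delay and three samples are assumptions to tune. Unlike a bounce time, which typically only suppresses further edges after the first one, this re-reads the pin and rejects the edge if the level has not persisted.

```python
import time
from typing import Callable


def confirmed_level(read_pin: Callable[[], int],
                    expected: int,
                    delay_s: float = 0.005,
                    samples: int = 3) -> bool:
    """Return True only if read_pin() keeps returning `expected`.

    The pin is re-read `samples` times, `delay_s` seconds apart. A short
    glitch that has already vanished by a re-read is rejected.
    """
    for _ in range(samples):
        time.sleep(delay_s)
        if read_pin() != expected:
            return False
    return True


def on_edge(read_pin: Callable[[], int], expected: int) -> str:
    """Hypothetical edge callback: act only on confirmed level changes."""
    if confirmed_level(read_pin, expected):
        return "accepted"
    return "ignored"


if __name__ == "__main__":
    # Simulated glitch: an edge was reported, but the pin already reads 0 again.
    print(on_edge(lambda: 0, expected=1))   # ignored
    # Simulated real press: the pin stays at 1 for every re-read.
    print(on_edge(lambda: 1, expected=1))   # accepted
```

In a copied Edge Input, `read_pin` would be whatever call returns the current GPIO level, and the check would go at the top of the edge callback.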

Hi,
I’ve tried to create an input module to read the input states (on or off) from the PCF8575, hoping to connect the buttons to it, which would solve the false detection issue.
The code I wrote is here:
input_PCF8575.py (7.0 KB)

Unfortunately it doesn’t work and I need help to get it running…

mycodo.log shows:
2024-08-10 22:19:02,719 - DEBUG - mycodo.inputs.pcf8575_input - Initialized InputModule with input device <Input(id=48)>
2024-08-10 22:19:02,724 - DEBUG - mycodo.controllers.controller_input_5443884c - get_measurement() found
2024-08-10 22:19:02,725 - DEBUG - mycodo - Input controller with ID 5443884c-1fe8-4379-b818-8bfac05de5e1 activated.
2024-08-10 22:19:02,726 - DEBUG - mycodo.controllers.controller_input_5443884c - listener() found
2024-08-10 22:19:02,728 - DEBUG - mycodo.controllers.controller_input_5443884c - Starting listener() thread.
2024-08-10 22:19:02,732 - INFO - mycodo.controllers.controller_input_5443884c - Activated in 1129.5 ms
2024-08-10 22:19:02,734 - DEBUG - mycodo.inputs.pcf8575_input - Current measurement state: {'button_0_state': None, 'button_1_state': None, 'button_2_state': None, 'button_3_state': None, 'button_4_state': None, 'button_5_state': None, 'button_6_state': None, 'button_7_state': None, 'button_8_state': None, 'button_9_state': None, 'button_10_state': None, 'button_11_state': None, 'button_12_state': None, 'button_13_state': None, 'button_14_state': None, 'button_15_state': None}
2024-08-10 22:19:02,837 - DEBUG - mycodo.controllers.controller_input_5443884c - Adding measurements to InfluxDB with ID 5443884c-1fe8-4379-b818-8bfac05de5e1: {}

and the live measurement page shows:
On/Off: PCF8575 inp 16-Channel I/O Expander (Input 5443884c)

Input (PCF8575_input), 15.0 second interval
Measurement | Timestamp
0 bool (button_0_state) CH0 | No Data Last 30 sec
0 bool (button_1_state) CH1 | No Data Last 30 sec
0 bool (button_2_state) CH2 | No Data Last 30 sec
0 bool (button_3_state) CH3 | No Data Last 30 sec
0 bool (button_4_state) CH4 | No Data Last 30 sec
0 bool (button_5_state) CH5 | No Data Last 30 sec
0 bool (button_6_state) CH6 | No Data Last 30 sec
0 bool (button_7_state) CH7 | No Data Last 30 sec
0 bool (button_8_state) CH8 | No Data Last 30 sec
0 bool (button_9_state) CH9 | No Data Last 30 sec
0 bool (button_10_state) CH10 | No Data Last 30 sec
0 bool (button_11_state) CH11 | No Data Last 30 sec
0 bool (button_12_state) CH12 | No Data Last 30 sec
0 bool (button_13_state) CH13 | No Data Last 30 sec
0 bool (button_14_state) CH14 | No Data Last 30 sec
0 bool (button_15_state) CH15 | No Data Last 30 sec

The expected output should be half of them with a value of 1…

The PCF8575 is correctly connected to the Pi and works as expected with the following code:
$ cat pcf8575


import smbus2
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# PCF8575 default I2C address
PCF8575_ADDRESS = 0x20

# Initialize I2C bus
bus = smbus2.SMBus(1)  # 1 indicates /dev/i2c-1

try:
   # Direct I2C read test
   logger.debug("Performing direct I2C read test")
   test_data = bus.read_word_data(PCF8575_ADDRESS, 0)
   test_value = ((test_data << 8) & 0xFF00) | (test_data >> 8)
   logger.debug(f"Test Read Data: 0x{test_data:04X}, Processed Value: 0x{test_value:04X}")
except Exception as e:
   logger.error(f"Direct I2C read test failed: {e}")

I get:
$ python pcf8575
DEBUG:__main__:Performing direct I2C read test
DEBUG:__main__:Test Read Data: 0x0FF0, Processed Value: 0xF00F

and with the following code:
$ cat channel


import smbus2
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# PCF8575 default I2C address
PCF8575_ADDRESS = 0x20

# Initialize I2C bus
bus = smbus2.SMBus(1)  # 1 indicates /dev/i2c-1

try:
    # Direct I2C read test
    logger.debug("Performing direct I2C read test")

    # Read a 16-bit word from the PCF8575
    test_data = bus.read_word_data(PCF8575_ADDRESS, 0)

    # The data returned is in little-endian format, so swap bytes
    test_value = ((test_data << 8) & 0xFF00) | (test_data >> 8)
    logger.debug(f"Test Read Data: 0x{test_data:04X}, Processed Value: 0x{test_value:04X}")

    # Iterate over each of the 16 bits to determine the state of each channel
    for channel in range(16):
        # Extract the state of the current channel
        channel_state = (test_value >> channel) & 1
        logger.info(f"Channel {channel + 1}: {'ON' if channel_state else 'OFF'}")

except Exception as e:
    logger.error(f"Direct I2C read test failed: {e}")

I get the expected output:
~ $ python channel
DEBUG:__main__:Performing direct I2C read test
DEBUG:__main__:Test Read Data: 0x0FF0, Processed Value: 0xF00F
INFO:__main__:Channel 1: ON
INFO:__main__:Channel 2: ON
INFO:__main__:Channel 3: ON
INFO:__main__:Channel 4: ON
INFO:__main__:Channel 5: OFF
INFO:__main__:Channel 6: OFF
INFO:__main__:Channel 7: OFF
INFO:__main__:Channel 8: OFF
INFO:__main__:Channel 9: OFF
INFO:__main__:Channel 10: OFF
INFO:__main__:Channel 11: OFF
INFO:__main__:Channel 12: OFF
INFO:__main__:Channel 13: ON
INFO:__main__:Channel 14: ON
INFO:__main__:Channel 15: ON
INFO:__main__:Channel 16: ON
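
For reference, the byte swap and per-channel decoding from the script above can be pulled out into two small pure functions and checked without the hardware. This is only a sketch: `swap_bytes` and `decode_channels` are names I made up, and the channel numbering simply follows the script (bit 0 of the swapped value is "Channel 1").

```python
from typing import List


def swap_bytes(word: int) -> int:
    """Swap the two bytes of a 16-bit word (same expression as in the script)."""
    return ((word << 8) & 0xFF00) | (word >> 8)


def decode_channels(raw_word: int) -> List[bool]:
    """Return 16 booleans; index 0 corresponds to 'Channel 1' in the log."""
    value = swap_bytes(raw_word & 0xFFFF)
    return [bool((value >> bit) & 1) for bit in range(16)]


if __name__ == "__main__":
    raw = 0x0FF0                                   # raw value from the log
    print(f"0x{swap_bytes(raw):04X}")              # 0xF00F
    states = decode_channels(raw)
    print([i + 1 for i, on in enumerate(states) if on])
    # [1, 2, 3, 4, 13, 14, 15, 16]
```

The result matches the ON/OFF list printed by the test script, so the same decoding can be reused inside the input module.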

Any advice/help is much appreciated. Thank you

Hello everybody!
I’m trying to use the PCF8575 as an input for buttons. The following code, used as a custom input, works OK: it detects changes on all channels.

import copy
import datetime
import threading
import time
import smbus2

from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value

# Measurements
measurements_dict = {}

for each_channel in range(16):
   measurements_dict[each_channel] = {
       'measurement': 'buttons_edge',
       'unit': 'bool'
   }

# Input information
INPUT_INFORMATION = {
   'input_name_unique': 'EDGE_PCF8575_toate_canalele',
   'input_manufacturer': 'XXX',
   'input_name': 'PCF8575 Edge ALL Channels Detection',
   'input_library': 'smbus2',
   'measurements_name': 'Rising/Falling Edge',
   'measurements_dict': measurements_dict,

   'options_enabled': [
       'i2c_location',
       'i2c_bus',
       'pre_output',
       'poll_interval'
   ],
   'options_disabled': ['interface'],

   'dependencies_module': [
       ('pip-pypi', 'smbus2', 'smbus2==0.4.1')
   ],

   'interfaces': ['I2C'],
   'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
   'i2c_address_editable': False,
   'i2c_address_default': '0x20', 
   'custom_options': [
       {
           'id': 'poll_interval',
           'type': 'float',
           'default_value': 0.1,
           'name': 'Poll Interval',
           'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
       }
   ]
}

class InputModule(AbstractInput):
   """A sensor support class that listens for rising or falling pin edge events on the PCF8575."""

   def __init__(self, input_dev, testing=False):
       super().__init__(input_dev, testing=testing, name=__name__)

       self.bus = None
       self.address = int(str(self.input_dev.i2c_location), 16)
       self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.1)  # Default to 0.1 if not set
       self.previous_states = None  # Initial state is unknown
       #self.previous_states = 0xFFFF  # Initial state (all pins high); removed and replaced with the line above
       self.control = None

       if not testing:
           self.setup_custom_options(
               INPUT_INFORMATION['custom_options'], input_dev)
           self.try_initialize()

   def initialize(self):
       try:
           self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
           self.control = DaemonControl()

           self.logger.info("PCF8575 initialized without causing an exception!...")
           self.start_polling()

       except Exception as e:
           self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))

   def read_pcf8575(self):
       """ Reads the state of all pins (16 bits) """
       try:
           data = self.bus.read_word_data(self.address, 0)
           # Correct byte order if necessary
           data = ((data & 0xFF) << 8) | (data >> 8)
           return data
       except Exception as e:
           self.logger.exception(f"Error reading from PCF8575: {e}")
           return None

   def start_polling(self):
       """ Start polling the PCF8575 for pin state changes. """
       self.running = True
       self.poll_thread = threading.Thread(target=self.poll_pins)
       self.poll_thread.start()

   def poll_pins(self):
       """ Poll the PCF8575 pins and detect edges. """
       while self.running:
           current_states = self.read_pcf8575()
           if current_states is not None:
               current_states = ~current_states & 0xFFFF  # Invert the pin states and mask to 16 bits
               if self.previous_states is None:
                   # Skip edge detection for the first cycle
                   self.previous_states = current_states      
               else:
                   changed_pins = current_states ^ self.previous_states
                   if changed_pins:
                       for pin in range(16):
                           if changed_pins & (1 << pin):
                               # Extract the previous and current states for the pin
                               previous_state = (self.previous_states >> pin) & 1
                               current_state = (current_states >> pin) & 1
                               
                               # Log the state change detected on this pin
                               self.logger.debug(
                                   f"State change detected on channel {pin}: {previous_state} -> {current_state}"
                               )
                               
                               # Call edge_detected with the actual state (0 or 1)
                               self.edge_detected(pin, current_state)
                   
                   # Update the previous states to the current ones
                   self.previous_states = current_states
                   
           time.sleep(self.poll_interval)

   def edge_detected(self, pin, state):
       """
       Handle edge detection for a specific pin.
       :param pin: The pin number (0-15).
       :param state: The current state of the pin (1 or 0).
       """
       state_str = "Rising" if state else "Falling"
       rising_or_falling = 1.0 if state else -1.0  # Convert to float so the InfluxDB write no longer raises an error

       self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

       write_db = threading.Thread(
           target=write_influxdb_value,
           args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
           kwargs={'channel': pin,
                   'measure': measurements_dict[pin]['measurement'],
                   'timestamp': datetime.datetime.utcnow()})
       write_db.start()

       trigger = db_retrieve_table_daemon(Trigger)
       trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
       trigger = trigger.filter(Trigger.measurement == self.unique_id)
       trigger = trigger.filter(Trigger.is_activated.is_(True))

       for each_trigger in trigger.all():
           if each_trigger.edge_detected in ['both', state_str.lower()]:
               now = time.time()
               timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
               message = "{ts}\n[Trigger {cid} ({cname})] " \
                         "Input {oid} ({name}) {state} edge detected " \
                         "on pin {pin}".format(
                               ts=timestamp,
                               cid=each_trigger.id,
                               cname=each_trigger.name,
                               oid=self.unique_id,
                               name=self.input_dev.name,
                               state=state_str,
                               pin=pin)
               self.logger.debug("Edge: {}".format(message))

               self.control.trigger_all_actions(
                   each_trigger.unique_id, message=message)

   def stop_input(self):
       """Called when Input is deactivated."""
       self.running = False
       try:
           if self.poll_thread.is_alive():
               self.poll_thread.join()
           self.logger.debug("Stopping PCF8575 polling")
       except Exception as e:
           self.logger.exception("Exception during cleanup: {}".format(e))
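
The XOR-based change detection used in poll_pins above can also be isolated into a pure function, which makes it easy to check the rising/falling logic without a PCF8575 attached. A minimal sketch; `detect_edges` is a hypothetical helper name and not part of Mycodo:

```python
from typing import List, Tuple


def detect_edges(previous: int, current: int, width: int = 16) -> List[Tuple[int, str]]:
    """Compare two pin bitmasks and return (pin, 'rising' | 'falling') pairs."""
    # XOR leaves a 1 only in the bit positions that differ between the reads.
    changed = (previous ^ current) & ((1 << width) - 1)
    edges = []
    for pin in range(width):
        if changed & (1 << pin):
            now_high = (current >> pin) & 1
            edges.append((pin, "rising" if now_high else "falling"))
    return edges


if __name__ == "__main__":
    # Pin 0 goes 0 -> 1 and pin 3 goes 1 -> 0; all other pins are unchanged.
    print(detect_edges(0b1000, 0b0001))  # [(0, 'rising'), (3, 'falling')]
    print(detect_edges(0xFFFF, 0xFFFF))  # []
```

Note that the module inverts the raw word before comparing, so "rising" there means the pin was pulled low (button pressed).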

What I need now is a trigger edge function that starts an action. The code I am currently working on is this:

import datetime
import threading
import time
import smbus2

from mycodo.databases.models import CustomController
from mycodo.functions.base_function import AbstractFunction
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value
from mycodo.config_translations import TRANSLATIONS
from mycodo.databases.models import Actions
from mycodo.databases.models import Output
from mycodo.actions.base_action import AbstractFunctionAction

 
# Define function information
FUNCTION_INFORMATION = {
    'function_name_unique': 'edge_detection_funct3ion',
    'function_name': 'Edge Detection Function',
    'message': 'Monitors PCF8575 for edge detection and activates outputs.',
    'options_enabled': ['custom_options', 'function_status'],
    'dependencies_module': [('pip-pypi', 'smbus2', 'smbus2==0.4.1')],
    'custom_options': [
        {'id': 'i2c_location', 'type': 'text', 'default_value': '0x20', 'name': 'I2C Location', 'phrase': 'I2C address'},
        {'id': 'i2c_bus', 'type': 'integer', 'default_value': 1, 'name': 'I2C Bus', 'phrase': 'I2C bus number'},
        {'id': 'poll_interval', 'type': 'float', 'default_value': 0.1, 'name': 'Poll Interval', 'phrase': 'Poll interval (s)'},
        {'id': 'channel', 'type': 'integer', 'default_value': 0, 'name': 'Channel', 'phrase': 'Channel to monitor'},
        {'id': 'activation_output', 'type': 'select_device', 'default_value': '', 'options_select': ['Output'], 'name': 'Activation Output', 'phrase': 'Output to activate'},
        {'id': 'activation_duration', 'type': 'float', 'default_value': 15.0, 'name': 'Activation Duration', 'phrase': 'Activation duration (s)'}
    ]
}

class EdgeDetectionFunction(AbstractFunction):
    def __init__(self, function, testing=False):
        super().__init__(function, testing=testing, name=__name__)
        self.control = DaemonControl()
        self.listener_running = True
        self.timer_loop = time.time()
        self.i2c_location = None
        self.i2c_bus = None
        self.poll_interval = None
        self.channel = None
        self.activation_output = None
        self.activation_duration = None
        self.previous_state = None
        # Retrieve custom options from the database
        custom_function = db_retrieve_table_daemon(CustomController, unique_id=self.unique_id)
        self.setup_custom_options(FUNCTION_INFORMATION['custom_options'], custom_function)
        self.bus = None
        if not testing:
            self.try_initialize()

    def initialize(self):
        try:
            self.bus = smbus2.SMBus(self.i2c_bus)
            self.logger.info("PCF8575 initialized successfully.")
            self.start_polling()
        except Exception as e:
            self.logger.exception("Initialization error: {}".format(e))

    def read_pcf8575(self):
        """ Reads the state of all pins (16 bits) """
        try:
            data = self.bus.read_word_data(int(self.i2c_location, 16), 0)
            data = ((data & 0xFF) << 8) | (data >> 8)  # Correct byte order
            return data
        except Exception as e:
            self.logger.exception(f"Error reading from PCF8575: {e}")
            return None

    def start_polling(self):
        """ Start polling the PCF8575 for pin state changes. """
        self.running = True
        self.poll_thread = threading.Thread(target=self.poll_pins)
        self.poll_thread.start()

    def poll_pins(self):
        """ Poll the PCF8575 pins and detect edges for the specified channel. """
        while self.running:
            current_states = self.read_pcf8575()
            if current_states is not None:
                current_state = (current_states >> self.channel) & 1

                if self.previous_state is None:
                    self.previous_state = current_state

                if current_state != self.previous_state:
                    self.logger.debug(f"State change detected on channel {self.channel}: {self.previous_state} -> {current_state}")
                    self.edge_detected(self.channel, current_state)
                    self.previous_state = current_state
            time.sleep(self.poll_interval)

    def edge_detected(self, pin, state):
        """
        Handle edge detection for a specific pin.
        :param pin: The pin number (0-15).
        :param state: The current state of the pin (1 or 0).
        """
        state_str = "Rising" if state else "Falling"
        self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

        # Activate the output for a specified duration
        if self.activation_output:
            self.logger.debug(f"Activating output {self.activation_output} for {self.activation_duration} seconds...")
            self.control.output_on(self.activation_output, output_type='sec', amount=self.activation_duration)

        # Record edge detection in the database.
        # measurements_dict is not defined in this module, so the unit and
        # measurement names from the Input module are written out here.
        write_db = threading.Thread(
            target=write_influxdb_value,
            args=(self.unique_id, 'bool', float(state)),
            kwargs={'channel': pin,
                    'measure': 'buttons_edge',
                    'timestamp': datetime.datetime.utcnow()})
        write_db.start()

    def stop_function(self):
        """Called when the function is deactivated."""
        self.running = False
        try:
            if self.poll_thread.is_alive():
                self.poll_thread.join()
            self.logger.debug("Stopping PCF8575 polling")
        except Exception as e:
            self.logger.exception("Exception during cleanup: {}".format(e))

    def function_status(self):
        return_dict = {
            'string_status': f"Current time: {datetime.datetime.now()} ({time.time()})"
                             f"\nI2C Location: {self.i2c_location}"
                             f"\nI2C Bus: {self.i2c_bus}"
                             f"\nChannel: {self.channel}"
                             f"\nPoll Interval: {self.poll_interval}",
            'error': []
        }
        return return_dict

How can I add the “Actions” drop-down menu to it so I can use the internal actions?

Thank you!

Please read all of the Mycodo Documentation before posting questions like this.
If you read the Mycodo Documentation (or scroll through the Functions drop-down list) you will see there is already a Trigger: Edge Function, complete with the Actions drop-down menu…

There is also already an Output Module for the PCF8575 I/O Expander…

The Edge Trigger currently only works with the Edge Detection Input. You could easily add the functionality to your Input by incorporating a few lines to check for the relevant trigger when an edge detection occurs. See:

I will need to update how Edge Inputs are defined, since the Edge Trigger is currently hard-coded to only work with the Edge Input. See:

This kind of update would expand the Edge Trigger to be able to be used with any Input specifically designed to detect binary state changes.

If you would like to go ahead and update your Input with the above code, and are willing to test modifications to the Edge Trigger code, I’ll update it to be usable by other Inputs.

@Lucid3y3
Hi, thanks for the reply and sorry for my English.
I need to use the PCF8575 as an Input (not an Output) and use the Trigger Edge function on the pins of the PCF8575. Currently the PCF8575 can be selected only as an Output, and the Trigger Edge function works only for the Raspberry Pi GPIO pins.

@KyleGabriel
Hi, thanks for the reply. I would be more than happy to test any code.
I’ve updated my input code with your snippet.
Here is the code:

import copy
import datetime
import threading
import time
import smbus2

from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value

# Measurements
measurements_dict = {}

for each_channel in range(16):
    measurements_dict[each_channel] = {
        'measurement': 'buttons_edge',
        'unit': 'bool'
    }

# Input information
INPUT_INFORMATION = {
    'input_name_unique': 'EDGE_PCF8575_ALL_Channels',
    'input_manufacturer': 'XXX',
    'input_name': 'PCF8575 Edge ALL Channels Detection',
    'input_library': 'smbus2',
    'measurements_name': 'Rising/Falling Edge',
    'measurements_dict': measurements_dict,

    'options_enabled': [
        'i2c_location',
        'i2c_bus',
        'pre_output',
        'poll_interval'
    ],
    'options_disabled': ['interface'],

    'dependencies_module': [
        ('pip-pypi', 'smbus2', 'smbus2==0.4.1')
    ],

    'interfaces': ['I2C'],
    'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
    'i2c_address_editable': False,
    'i2c_address_default': '0x20',
    'custom_options': [
        {
            'id': 'poll_interval',
            'type': 'float',
            'default_value': 0.5,
            'name': 'Poll Interval',
            'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
        }
    ]
}

class InputModule(AbstractInput):
    """A sensor support class that listens for rising or falling pin edge events on the PCF8575."""

    def __init__(self, input_dev, testing=False):
        super().__init__(input_dev, testing=testing, name=__name__)

        self.bus = None
        self.address = int(str(self.input_dev.i2c_location), 16)
        self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.5)  # Default to 0.5 if not set
        self.previous_states = None  # Initial state is unknown
        self.control = None

        if not testing:
            self.setup_custom_options(
                INPUT_INFORMATION['custom_options'], input_dev)
            self.try_initialize()

    def initialize(self):
        try:
            self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
            self.control = DaemonControl()

            self.logger.info("PCF8575 initialized without causing an exception!")
            self.start_polling()

        except Exception as e:
            self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))

    def read_pcf8575(self):
        """ Reads the state of all pins (16 bits) """
        try:
            data = self.bus.read_word_data(self.address, 0)
            # Correct byte order if necessary
            data = ((data & 0xFF) << 8) | (data >> 8)
            return data
        except Exception as e:
            self.logger.exception(f"Error reading from PCF8575: {e}")
            return None

    def start_polling(self):
        """ Start polling the PCF8575 for pin state changes. """
        self.running = True
        self.poll_thread = threading.Thread(target=self.poll_pins)
        self.poll_thread.start()

    def poll_pins(self):
        """ Poll the PCF8575 pins and detect edges. """
        while self.running:
            current_states = self.read_pcf8575()
            if current_states is not None:
                current_states = ~current_states & 0xFFFF  # Invert the pin states and mask to 16 bits
                if self.previous_states is None:
                    # Skip edge detection for the first cycle
                    self.previous_states = current_states
                else:
                    changed_pins = current_states ^ self.previous_states
                    if changed_pins:
                        for pin in range(16):
                            if changed_pins & (1 << pin):
                                # Extract the previous and current states for the pin
                                previous_state = (self.previous_states >> pin) & 1
                                current_state = (current_states >> pin) & 1

                                # Log the state change detected on this pin
                                self.logger.debug(
                                    f"State change detected on channel {pin}: {previous_state} -> {current_state}"
                                )

                                # Call edge_detected with the actual state (0 or 1)
                                self.edge_detected(pin, current_state)

                    # Update the previous states to the current ones
                    self.previous_states = current_states

            time.sleep(self.poll_interval)

    def edge_detected(self, pin, state):
        """
        Handle edge detection for a specific pin.
        :param pin: The pin number (0-15).
        :param state: The current state of the pin (1 or 0).
        """
        state_str = "Rising" if state else "Falling"
        rising_or_falling = 1.0 if state else -1.0  # Convert to float for InfluxDB

        self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

        # Write the edge detection to the database
        write_db = threading.Thread(
            target=write_influxdb_value,
            args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
            kwargs={'channel': pin,
                    'measure': measurements_dict[pin]['measurement'],
                    'timestamp': datetime.datetime.utcnow()},
            daemon=True)   # Add daemon=True
        write_db.start()

        # Check for triggers that need to be activated on edge detection
        trigger = db_retrieve_table_daemon(Trigger)
        trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
        trigger = trigger.filter(Trigger.measurement == self.unique_id)
        trigger = trigger.filter(Trigger.is_activated.is_(True))

        for each_trigger in trigger.all():
            if each_trigger.edge_detected in ['both', state_str.lower()]:
                now = time.time()
                timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
                message = "{ts}\n[Trigger {cid} ({cname})] " \
                          "Input {oid} ({name}) {state} edge detected " \
                          "on pin {pin}".format(
                            ts=timestamp,
                            cid=each_trigger.id,
                            cname=each_trigger.name,
                            oid=self.unique_id,
                            name=self.input_dev.name,
                            state=state_str,
                            pin=pin)
                self.logger.debug("Edge: {}".format(message))

                # Trigger the associated actions
                self.control.trigger_all_actions(
                    each_trigger.unique_id, message=message)

    def stop_input(self):
        """Called when Input is deactivated."""
        self.running = False
        try:
            if self.poll_thread.is_alive():
                self.poll_thread.join()
            self.logger.debug("Stopping PCF8575 polling")
        except Exception as e:
            self.logger.exception("Exception during cleanup: {}".format(e))
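
Since the original goal was to get rid of spurious edges, the polled word could additionally be passed through a small filter that accepts a bit change only after it has been seen on several consecutive polls. This is a sketch of that idea only, not something the module above does: the `StableBits` class and the `required=3` threshold are assumptions to adjust to the poll interval.

```python
class StableBits:
    """Accept a bit change only after it is seen on `required` consecutive polls."""

    def __init__(self, required: int = 3, width: int = 16) -> None:
        self.required = required
        self.width = width
        self.stable = None            # last accepted bitmask (None until first poll)
        self.counts = [0] * width     # consecutive polls each bit has differed

    def update(self, sample: int) -> int:
        """Feed one polled bitmask; return the debounced bitmask."""
        if self.stable is None:
            self.stable = sample
            return self.stable
        for bit in range(self.width):
            mask = 1 << bit
            if (sample & mask) != (self.stable & mask):
                self.counts[bit] += 1
                if self.counts[bit] >= self.required:
                    self.stable ^= mask       # accept the new level
                    self.counts[bit] = 0
            else:
                self.counts[bit] = 0          # difference vanished; start over
        return self.stable


if __name__ == "__main__":
    f = StableBits(required=3)
    # One-poll glitch on bit 0, then a real press held for three polls.
    samples = [0x0000, 0x0001, 0x0000, 0x0001, 0x0001, 0x0001]
    print([f.update(s) for s in samples])  # [0, 0, 0, 0, 0, 1]
```

In poll_pins, the value returned by `update()` would take the place of `current_states` before the XOR comparison. The trade-off is latency: a press is reported only after `required` polls.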

Thank you

I just committed the changes necessary for any Input to be easily set as an Edge Input:

You simply need to add 'edge_input': True, to your Input’s INPUT_INFORMATION dictionary:

And your Input will populate the Measurement field of the Edge Trigger Function.

Thank you very very much for this!
Currently I’ve found 3 issues:

1st: after adding the Trigger: Edge function, I need to refresh the /function page before the Measurement entries are populated in the Measurement drop-down of the Trigger: Edge configuration (not an important issue).

2nd: after adding (for example) 6 Trigger: Edge functions, each with its own measurement:
trigger:Edge0 >>> Measurement Input CH0
trigger:Edge1 >>> Measurement Input CH1
trigger:Edge2 >>> Measurement Input CH2
trigger:Edge3 >>> Measurement Input CH3
trigger:Edge4 >>> Measurement Input CH4
trigger:Edge5 >>> Measurement Input CH5
trigger:Edge6 >>> Measurement Input CH6

and then navigating to another page (for example Setup > Input) and back to Setup > Function, the measurements of all the configured Trigger: Edge functions are shown as Input CH15 (instead of CH0, CH1, CH2, etc.).
(Activating the function or saving the settings multiple times doesn’t help.)

3rd: I’ve set up only 2 Trigger: Edge functions for testing:

Input CH0: a switch is connected to this input. If an edge is detected (rising, falling, or both), it should turn on an LED (which is set up on Output CH6).

Input CH1: another switch is connected. If any edge is detected, it should turn off the same LED.

The problem is that whether I press the button connected to Input 45 CH0 or the one connected to Input 45 CH1, the LED turns on for about half a second and then turns off. If only Trigger: Edge0 is activated and the button for Input 45 CH0 is pressed, the LED stays on as expected.

I’m wondering whether there is an issue with the code I’m using for the PCF8575 input module or somewhere else.
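
One possible explanation, judging only from the edge_detected method posted earlier in this thread: it selects triggers by the Input ID and the edge type, but never compares the pin that changed with the channel a trigger is configured for. If that is what happens, every active trigger on this Input fires on every edge, so the "LED on" and "LED off" actions both run. The sketch below only illustrates that logic. `FakeTrigger` and its `channel` field are hypothetical; how Mycodo actually stores the selected channel on a trigger is something I have not verified.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeTrigger:
    """Stand-in for a trigger row; `channel` is a hypothetical field."""
    name: str
    edge_detected: str   # 'rising', 'falling' or 'both'
    channel: int


def triggers_without_pin_check(triggers: List[FakeTrigger], edge: str) -> List[str]:
    """Only the edge type is compared, as in the posted edge_detected()."""
    return [t.name for t in triggers if t.edge_detected in ("both", edge)]


def triggers_to_fire(triggers: List[FakeTrigger], pin: int, edge: str) -> List[str]:
    """Return names of triggers that match both the edge type and the pin."""
    return [
        t.name for t in triggers
        if t.edge_detected in ("both", edge) and t.channel == pin
    ]


if __name__ == "__main__":
    triggers = [FakeTrigger("Edge0 (LED on)", "both", 0),
                FakeTrigger("Edge1 (LED off)", "both", 1)]
    print(triggers_without_pin_check(triggers, "rising"))
    # ['Edge0 (LED on)', 'Edge1 (LED off)']
    print(triggers_to_fire(triggers, pin=0, edge="rising"))
    # ['Edge0 (LED on)']
```

If this is the cause, the fix would be a channel comparison inside the `for each_trigger in trigger.all()` loop, or in the Edge Trigger code itself.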

The code I’m using is this:

import copy
import datetime
import threading
import time
import smbus2

from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value

# Measurements
measurements_dict = {}

for each_channel in range(16):
   measurements_dict[each_channel] = {
       'measurement': 'buttons_edge',
       'unit': 'bool'
   }

# Input information
INPUT_INFORMATION = {
   'input_name_unique': 'EDGE_PCF8575_ALL_Channels',
   'input_manufacturer': 'XXX',
   'input_name': 'PCF8575 Edge ALL Channels Detection',
   'input_library': 'smbus2',
   'measurements_name': 'Rising/Falling Edge',
   'measurements_dict': measurements_dict,

   'edge_input': True,  # Treat as an Edge Detection Input
  
   'options_enabled': [
       'i2c_location',
       'i2c_bus',
       'pre_output',
       'poll_interval'
   ],
   'options_disabled': ['interface'],

   'dependencies_module': [
       ('pip-pypi', 'smbus2', 'smbus2==0.4.1')
   ],

   'interfaces': ['I2C'],
   'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
   'i2c_address_editable': False,
   'i2c_address_default': '0x20',
   'custom_options': [
       {
           'id': 'poll_interval',
           'type': 'float',
           'default_value': 0.5,
           'name': 'Poll Interval',
           'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
       }
   ]
}

class InputModule(AbstractInput):
   """A sensor support class that listens for rising or falling pin edge events on the PCF8575."""

   def __init__(self, input_dev, testing=False):
       super().__init__(input_dev, testing=testing, name=__name__)

       self.bus = None
       self.address = int(str(self.input_dev.i2c_location), 16)
       self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.5)  # Default to 0.5 if not set
       self.previous_states = None  # Initial state is unknown
       self.control = None

       if not testing:
           self.setup_custom_options(
               INPUT_INFORMATION['custom_options'], input_dev)
           self.try_initialize()

   def initialize(self):
       try:
           self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
           self.control = DaemonControl()

           self.logger.info("PCF8575 initialized without causing an exception!")
           self.start_polling()

       except Exception as e:
           self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))

   def read_pcf8575(self):
       """ Reads the state of all pins (16 bits) """
       try:
           data = self.bus.read_word_data(self.address, 0)
           # Correct byte order if necessary
           data = ((data & 0xFF) << 8) | (data >> 8)
           return data
       except Exception as e:
           self.logger.exception(f"Error reading from PCF8575: {e}")
           return None

   def start_polling(self):
       """ Start polling the PCF8575 for pin state changes. """
       self.running = True
       self.poll_thread = threading.Thread(target=self.poll_pins)
       self.poll_thread.start()

   def poll_pins(self):
       """ Poll the PCF8575 pins and detect edges. """
       while self.running:
           current_states = self.read_pcf8575()
           if current_states is not None:
               current_states = ~current_states & 0xFFFF  # Invert the pin states and mask to 16 bits
               if self.previous_states is None:
                   # Skip edge detection for the first cycle
                   self.previous_states = current_states
               else:
                   changed_pins = current_states ^ self.previous_states
                   if changed_pins:
                       for pin in range(16):
                           if changed_pins & (1 << pin):
                               # Extract the previous and current states for the pin
                               previous_state = (self.previous_states >> pin) & 1
                               current_state = (current_states >> pin) & 1

                               # Log the state change detected on this pin
                               self.logger.debug(
                                   f"State change detected on channel {pin}: {previous_state} -> {current_state}"
                               )

                               # Call edge_detected with the actual state (0 or 1)
                               self.edge_detected(pin, current_state)

                   # Update the previous states to the current ones
                   self.previous_states = current_states

           time.sleep(self.poll_interval)

   def edge_detected(self, pin, state):
       """
       Handle edge detection for a specific pin.
       :param pin: The pin number (0-15).
       :param state: The current state of the pin (1 or 0).
       """
       state_str = "Rising" if state else "Falling"
       rising_or_falling = 1.0 if state else -1.0  # Convert to float for InfluxDB

       self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

       # Write the edge detection to the database
       write_db = threading.Thread(
           target=write_influxdb_value,
           args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
           kwargs={'channel': pin,
                   'measure': measurements_dict[pin]['measurement'],
                   'timestamp': datetime.datetime.utcnow()},
           daemon=True)   # Add daemon=True
       write_db.start()

       # Check for triggers that need to be activated on edge detection
       trigger = db_retrieve_table_daemon(Trigger)
       trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
       trigger = trigger.filter(Trigger.measurement == self.unique_id)
       trigger = trigger.filter(Trigger.is_activated.is_(True))

       for each_trigger in trigger.all():
           if each_trigger.edge_detected in ['both', state_str.lower()]:
               now = time.time()
               timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
               message = "{ts}\n[Trigger {cid} ({cname})] " \
                         "Input {oid} ({name}) {state} edge detected " \
                         "on pin {pin}".format(
                           ts=timestamp,
                           cid=each_trigger.id,
                           cname=each_trigger.name,
                           oid=self.unique_id,
                           name=self.input_dev.name,
                           state=state_str,
                           pin=pin)
               self.logger.debug("Edge: {}".format(message))

               # Trigger the associated actions
               self.control.trigger_all_actions(
                   each_trigger.unique_id, message=message)

   def stop_input(self):
       """Called when Input is deactivated."""
       self.running = False
       try:
           if self.poll_thread.is_alive():
               self.poll_thread.join()
           self.logger.debug("Stopping PCF8575 polling")
       except Exception as e:
           self.logger.exception("Exception during cleanup: {}".format(e))

Thanks again for helping me with this.

First, you have the Edge option set to Both. You need to select either a Rising or Falling edge for each. Second, you need to use a switch, not a pushbutton. When the switch is flipped on, there will be a rising edge, and when the switch is flipped off, there will be a falling edge. What you’re attempting to do can’t be done using a pushbutton and simple edge detection triggers. You will need to write your own Custom Input, Function, or Conditional Function that spawns a thread to monitor for button presses and stores the current state in a toggle variable, so it knows which of the two actions to execute when the button is pressed.
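To illustrate the toggle-variable idea, here is a minimal sketch in plain Python, independent of Mycodo. The class name `PushbuttonToggle` and its `update()` method are made up for this example; in a real module the returned action would be mapped to whatever turns the LED on or off.

```python
class PushbuttonToggle:
    """Turn successive button presses into alternating 'on'/'off' actions."""

    def __init__(self):
        self.previous_level = 0    # last sampled button level (1 = pressed)
        self.toggle_state = False  # False = LED off, True = LED on

    def update(self, level):
        """Feed one sampled level; return 'on', 'off', or None."""
        action = None
        # Act only on the rising edge (the press) and ignore the release.
        if level == 1 and self.previous_level == 0:
            self.toggle_state = not self.toggle_state
            action = 'on' if self.toggle_state else 'off'
        self.previous_level = level
        return action


button = PushbuttonToggle()
samples = [0, 1, 1, 0, 1, 0]  # press, hold, release, press, release
actions = [button.update(s) for s in samples]
print(actions)
```

Each press flips the stored state once, so holding the button or releasing it does not cause a second action.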

I just thought of one approach that is complex but simpler in a lot of ways:

  1. Create one Edge Trigger with Edge set to Both. It should have one Action, Activate Controller.
  2. Create two Execute Actions Functions.
  3. Set the Actions of one Execute Actions Function to turn on the LED, and the other to turn off the LED.
  4. Create a Conditional Function and set Start Offset to 0 and Period very high (999).
  5. For the Edge Trigger, set the Activate Controller Action to the Conditional Function.
  6. Add 3 Actions to the Conditional Function: One to turn your LED on, one to turn your LED off, and a Deactivate Controller Action that will deactivate the Conditional Function.
  7. In the Conditional Function’s Run Python Code, try this code (changing the Action IDs to your own):
if self.get_custom_option("toggle_state"):
    self.run_action("asdf1234")  # Turn off LED
    self.set_custom_option("toggle_state", False)
else:
    self.run_action("qwer5678")  # Turn on LED
    self.set_custom_option("toggle_state", True)

self.run_action("zxcv0987")  # Deactivate this Conditional Function

The sole purpose of the Conditional Function is to store a toggle state in the database and toggle between executing two Actions when it’s activated; it then deactivates itself.

I don’t understand what you’re describing. Please provide images and steps to reproduce the issue.

Hi, thanks for the suggestions.
I think I’ve found the issue, it was from my code, specifically starting from here:

# Measurements
measurements_dict = {}

for each_channel in range(16):
    measurements_dict[each_channel] = {
        'measurement': 'buttons_edge',
        'unit': 'bool'
    }

What I did was to create an Input module for each channel, each with its own measurement.
That way the 2nd and 3rd issues were solved.

This is the code I use, it needs to be modified for each channel in use, comment added:

import copy
import datetime
import threading
import time
import smbus2

from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value

# Measurements
measurements_dict = {
    0: {                                                       ###### "0" > NEEDS TO BE CHANGED FOR THE WISHED CHANNEL ######
        'measurement': 'buttons_edge0',                        ###### "0" > NEEDS TO BE CHANGED FOR THE WISHED CHANNEL ######
        'unit': 'bool'
    }
}

# Input information
INPUT_INFORMATION = {
    'input_name_unique': 'EDGE_PCF8575_CH_0',                   ###### "0" > NEEDS TO BE CHANGED FOR THE WISHED CHANNEL ######
    'input_manufacturer': 'XXX',
    'input_name': 'PCF8575  CH_0 Edge Detection',               ###### "0" > NEEDS TO BE CHANGED FOR THE WISHED CHANNEL ######
    'input_library': 'smbus2',
    'measurements_name': 'Rising/Falling Edge',
    'measurements_dict': measurements_dict,
   
    'edge_input': True,  # Treat as an Edge Detection Input
   
    'options_enabled': [
        'i2c_location',
        'i2c_bus',
        'pre_output',
        'poll_interval',
        'channel'
    ],
    'options_disabled': ['interface'],
   
    'dependencies_module': [
        ('pip-pypi', 'smbus2', 'smbus2==0.4.1')
    ],
   
    'interfaces': ['I2C'],
    'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
    'i2c_address_editable': False,
    'i2c_address_default': '0x20',
    'custom_options': [
        {
            'id': 'poll_interval',
            'type': 'float',
            'default_value': 0.5,
            'name': 'Poll Interval',
            'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
        },
        {
            'id': 'channel',
            'type': 'float',
            'default_value': 0,            ###### "0" > NEEDS TO BE CHANGED FOR THE WISHED CHANNEL ###### 
            'name': 'Channel',
            'phrase': 'The channel (0-15) to monitor for state changes.'
        }
    ]
}

class InputModule(AbstractInput):
    """A sensor support class that listens for rising or falling pin edge events on the PCF8575."""

    def __init__(self, input_dev, testing=False):
        super().__init__(input_dev, testing=testing, name=__name__)

        # Setup custom options
        self.setup_custom_options(
            INPUT_INFORMATION['custom_options'], input_dev)
        
        # Debug: Check the attributes of input_dev
        self.logger.debug(f"InputDev attributes: {vars(self.input_dev)}")    
        
        self.bus = None
        self.address = int(str(self.input_dev.i2c_location), 16)
        self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.1)  # Default to 0.1 if not set
        self.channel = int(getattr(self.input_dev, 'channel', 0))  # Convert float to int                 ###### "0" > NEEDS TO BE CHANGED FOR THE WISHED CHANNEL ######  
        self.previous_state = None  # Initial state is unknown
        self.control = None

        self.logger.debug(f"Configured channel: {self.channel}")
      
        # Initialize a threading lock for thread safety
        self.lock = threading.Lock()  # Lock object for shared variables

        if not testing:
            self.try_initialize()
    
    def initialize(self):
        try:
            self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
            self.control = DaemonControl()
    
            self.logger.info("PCF8575 initialized without causing an exception! :)")
            self.start_polling()
    
        except Exception as e:
            self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))
    
    def read_pcf8575(self):
        """ Reads the state of all pins (16 bits) """
        try:
            data = self.bus.read_word_data(self.address, 0)
            # Correct byte order if necessary
            data = ((data & 0xFF) << 8) | (data >> 8)
            return data
        except Exception as e:
            self.logger.exception(f"Error reading from PCF8575: {e}")
            return None
    
    def start_polling(self):
        """ Start polling the PCF8575 for pin state changes. """
        self.running = True
        self.poll_thread = threading.Thread(target=self.poll_pins)
        self.poll_thread.start()
    
    def poll_pins(self):
        """ Poll the PCF8575 pins and detect edges for the specified channel, with thread safety. """
        while True:
            with self.lock:  # Acquire lock to ensure safe access to shared variables
                if not self.running:  # Check running status inside the lock
                    break

            current_states = self.read_pcf8575()
            if current_states is not None:
                current_state = 1 - ((current_states >> self.channel) & 1)  # Invert the current state

                with self.lock:  # Acquire lock before checking/modifying shared variables
                    if self.previous_state is None:
                        self.previous_state = current_state

                    if current_state != self.previous_state:
                        self.logger.debug(f"State change detected on channel {self.channel}: {self.previous_state} -> {current_state}")
                        self.edge_detected(self.channel, current_state)
                        self.previous_state = current_state

            time.sleep(self.poll_interval)
    
    def edge_detected(self, pin, state):
        """
        Handle edge detection for a specific pin.
        :param pin: The pin number (0-15).
        :param state: The current state of the pin (1 or 0).
        """
        state_str = "Rising" if state else "Falling"
        rising_or_falling = 1.0 if state else -1.0  # Convert to float for InfluxDB
    
        self.logger.debug(f"Edge detected on pin {pin}: {state_str}")
    
        # Write the edge detection to the database
        write_db = threading.Thread(
            target=write_influxdb_value,
            args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
            kwargs={'channel': pin,
                    'measure': measurements_dict[pin]['measurement'],
                    'timestamp': datetime.datetime.utcnow()},
            daemon=True)   # Add daemon=True
        write_db.start()
    
        # Check for triggers that need to be activated on edge detection
        trigger = db_retrieve_table_daemon(Trigger)
        trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
        trigger = trigger.filter(Trigger.measurement == self.unique_id)
        trigger = trigger.filter(Trigger.is_activated.is_(True))
    
        for each_trigger in trigger.all():
            if each_trigger.edge_detected in ['both', state_str.lower()]:
                now = time.time()
                timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
                message = "{ts}\n[Trigger {cid} ({cname})] " \
                          "Input {oid} ({name}) {state} edge detected " \
                          "on pin {pin}".format(
                            ts=timestamp,
                            cid=each_trigger.id,
                            cname=each_trigger.name,
                            oid=self.unique_id,
                            name=self.input_dev.name,
                            state=state_str,
                            pin=pin)
                self.logger.debug("Edge: {}".format(message))
    
                # Trigger the associated actions
                self.control.trigger_all_actions(
                    each_trigger.unique_id, message=message)
    
    def stop_input(self):
        """Called when Input is deactivated."""
        with self.lock:  # Acquire lock to safely stop the polling
            self.running = False

        try:
            if self.poll_thread.is_alive():
                self.poll_thread.join(timeout=5)  # Wait up to 5 seconds for the thread to stop
            self.logger.debug("Stopping PCF8575 polling")
        except Exception as e:
            self.logger.exception("Exception during cleanup: {}".format(e))

Using I2C with the PCF8575 as input also solved the false trigger issue I had previously using buttons connected to the RPi GPIO pins… Works also with pushbuttons…
Will have to build a pcb for the 2 PCF8575 I plan to use for those 22 pushbuttons and test this further but from the testing I did seems to be working fine…
Thanks again for all the help and the mycodo software.

I wouldn’t recommend creating a different Input for each channel. There’s no benefit from doing so and it will use a much larger amount of resources to run.
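As a rough sketch of why one Input can cover all channels: a single 16-bit read already contains every pin, and comparing it with the previous read gives the edges for all of them at once. The function name `detect_edges` is made up for this example, and the values are plain integers rather than real I2C reads.

```python
def detect_edges(previous, current):
    """Return (pin, 'rising' | 'falling') for every bit that changed."""
    changed = (previous ^ current) & 0xFFFF  # bits that differ between reads
    edges = []
    for pin in range(16):
        if changed & (1 << pin):
            direction = 'rising' if (current >> pin) & 1 else 'falling'
            edges.append((pin, direction))
    return edges


# Pin 1 was high and goes low; pin 0 was low and goes high.
edges = detect_edges(0b0000_0000_0000_0010, 0b0000_0000_0000_0001)
print(edges)
```

With this approach there is one polling thread and one bus read per interval, instead of sixteen threads each reading the same word.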

Hi, unfortunately I am unable to make a single custom module which works for all channels.
With all my failed tries I get the same behavior previously posted (issue 2 and issue 3).
Starting with this code:

import copy
import datetime
import threading
import time
import smbus2

from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value

# Measurements
measurements_dict = {}

for each_channel in range(16):
   measurements_dict[each_channel] = {
       'measurement': 'buttons_edge',
       'unit': 'bool'
   }

# Input information
INPUT_INFORMATION = {
   'input_name_unique': 'EDGE_PCF8575_ALL_Channels',
   'input_manufacturer': 'XXX',
   'input_name': 'PCF8575 Edge ALL Channels Detection',
   'input_library': 'smbus2',
   'measurements_name': 'Rising/Falling Edge',
   'measurements_dict': measurements_dict,

   'edge_input': True,  # Treat as an Edge Detection Input
  
   'options_enabled': [
       'i2c_location',
       'i2c_bus',
       'pre_output',
       'poll_interval'
   ],
   'options_disabled': ['interface'],

   'dependencies_module': [
       ('pip-pypi', 'smbus2', 'smbus2==0.4.1')
   ],

   'interfaces': ['I2C'],
   'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
   'i2c_address_editable': False,
   'i2c_address_default': '0x20',
   'custom_options': [
       {
           'id': 'poll_interval',
           'type': 'float',
           'default_value': 0.5,
           'name': 'Poll Interval',
           'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
       }
   ]
}

class InputModule(AbstractInput):
   """A sensor support class that listens for rising or falling pin edge events on the PCF8575."""

   def __init__(self, input_dev, testing=False):
       super().__init__(input_dev, testing=testing, name=__name__)

       self.bus = None
       self.address = int(str(self.input_dev.i2c_location), 16)
       self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.5)  # Default to 0.5 if not set
       self.previous_states = None  # Initial state is unknown
       self.control = None

       if not testing:
           self.setup_custom_options(
               INPUT_INFORMATION['custom_options'], input_dev)
           self.try_initialize()

   def initialize(self):
       try:
           self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
           self.control = DaemonControl()

           self.logger.info("PCF8575 initialized without causing an exception!")
           self.start_polling()

       except Exception as e:
           self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))

   def read_pcf8575(self):
       """ Reads the state of all pins (16 bits) """
       try:
           data = self.bus.read_word_data(self.address, 0)
           # Correct byte order if necessary
           data = ((data & 0xFF) << 8) | (data >> 8)
           return data
       except Exception as e:
           self.logger.exception(f"Error reading from PCF8575: {e}")
           return None

   def start_polling(self):
       """ Start polling the PCF8575 for pin state changes. """
       self.running = True
       self.poll_thread = threading.Thread(target=self.poll_pins)
       self.poll_thread.start()

   def poll_pins(self):
       """ Poll the PCF8575 pins and detect edges. """
       while self.running:
           current_states = self.read_pcf8575()
           if current_states is not None:
               current_states = ~current_states & 0xFFFF  # Invert the pin states and mask to 16 bits
               if self.previous_states is None:
                   # Skip edge detection for the first cycle
                   self.previous_states = current_states
               else:
                   changed_pins = current_states ^ self.previous_states
                   if changed_pins:
                       for pin in range(16):
                           if changed_pins & (1 << pin):
                               # Extract the previous and current states for the pin
                               previous_state = (self.previous_states >> pin) & 1
                               current_state = (current_states >> pin) & 1

                               # Log the state change detected on this pin
                               self.logger.debug(
                                   f"State change detected on channel {pin}: {previous_state} -> {current_state}"
                               )

                               # Call edge_detected with the actual state (0 or 1)
                               self.edge_detected(pin, current_state)

                   # Update the previous states to the current ones
                   self.previous_states = current_states

           time.sleep(self.poll_interval)

   def edge_detected(self, pin, state):
       """
       Handle edge detection for a specific pin.
       :param pin: The pin number (0-15).
       :param state: The current state of the pin (1 or 0).
       """
       state_str = "Rising" if state else "Falling"
       rising_or_falling = 1.0 if state else -1.0  # Convert to float for InfluxDB

       self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

       # Write the edge detection to the database
       write_db = threading.Thread(
           target=write_influxdb_value,
           args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
           kwargs={'channel': pin,
                   'measure': measurements_dict[pin]['measurement'],
                   'timestamp': datetime.datetime.utcnow()},
           daemon=True)   # Add daemon=True
       write_db.start()

       # Check for triggers that need to be activated on edge detection
       trigger = db_retrieve_table_daemon(Trigger)
       trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
       trigger = trigger.filter(Trigger.measurement == self.unique_id)
       trigger = trigger.filter(Trigger.is_activated.is_(True))

       for each_trigger in trigger.all():
           if each_trigger.edge_detected in ['both', state_str.lower()]:
               now = time.time()
               timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
               message = "{ts}\n[Trigger {cid} ({cname})] " \
                         "Input {oid} ({name}) {state} edge detected " \
                         "on pin {pin}".format(
                           ts=timestamp,
                           cid=each_trigger.id,
                           cname=each_trigger.name,
                           oid=self.unique_id,
                           name=self.input_dev.name,
                           state=state_str,
                           pin=pin)
               self.logger.debug("Edge: {}".format(message))

               # Trigger the associated actions
               self.control.trigger_all_actions(
                   each_trigger.unique_id, message=message)

   def stop_input(self):
       """Called when Input is deactivated."""
       self.running = False
       try:
           if self.poll_thread.is_alive():
               self.poll_thread.join()
           self.logger.debug("Stopping PCF8575 polling")
       except Exception as e:
           self.logger.exception("Exception during cleanup: {}".format(e))

and mycodo trigger edge configuration for 2 channels, CH0 and CH1:


and

when I refresh the page, it changes the previously set channels to CH15:


If I don’t refresh the page after setting the measurements, pressing the push buttons I get in the logs:

2024-09-27 15:32:02,364 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - State change detected on channel 8: 1 -> 0
2024-09-27 15:32:02,364 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - Edge detected on pin 8: Falling
2024-09-27 15:32:02,919 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - State change detected on channel 9: 0 -> 1
2024-09-27 15:32:02,920 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - Edge detected on pin 9: Rising
2024-09-27 15:32:02,954 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - Edge: 2024-09-27 15-32-02
[Trigger 45 (Trigger: Edge0)] Input 6819887b-9a80-4c85-a033-0394f5fd6f68 (PCF8575 Edge PLM ALL Channels Detection) Rising edge detected on pin 9
2024-09-27 15:32:03,091 - DEBUG - mycodo.action.output_on_off_4cec5a51 - Message: 2024-09-27 15-32-02
[Trigger 45 (Trigger: Edge0)] Input 6819887b-9a80-4c85-a033-0394f5fd6f68 (PCF8575 Edge PLM ALL Channels Detection) Rising edge detected on pin 9 Turn output 91c1969b-c684-466b-952b-c21db4eef495 CH6 (LEDs_butoane_sus) on.
2024-09-27 15:32:03,093 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - Edge: 2024-09-27 15-32-03
[Trigger 46 (Trigger: Edge1)] Input 6819887b-9a80-4c85-a033-0394f5fd6f68 (PCF8575 Edge PLM ALL Channels Detection) Rising edge detected on pin 9
2024-09-27 15:32:03,243 - DEBUG - mycodo.action.output_on_off_63dd7ae1 - Message: 2024-09-27 15-32-03
[Trigger 46 (Trigger: Edge1)] Input 6819887b-9a80-4c85-a033-0394f5fd6f68 (PCF8575 Edge PLM ALL Channels Detection) Rising edge detected on pin 9 Turn output 91c1969b-c684-466b-952b-c21db4eef495 CH6 (LEDs_butoane_sus) off.
2024-09-27 15:32:03,746 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - State change detected on channel 9: 1 -> 0
2024-09-27 15:32:03,746 - DEBUG - mycodo.inputs.edge_pcf8575_all_channels_6819887b - Edge detected on pin 9: Falling

Both (Trigger: Edge0) and (Trigger: Edge1) are shown as being set up on pin 9, which is incorrect.
Thank you.
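In the log above, a single rising edge on pin 9 runs both Trigger 45 and Trigger 46. The posted edge_detected() selects triggers by Input ID and edge direction only, so nothing in it ties a trigger to one pin. The sketch below shows what a per-pin check could look like. It uses plain dicts with a hypothetical 'channel' field; how Mycodo actually stores the channel chosen for an Edge Trigger is not shown in this thread and would need to be checked before adapting this.

```python
def triggers_for_pin(triggers, pin, edge):
    """Select triggers bound to this pin whose edge setting matches."""
    return [
        t for t in triggers
        if t['channel'] == pin and t['edge_detected'] in ('both', edge)
    ]


# Stand-in records; the 'channel' key is an assumption for illustration.
triggers = [
    {'name': 'Trigger: Edge0', 'channel': 0, 'edge_detected': 'both'},
    {'name': 'Trigger: Edge1', 'channel': 1, 'edge_detected': 'both'},
]

fired = [t['name'] for t in triggers_for_pin(triggers, 1, 'rising')]
print(fired)
```

With a check like this, an edge on one pin would run only the trigger configured for that pin.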

I’m not sure if this is the issue, but you’re not using a valid measurement. You changed the ‘edge’ measurement from the example Edge Input to ‘buttons_edge’, which is not found in mycodo/config_devices_units.py

I also created custom measurements for each channel and also tried with this as measurements_dict:

# Measurements for all 16 channels
measurements_dict = {
    0: {'measurement': 'buttons_edge0', 'unit': 'bool'},
    1: {'measurement': 'buttons_edge1', 'unit': 'bool'},
    2: {'measurement': 'buttons_edge2', 'unit': 'bool'},
    3: {'measurement': 'buttons_edge3', 'unit': 'bool'},
    4: {'measurement': 'buttons_edge4', 'unit': 'bool'},
    5: {'measurement': 'buttons_edge5', 'unit': 'bool'},
    6: {'measurement': 'buttons_edge6', 'unit': 'bool'},
    7: {'measurement': 'buttons_edge7', 'unit': 'bool'},
    8: {'measurement': 'buttons_edge8', 'unit': 'bool'},
    9: {'measurement': 'buttons_edge9', 'unit': 'bool'},
    10: {'measurement': 'buttons_edge10', 'unit': 'bool'},
    11: {'measurement': 'buttons_edge11', 'unit': 'bool'},
    12: {'measurement': 'buttons_edge12', 'unit': 'bool'},
    13: {'measurement': 'buttons_edge13', 'unit': 'bool'},
    14: {'measurement': 'buttons_edge14', 'unit': 'bool'},
    15: {'measurement': 'buttons_edge15', 'unit': 'bool'}
}

but it doesn’t work… it only works with one custom Input per channel…
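For reference, the same 16-entry dictionary can be built in a few lines. This sketch uses the 'edge' measurement name mentioned in the reply above instead of the custom 'buttons_edge' names; whether that change alone fixes the channel mix-up has not been tested here.

```python
# One entry per PCF8575 pin, all using the stock 'edge' measurement name.
measurements_dict = {
    channel: {'measurement': 'edge', 'unit': 'bool'}
    for channel in range(16)
}

print(len(measurements_dict), measurements_dict[15])
```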