Edge detection with I2C buttons using PCF8575

Hi.

I have 22 buttons attached to the GPIO pins on a raspberry pi. Using as input “edge detection” and function “trigger edge” I can successfully turn on and off LEDs and relays which are connected via I2C. The problem I have is that I get false triggers when quickly inserting and removing a empty extension power cord from the same socket which powers the raspberry pi.
Things I tried and didn’t helped: changing from pull down to a pull up configuration, reducing resistor value from 10k to 1k, installed EMI filter, installed isolation transformer, using an UPS, using shorter wires, bounce time 3000 ms in mycodo, detecting rising edge, falling edge, both.
I now think I can’t get rid of these false triggers and hope to solve this by using buttons connected somehow to I2C via some sort of IC. (since I2C doesn’t seem to be influenced by this issue)
Could someone please suggest which integrated circuit I could use to connect those 22 buttons via I2C to mycodo? Or is it possible another way to get rid of those false triggers? Maybe by modifying the code to ignore those trigger edges shorter than a specified amount of time?
I’ve attached a picture showing my setup.
Please let me know if you need more information.

Thank you very much.

Have you connected an oscilloscope to understand what’s happening?

Thanks for replying. Unfortunately I don’t have an oscilloscope…

Hi,
Is there any way to solve this issue without an oscilloscope?
Can this be solved in software by waiting for 5mSec, and then reading the input again?
Thank you

You can copy the input, edit it to suit you needs, then import it on the Input Import page.

Hi,
I’ve tried to create an input module to read the input states (on or off) from the PCF8575 with the hope to connect to it buttons, which would solve the false detection issue… .
The code I wrote is here:
input_PCF8575.py (7.0 KB)

Unfortunately it doesn’t work and I need help to get it running…

mycodo.log shows:
2024-08-10 22:19:02,719 - DEBUG - mycodo.inputs.pcf8575_input - Initialized InputModule with input device <Input(id=48)>
2024-08-10 22:19:02,724 - DEBUG - mycodo.controllers.controller_input_5443884c - get_measurement() found
2024-08-10 22:19:02,725 - DEBUG - mycodo - Input controller with ID 5443884c-1fe8-4379-b818-8bfac05de5e1 activated.
2024-08-10 22:19:02,726 - DEBUG - mycodo.controllers.controller_input_5443884c - listener() found
2024-08-10 22:19:02,728 - DEBUG - mycodo.controllers.controller_input_5443884c - Starting listener() thread.
2024-08-10 22:19:02,732 - INFO - mycodo.controllers.controller_input_5443884c - Activated in 1129.5 ms
2024-08-10 22:19:02,734 - DEBUG - mycodo.inputs.pcf8575_input - Current measurement state: {‘button_0_state’: None, ‘button_1_state’: None, ‘button_2_state’: None, ‘button_3_state’: None, ‘button_4_state’: None, ‘button_5_state’: None, ‘button_6_state’: None, ‘button_7_state’: None, ‘button_8_state’: None, ‘button_9_state’: None, ‘button_10_state’: None, ‘button_11_state’: None, ‘button_12_state’: None, ‘button_13_state’: None, ‘button_14_state’: None, ‘button_15_state’: None}
2024-08-10 22:19:02,837 - DEBUG - mycodo.controllers.controller_input_5443884c - Adding measurements to InfluxDB with ID 5443884c-1fe8-4379-b818-8bfac05de5e1: {}

and the live measurement page shows:
On/Off: PCF8575 inp 16-Channel I/O Expander (Input 5443884c)

Input (PCF8575_input), 15.0 second interval
Measurement | Timestamp
0 bool (button_0_state) CH0 | No Data Last 30 sec
0 bool (button_1_state) CH1 | No Data Last 30 sec
0 bool (button_2_state) CH2 | No Data Last 30 sec
0 bool (button_3_state) CH3 | No Data Last 30 sec
0 bool (button_4_state) CH4 | No Data Last 30 sec
0 bool (button_5_state) CH5 | No Data Last 30 sec
0 bool (button_6_state) CH6 | No Data Last 30 sec
0 bool (button_7_state) CH7 | No Data Last 30 sec
0 bool (button_8_state) CH8 | No Data Last 30 sec
0 bool (button_9_state) CH9 | No Data Last 30 sec
0 bool (button_10_state) CH10 | No Data Last 30 sec
0 bool (button_11_state) CH11 | No Data Last 30 sec
0 bool (button_12_state) CH12 | No Data Last 30 sec
0 bool (button_13_state) CH13 | No Data Last 30 sec
0 bool (button_14_state) CH14 | No Data Last 30 sec
0 bool (button_15_state) CH15 | No Data Last 30 sec

The expected output should be half of them with a value of 1…

The PCF8575 is correctly connected to the PI and also works as expected, with the following code:
$ cat pcf8575


import smbus2
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# PCF8575 default I2C address
PCF8575_ADDRESS = 0x20

# Initialize I2C bus
bus = smbus2.SMBus(1)  # 1 indicates /dev/i2c-1

try:
   # Direct I2C read test
   logger.debug("Performing direct I2C read test")
   test_data = bus.read_word_data(PCF8575_ADDRESS, 0)
   test_value = ((test_data << 8) & 0xFF00) | (test_data >> 8)
   logger.debug(f"Test Read Data: 0x{test_data:04X}, Processed Value: 0x{test_value:04X}")
except Exception as e:
   logger.error(f"Direct I2C read test failed: {e}")

I get:
python pcf8575
DEBUG:main:Performing direct I2C read test
DEBUG:main:Test Read Data: 0x0FF0, Processed Value: 0xF00F

and with the following code: cat channel


import smbus2
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# PCF8575 default I2C address
PCF8575_ADDRESS = 0x20

# Initialize I2C bus
bus = smbus2.SMBus(1)  # 1 indicates /dev/i2c-1

try:
    # Direct I2C read test
    logger.debug("Performing direct I2C read test")

    # Read a 16-bit word from the PCF8575
    test_data = bus.read_word_data(PCF8575_ADDRESS, 0)

    # The data returned is in little-endian format, so swap bytes
    test_value = ((test_data << 8) & 0xFF00) | (test_data >> 8)
    logger.debug(f"Test Read Data: 0x{test_data:04X}, Processed Value: 0x{test_value:04X}")

    # Iterate over each of the 16 bits to determine the state of each channel
    for channel in range(16):
        # Extract the state of the current channel
        channel_state = (test_value >> channel) & 1
        logger.info(f"Channel {channel + 1}: {'ON' if channel_state else 'OFF'}")

except Exception as e:
    logger.error(f"Direct I2C read test failed: {e}")

I get the expected output:
~ $ python channel
DEBUG:main:Performing direct I2C read test
DEBUG:main:Test Read Data: 0x0FF0, Processed Value: 0xF00F
INFO:main:Channel 1: ON
INFO:main:Channel 2: ON
INFO:main:Channel 3: ON
INFO:main:Channel 4: ON
INFO:main:Channel 5: OFF
INFO:main:Channel 6: OFF
INFO:main:Channel 7: OFF
INFO:main:Channel 8: OFF
INFO:main:Channel 9: OFF
INFO:main:Channel 10: OFF
INFO:main:Channel 11: OFF
INFO:main:Channel 12: OFF
INFO:main:Channel 13: ON
INFO:main:Channel 14: ON
INFO:main:Channel 15: ON
INFO:main:Channel 16: ON

Any advise/help is much appreciated. Thank you

Hello everybody!
I’m trying to use the PCF8575 as input for buttons. The following code used as custom input works ok, it detects the changes on all channels.

import copy
import datetime
import threading
import time
import smbus2

from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value

# Measurements
measurements_dict = {}

for each_channel in range(16):
   measurements_dict[each_channel] = {
       'measurement': 'buttons_edge',
       'unit': 'bool'
   }

# Input information
INPUT_INFORMATION = {
   'input_name_unique': 'EDGE_PCF8575_toate_canalele',
   'input_manufacturer': 'XXX',
   'input_name': 'PCF8575 Edge ALL Channels Detection',
   'input_library': 'smbus2',
   'measurements_name': 'Rising/Falling Edge',
   'measurements_dict': measurements_dict,

   'options_enabled': [
       'i2c_location',
       'i2c_bus',
       'pre_output',
       'poll_interval'
   ],
   'options_disabled': ['interface'],

   'dependencies_module': [
       ('pip-pypi', 'smbus2', 'smbus2==0.4.1')
   ],

   'interfaces': ['I2C'],
   'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
   'i2c_address_editable': False,
   'i2c_address_default': '0x20', 
   'custom_options': [
       {
           'id': 'poll_interval',
           'type': 'float',
           'default_value': 0.1,
           'name': 'Poll Interval',
           'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
       }
   ]
}

class InputModule(AbstractInput):
   """A sensor support class that listens for rising or falling pin edge events on the PCF8575."""

   def __init__(self, input_dev, testing=False):
       super().__init__(input_dev, testing=testing, name=__name__)

       self.bus = None
       self.address = int(str(self.input_dev.i2c_location), 16)
       self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.1)  # Default to 0.1 if not set
       self.previous_states = None  # Initial state is unknown
       #self.previous_states = 0xFFFF  # Initial state (all pins high) ## asta am scos'o am inlocuit'o cu ce'i deasupra...
       self.control = None

       if not testing:
           self.setup_custom_options(
               INPUT_INFORMATION['custom_options'], input_dev)
           self.try_initialize()

   def initialize(self):
       try:
           self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
           self.control = DaemonControl()

           self.logger.info("PCF8575 initialized without causing an exception!...")
           self.start_polling()

       except Exception as e:
           self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))

   def read_pcf8575(self):
       """ Reads the state of all pins (16 bits) """
       try:
           data = self.bus.read_word_data(self.address, 0)
           # Correct byte order if necessary
           data = ((data & 0xFF) << 8) | (data >> 8)
           return data
       except Exception as e:
           self.logger.exception(f"Error reading from PCF8575: {e}")
           return None

   def start_polling(self):
       """ Start polling the PCF8575 for pin state changes. """
       self.running = True
       self.poll_thread = threading.Thread(target=self.poll_pins)
       self.poll_thread.start()

   def poll_pins(self):
       """ Poll the PCF8575 pins and detect edges. """
       while self.running:
           current_states = self.read_pcf8575()
           if current_states is not None:
               current_states = ~current_states & 0xFFFF  # Invert the pin states and mask to 16 bits
               if self.previous_states is None:
                   # Skip edge detection for the first cycle
                   self.previous_states = current_states      
               else:
                   changed_pins = current_states ^ self.previous_states
                   if changed_pins:
                       for pin in range(16):
                           if changed_pins & (1 << pin):
                               # Extract the previous and current states for the pin
                               previous_state = (self.previous_states >> pin) & 1
                               current_state = (current_states >> pin) & 1
                               
                               # Log the state change detected on this pin
                               self.logger.debug(
                                   f"State change detected on channel {pin}: {previous_state} -> {current_state}"
                               )
                               
                               # Call edge_detected with the actual state (0 or 1)
                               self.edge_detected(pin, current_state)
                   
                   # Update the previous states to the current ones
                   self.previous_states = current_states
                   
           time.sleep(self.poll_interval)

   def edge_detected(self, pin, state):
       """
       Handle edge detection for a specific pin.
       :param pin: The pin number (0-15).
       :param state: The current state of the pin (1 or 0).
       """
       state_str = "Rising" if state else "Falling"
       rising_or_falling = 1.0 if state else -1.0                    ### aici k sa nu mai dea eroare in influx .. convert to float

       self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

       write_db = threading.Thread(
           target=write_influxdb_value,
           args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
           kwargs={'channel': pin,
                   'measure': measurements_dict[pin]['measurement'],
                   'timestamp': datetime.datetime.utcnow()})
       write_db.start()

       trigger = db_retrieve_table_daemon(Trigger)
       trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
       trigger = trigger.filter(Trigger.measurement == self.unique_id)
       trigger = trigger.filter(Trigger.is_activated.is_(True))

       for each_trigger in trigger.all():
           if each_trigger.edge_detected in ['both', state_str.lower()]:
               now = time.time()
               timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
               message = "{ts}\n[Trigger {cid} ({cname})] " \
                         "Input {oid} ({name}) {state} edge detected " \
                         "on pin {pin}".format(
                               ts=timestamp,
                               cid=each_trigger.id,
                               cname=each_trigger.name,
                               oid=self.unique_id,
                               name=self.input_dev.name,
                               state=state_str,
                               pin=pin)
               self.logger.debug("Edge: {}".format(message))

               self.control.trigger_all_actions(
                   each_trigger.unique_id, message=message)

   def stop_input(self):
       """Called when Input is deactivated."""
       self.running = False
       try:
           if self.poll_thread.is_alive():
               self.poll_thread.join()
           self.logger.debug("Stopping PCF8575 polling")
       except Exception as e:
           self.logger.exception("Exception during cleanup: {}".format(e))

What I would need now is a trigger edge function, which would start an action. The code I currently work on is this:

import datetime
import threading
import time
import smbus2

from mycodo.databases.models import CustomController
from mycodo.functions.base_function import AbstractFunction
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value
from mycodo.config_translations import TRANSLATIONS
from mycodo.databases.models import Actions
from mycodo.databases.models import Output
from mycodo.actions.base_action import AbstractFunctionAction

 
# Define function information
FUNCTION_INFORMATION = {
    'function_name_unique': 'edge_detection_funct3ion',
    'function_name': 'Edge Detection Function',
    'message': 'Monitors PCF8575 for edge detection and activates outputs.',
    'options_enabled': ['custom_options', 'function_status'],
    'dependencies_module': [('pip-pypi', 'smbus2', 'smbus2==0.4.1')],
    'custom_options': [
        {'id': 'i2c_location', 'type': 'text', 'default_value': '0x20', 'name': 'I2C Location', 'phrase': 'I2C address'},
        {'id': 'i2c_bus', 'type': 'integer', 'default_value': 1, 'name': 'I2C Bus', 'phrase': 'I2C bus number'},
        {'id': 'poll_interval', 'type': 'float', 'default_value': 0.1, 'name': 'Poll Interval', 'phrase': 'Poll interval (s)'},
        {'id': 'channel', 'type': 'integer', 'default_value': 0, 'name': 'Channel', 'phrase': 'Channel to monitor'},
        {'id': 'activation_output', 'type': 'select_device', 'default_value': '', 'options_select': ['Output'], 'name': 'Activation Output', 'phrase': 'Output to activate'},
        {'id': 'activation_duration', 'type': 'float', 'default_value': 15.0, 'name': 'Activation Duration', 'phrase': 'Activation duration (s)'}
    ]
}

class EdgeDetectionFunction(AbstractFunction):
    def __init__(self, function, testing=False):
        super().__init__(function, testing=testing, name=__name__)
        self.control = DaemonControl()
        self.listener_running = True
        self.timer_loop = time.time()
        self.i2c_location = None
        self.i2c_bus = None
        self.poll_interval = None
        self.channel = None
        self.activation_output = None
        self.activation_duration = None
        self.previous_state = None
        # Retrieve custom options from the database
        custom_function = db_retrieve_table_daemon(CustomController, unique_id=self.unique_id)
        self.setup_custom_options(FUNCTION_INFORMATION['custom_options'], custom_function)
        self.bus = None
        if not testing:
            self.try_initialize()

    def initialize(self):
        try:
            self.bus = smbus2.SMBus(self.i2c_bus)
            self.logger.info("PCF8575 initialized successfully.")
            self.start_polling()
        except Exception as e:
            self.logger.exception("Initialization error: {}".format(e))

    def read_pcf8575(self):
        """ Reads the state of all pins (16 bits) """
        try:
            data = self.bus.read_word_data(int(self.i2c_location, 16), 0)
            data = ((data & 0xFF) << 8) | (data >> 8)  # Correct byte order
            return data
        except Exception as e:
            self.logger.exception(f"Error reading from PCF8575: {e}")
            return None

    def start_polling(self):
        """ Start polling the PCF8575 for pin state changes. """
        self.running = True
        self.poll_thread = threading.Thread(target=self.poll_pins)
        self.poll_thread.start()

    def poll_pins(self):
        """ Poll the PCF8575 pins and detect edges for the specified channel. """
        while self.running:
            current_states = self.read_pcf8575()
            if current_states is not None:
                current_state = (current_states >> self.channel) & 1

                if self.previous_state is None:
                    self.previous_state = current_state

                if current_state != self.previous_state:
                    self.logger.debug(f"State change detected on channel {self.channel}: {self.previous_state} -> {current_state}")
                    self.edge_detected(self.channel, current_state)
                    self.previous_state = current_state
            time.sleep(self.poll_interval)

    def edge_detected(self, pin, state):
        """
        Handle edge detection for a specific pin.
        :param pin: The pin number (0-15).
        :param state: The current state of the pin (1 or 0).
        """
        state_str = "Rising" if state else "Falling"
        self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

        # Activate the output for a specified duration
        if self.activation_output:
            self.logger.debug(f"Activating output {self.activation_output} for {self.activation_duration} seconds...")
            self.control.output_on(self.activation_output, output_type='sec', amount=self.activation_duration)

        # Record edge detection in the database
        write_db = threading.Thread(
            target=write_influxdb_value,
            args=(self.unique_id, measurements_dict[pin]['unit'], float(state)),
            kwargs={'channel': pin,
                    'measure': measurements_dict[pin]['measurement'],
                    'timestamp': datetime.datetime.utcnow()})
        write_db.start()

    def stop_function(self):
        """Called when the function is deactivated."""
        self.running = False
        try:
            if self.poll_thread.is_alive():
                self.poll_thread.join()
            self.logger.debug("Stopping PCF8575 polling")
        except Exception as e:
            self.logger.exception("Exception during cleanup: {}".format(e))

    def function_status(self):
        return_dict = {
            'string_status': f"Current time: {datetime.datetime.now()} ({time.time()})"
                             f"\nI2C Location: {self.i2c_location}"
                             f"\nI2C Bus: {self.i2c_bus}"
                             f"\nChannel: {self.channel}"
                             f"\nPoll Interval: {self.poll_interval}",
            'error': []
        }
        return return_dict

How can I add the “Actions” drop down menu to it so I can use the internal actions?

Thank you!

Please read all of the Mycodo Documentation before posting questions like this.
If you read the Mycodo Documentation (or scroll through the Functions drop-down list) you will see there is already a Trigger: Edge Function, complete with the Actions drop-down menu…

There is also already an Output Module for the PCF8575 I/O Expander…

The Edge Trigger currently only works with the Edge Detection Input. You could easily add the functionality to your Input by incorporating a few lines to check for the relevant trigger when an edge detection occurs. See:

I will need to update how Edge Inputs are defined, since the Edge Trigger is currently hard-coded to only work with the Edge Input. See:

This kind of update would expand the Edge Trigger to be able to be used with any Input specifically designed to detect binary state changes.

If you would like to go ahead and update your Input with the above code, and are willing to test modifications to the Edge Trigger code, I’ll update it to be usable by other Inputs.

1 Like

@Lucid3y3
Hi, thanks for the reply and sorry for my English.
I would need use the PCF8575 as Input (not Output) and use the Trigger Edge function on the pins of the PCF8575. Currently the PCF8575 can be selected only as and Output and the Trigger Edge function works only for the Raspberry PI GPIO pins.

@KyleGabriel
Hi, thanks for the reply. I would be more than happy to test any code.
I’ve updated my input code with your snippet.
Here is the code:

import copy
import datetime
import threading
import time
import smbus2

from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value

# Measurements
measurements_dict = {}

for each_channel in range(16):
    measurements_dict[each_channel] = {
        'measurement': 'buttons_edge',
        'unit': 'bool'
    }

# Input information
INPUT_INFORMATION = {
    'input_name_unique': 'EDGE_PCF8575_ALL_Channels',
    'input_manufacturer': 'XXX',
    'input_name': 'PCF8575 Edge ALL Channels Detection',
    'input_library': 'smbus2',
    'measurements_name': 'Rising/Falling Edge',
    'measurements_dict': measurements_dict,

    'options_enabled': [
        'i2c_location',
        'i2c_bus',
        'pre_output',
        'poll_interval'
    ],
    'options_disabled': ['interface'],

    'dependencies_module': [
        ('pip-pypi', 'smbus2', 'smbus2==0.4.1')
    ],

    'interfaces': ['I2C'],
    'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
    'i2c_address_editable': False,
    'i2c_address_default': '0x20',
    'custom_options': [
        {
            'id': 'poll_interval',
            'type': 'float',
            'default_value': 0.5,
            'name': 'Poll Interval',
            'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
        }
    ]
}

class InputModule(AbstractInput):
    """A sensor support class that listens for rising or falling pin edge events on the PCF8575."""

    def __init__(self, input_dev, testing=False):
        super().__init__(input_dev, testing=testing, name=__name__)

        self.bus = None
        self.address = int(str(self.input_dev.i2c_location), 16)
        self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.5)  # Default to 0.5 if not set
        self.previous_states = None  # Initial state is unknown
        self.control = None

        if not testing:
            self.setup_custom_options(
                INPUT_INFORMATION['custom_options'], input_dev)
            self.try_initialize()

    def initialize(self):
        try:
            self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
            self.control = DaemonControl()

            self.logger.info("PCF8575 initialized without causing an exception!")
            self.start_polling()

        except Exception as e:
            self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))

    def read_pcf8575(self):
        """ Reads the state of all pins (16 bits) """
        try:
            data = self.bus.read_word_data(self.address, 0)
            # Correct byte order if necessary
            data = ((data & 0xFF) << 8) | (data >> 8)
            return data
        except Exception as e:
            self.logger.exception(f"Error reading from PCF8575: {e}")
            return None

    def start_polling(self):
        """ Start polling the PCF8575 for pin state changes. """
        self.running = True
        self.poll_thread = threading.Thread(target=self.poll_pins)
        self.poll_thread.start()

    def poll_pins(self):
        """ Poll the PCF8575 pins and detect edges. """
        while self.running:
            current_states = self.read_pcf8575()
            if current_states is not None:
                current_states = ~current_states & 0xFFFF  # Invert the pin states and mask to 16 bits
                if self.previous_states is None:
                    # Skip edge detection for the first cycle
                    self.previous_states = current_states
                else:
                    changed_pins = current_states ^ self.previous_states
                    if changed_pins:
                        for pin in range(16):
                            if changed_pins & (1 << pin):
                                # Extract the previous and current states for the pin
                                previous_state = (self.previous_states >> pin) & 1
                                current_state = (current_states >> pin) & 1

                                # Log the state change detected on this pin
                                self.logger.debug(
                                    f"State change detected on channel {pin}: {previous_state} -> {current_state}"
                                )

                                # Call edge_detected with the actual state (0 or 1)
                                self.edge_detected(pin, current_state)

                    # Update the previous states to the current ones
                    self.previous_states = current_states

            time.sleep(self.poll_interval)

    def edge_detected(self, pin, state):
        """
        Handle edge detection for a specific pin.
        :param pin: The pin number (0-15).
        :param state: The current state of the pin (1 or 0).
        """
        state_str = "Rising" if state else "Falling"
        rising_or_falling = 1.0 if state else -1.0  # Convert to float for InfluxDB

        self.logger.debug(f"Edge detected on pin {pin}: {state_str}")

        # Write the edge detection to the database
        write_db = threading.Thread(
            target=write_influxdb_value,
            args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
            kwargs={'channel': pin,
                    'measure': measurements_dict[pin]['measurement'],
                    'timestamp': datetime.datetime.utcnow()},
            daemon=True)   # Add daemon=True
        write_db.start()

        # Check for triggers that need to be activated on edge detection
        trigger = db_retrieve_table_daemon(Trigger)
        trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
        trigger = trigger.filter(Trigger.measurement == self.unique_id)
        trigger = trigger.filter(Trigger.is_activated.is_(True))

        for each_trigger in trigger.all():
            if each_trigger.edge_detected in ['both', state_str.lower()]:
                now = time.time()
                timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
                message = "{ts}\n[Trigger {cid} ({cname})] " \
                          "Input {oid} ({name}) {state} edge detected " \
                          "on pin {pin}".format(
                            ts=timestamp,
                            cid=each_trigger.id,
                            cname=each_trigger.name,
                            oid=self.unique_id,
                            name=self.input_dev.name,
                            state=state_str,
                            pin=pin)
                self.logger.debug("Edge: {}".format(message))

                # Trigger the associated actions
                self.control.trigger_all_actions(
                    each_trigger.unique_id, message=message)

    def stop_input(self):
        """Called when Input is deactivated."""
        self.running = False
        try:
            if self.poll_thread.is_alive():
                self.poll_thread.join()
            self.logger.debug("Stopping PCF8575 polling")
        except Exception as e:
            self.logger.exception("Exception during cleanup: {}".format(e))

Thank you

I just committed the changes necessary for any Input to be easily set as an Edge Input:

You simply need to add 'edge_input': True, to your Input’s INPUT_INFORMATION dictionary:

And your Input will populate the Measurement field of the Edge Trigger Function.