Hello everybody!
I’m trying to use the PCF8575 as input for buttons. The following code used as custom input works ok, it detects the changes on all channels.
import copy
import datetime
import threading
import time
import smbus2
from mycodo.databases.models import Trigger
from mycodo.inputs.base_input import AbstractInput
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value
# Measurements
measurements_dict = {}
for each_channel in range(16):
measurements_dict[each_channel] = {
'measurement': 'buttons_edge',
'unit': 'bool'
}
# Input information
INPUT_INFORMATION = {
'input_name_unique': 'EDGE_PCF8575_toate_canalele',
'input_manufacturer': 'XXX',
'input_name': 'PCF8575 Edge ALL Channels Detection',
'input_library': 'smbus2',
'measurements_name': 'Rising/Falling Edge',
'measurements_dict': measurements_dict,
'options_enabled': [
'i2c_location',
'i2c_bus',
'pre_output',
'poll_interval'
],
'options_disabled': ['interface'],
'dependencies_module': [
('pip-pypi', 'smbus2', 'smbus2==0.4.1')
],
'interfaces': ['I2C'],
'i2c_location': ['0x20', '0x21', '0x22', '0x23', '0x24', '0x25', '0x26', '0x27'],
'i2c_address_editable': False,
'i2c_address_default': '0x20',
'custom_options': [
{
'id': 'poll_interval',
'type': 'float',
'default_value': 0.1,
'name': 'Poll Interval',
'phrase': 'The interval (in seconds) between checks of the PCF8575 pins for state changes.'
}
]
}
class InputModule(AbstractInput):
"""A sensor support class that listens for rising or falling pin edge events on the PCF8575."""
def __init__(self, input_dev, testing=False):
super().__init__(input_dev, testing=testing, name=__name__)
self.bus = None
self.address = int(str(self.input_dev.i2c_location), 16)
self.poll_interval = getattr(self.input_dev, 'poll_interval', 0.1) # Default to 0.1 if not set
self.previous_states = None # Initial state is unknown
#self.previous_states = 0xFFFF # Initial state (all pins high) ## asta am scos'o am inlocuit'o cu ce'i deasupra...
self.control = None
if not testing:
self.setup_custom_options(
INPUT_INFORMATION['custom_options'], input_dev)
self.try_initialize()
def initialize(self):
try:
self.bus = smbus2.SMBus(self.input_dev.i2c_bus)
self.control = DaemonControl()
self.logger.info("PCF8575 initialized without causing an exception!...")
self.start_polling()
except Exception as e:
self.logger.exception("Exception encountered while initializing PCF8575: {}".format(e))
def read_pcf8575(self):
""" Reads the state of all pins (16 bits) """
try:
data = self.bus.read_word_data(self.address, 0)
# Correct byte order if necessary
data = ((data & 0xFF) << 8) | (data >> 8)
return data
except Exception as e:
self.logger.exception(f"Error reading from PCF8575: {e}")
return None
def start_polling(self):
""" Start polling the PCF8575 for pin state changes. """
self.running = True
self.poll_thread = threading.Thread(target=self.poll_pins)
self.poll_thread.start()
def poll_pins(self):
""" Poll the PCF8575 pins and detect edges. """
while self.running:
current_states = self.read_pcf8575()
if current_states is not None:
current_states = ~current_states & 0xFFFF # Invert the pin states and mask to 16 bits
if self.previous_states is None:
# Skip edge detection for the first cycle
self.previous_states = current_states
else:
changed_pins = current_states ^ self.previous_states
if changed_pins:
for pin in range(16):
if changed_pins & (1 << pin):
# Extract the previous and current states for the pin
previous_state = (self.previous_states >> pin) & 1
current_state = (current_states >> pin) & 1
# Log the state change detected on this pin
self.logger.debug(
f"State change detected on channel {pin}: {previous_state} -> {current_state}"
)
# Call edge_detected with the actual state (0 or 1)
self.edge_detected(pin, current_state)
# Update the previous states to the current ones
self.previous_states = current_states
time.sleep(self.poll_interval)
def edge_detected(self, pin, state):
"""
Handle edge detection for a specific pin.
:param pin: The pin number (0-15).
:param state: The current state of the pin (1 or 0).
"""
state_str = "Rising" if state else "Falling"
rising_or_falling = 1.0 if state else -1.0 ### aici k sa nu mai dea eroare in influx .. convert to float
self.logger.debug(f"Edge detected on pin {pin}: {state_str}")
write_db = threading.Thread(
target=write_influxdb_value,
args=(self.unique_id, measurements_dict[pin]['unit'], rising_or_falling,),
kwargs={'channel': pin,
'measure': measurements_dict[pin]['measurement'],
'timestamp': datetime.datetime.utcnow()})
write_db.start()
trigger = db_retrieve_table_daemon(Trigger)
trigger = trigger.filter(Trigger.trigger_type == 'trigger_edge')
trigger = trigger.filter(Trigger.measurement == self.unique_id)
trigger = trigger.filter(Trigger.is_activated.is_(True))
for each_trigger in trigger.all():
if each_trigger.edge_detected in ['both', state_str.lower()]:
now = time.time()
timestamp = datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H-%M-%S')
message = "{ts}\n[Trigger {cid} ({cname})] " \
"Input {oid} ({name}) {state} edge detected " \
"on pin {pin}".format(
ts=timestamp,
cid=each_trigger.id,
cname=each_trigger.name,
oid=self.unique_id,
name=self.input_dev.name,
state=state_str,
pin=pin)
self.logger.debug("Edge: {}".format(message))
self.control.trigger_all_actions(
each_trigger.unique_id, message=message)
def stop_input(self):
"""Called when Input is deactivated."""
self.running = False
try:
if self.poll_thread.is_alive():
self.poll_thread.join()
self.logger.debug("Stopping PCF8575 polling")
except Exception as e:
self.logger.exception("Exception during cleanup: {}".format(e))
What I would need now is a trigger edge function, which would start an action. The code I currently work on is this:
import datetime
import threading
import time
import smbus2
from mycodo.databases.models import CustomController
from mycodo.functions.base_function import AbstractFunction
from mycodo.mycodo_client import DaemonControl
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import write_influxdb_value
from mycodo.config_translations import TRANSLATIONS
from mycodo.databases.models import Actions
from mycodo.databases.models import Output
from mycodo.actions.base_action import AbstractFunctionAction
# Define function information
FUNCTION_INFORMATION = {
'function_name_unique': 'edge_detection_funct3ion',
'function_name': 'Edge Detection Function',
'message': 'Monitors PCF8575 for edge detection and activates outputs.',
'options_enabled': ['custom_options', 'function_status'],
'dependencies_module': [('pip-pypi', 'smbus2', 'smbus2==0.4.1')],
'custom_options': [
{'id': 'i2c_location', 'type': 'text', 'default_value': '0x20', 'name': 'I2C Location', 'phrase': 'I2C address'},
{'id': 'i2c_bus', 'type': 'integer', 'default_value': 1, 'name': 'I2C Bus', 'phrase': 'I2C bus number'},
{'id': 'poll_interval', 'type': 'float', 'default_value': 0.1, 'name': 'Poll Interval', 'phrase': 'Poll interval (s)'},
{'id': 'channel', 'type': 'integer', 'default_value': 0, 'name': 'Channel', 'phrase': 'Channel to monitor'},
{'id': 'activation_output', 'type': 'select_device', 'default_value': '', 'options_select': ['Output'], 'name': 'Activation Output', 'phrase': 'Output to activate'},
{'id': 'activation_duration', 'type': 'float', 'default_value': 15.0, 'name': 'Activation Duration', 'phrase': 'Activation duration (s)'}
]
}
class EdgeDetectionFunction(AbstractFunction):
def __init__(self, function, testing=False):
super().__init__(function, testing=testing, name=__name__)
self.control = DaemonControl()
self.listener_running = True
self.timer_loop = time.time()
self.i2c_location = None
self.i2c_bus = None
self.poll_interval = None
self.channel = None
self.activation_output = None
self.activation_duration = None
self.previous_state = None
# Retrieve custom options from the database
custom_function = db_retrieve_table_daemon(CustomController, unique_id=self.unique_id)
self.setup_custom_options(FUNCTION_INFORMATION['custom_options'], custom_function)
self.bus = None
if not testing:
self.try_initialize()
def initialize(self):
try:
self.bus = smbus2.SMBus(self.i2c_bus)
self.logger.info("PCF8575 initialized successfully.")
self.start_polling()
except Exception as e:
self.logger.exception("Initialization error: {}".format(e))
def read_pcf8575(self):
""" Reads the state of all pins (16 bits) """
try:
data = self.bus.read_word_data(int(self.i2c_location, 16), 0)
data = ((data & 0xFF) << 8) | (data >> 8) # Correct byte order
return data
except Exception as e:
self.logger.exception(f"Error reading from PCF8575: {e}")
return None
def start_polling(self):
""" Start polling the PCF8575 for pin state changes. """
self.running = True
self.poll_thread = threading.Thread(target=self.poll_pins)
self.poll_thread.start()
def poll_pins(self):
""" Poll the PCF8575 pins and detect edges for the specified channel. """
while self.running:
current_states = self.read_pcf8575()
if current_states is not None:
current_state = (current_states >> self.channel) & 1
if self.previous_state is None:
self.previous_state = current_state
if current_state != self.previous_state:
self.logger.debug(f"State change detected on channel {self.channel}: {self.previous_state} -> {current_state}")
self.edge_detected(self.channel, current_state)
self.previous_state = current_state
time.sleep(self.poll_interval)
def edge_detected(self, pin, state):
"""
Handle edge detection for a specific pin.
:param pin: The pin number (0-15).
:param state: The current state of the pin (1 or 0).
"""
state_str = "Rising" if state else "Falling"
self.logger.debug(f"Edge detected on pin {pin}: {state_str}")
# Activate the output for a specified duration
if self.activation_output:
self.logger.debug(f"Activating output {self.activation_output} for {self.activation_duration} seconds...")
self.control.output_on(self.activation_output, output_type='sec', amount=self.activation_duration)
# Record edge detection in the database
write_db = threading.Thread(
target=write_influxdb_value,
args=(self.unique_id, measurements_dict[pin]['unit'], float(state)),
kwargs={'channel': pin,
'measure': measurements_dict[pin]['measurement'],
'timestamp': datetime.datetime.utcnow()})
write_db.start()
def stop_function(self):
"""Called when the function is deactivated."""
self.running = False
try:
if self.poll_thread.is_alive():
self.poll_thread.join()
self.logger.debug("Stopping PCF8575 polling")
except Exception as e:
self.logger.exception("Exception during cleanup: {}".format(e))
def function_status(self):
return_dict = {
'string_status': f"Current time: {datetime.datetime.now()} ({time.time()})"
f"\nI2C Location: {self.i2c_location}"
f"\nI2C Bus: {self.i2c_bus}"
f"\nChannel: {self.channel}"
f"\nPoll Interval: {self.poll_interval}",
'error': []
}
return return_dict
How can I add the “Actions” drop down menu to it so I can use the internal actions?
Thank you!