APC PDU – SNMP to MQTT

Guide for SNMP and MQTT Integration Script

This is a script I developed to retrieve status information from my APC PDU. While there may be other solutions available, I chose to create my own and integrate it with MQTT discovery so it could seamlessly integrate with Home Assistant. This setup has worked reliably for me for months. I’m always open to suggestions for improvement and hope you find it useful.

Prerequisites

  1. Python Environment: Ensure you have Python installed on your system.
  2. Python Packages: Install the required packages using…
pip install paho-mqtt==1.6.1
  1. SNMP Enabled Device: You need a device with SNMP enabled.
  2. MQTT Broker: Ensure you have access to an MQTT broker.

Docker Option

1. Create the Python Script

Save your Python script as apc2mqtt.py in the your preferred directory in this example /opt/scripts.

2. Create a Dockerfile

Create a Dockerfile in the same directory where your apc2mqtt.py script is located:

DockerfileCopy code# Use the official Python image as a base
FROM python:3.9-slim

# Set the working directory
WORKDIR /opt/scripts

# Copy the requirements file
COPY requirements.txt .

# Install the required Python packages
RUN pip install --no-cache-dir -r requirements.txt

# Copy the Python script
COPY apc2mqtt.py .

# Command to run the script
CMD ["python", "apc2mqtt.py"]

3. Create a Requirements File

Create a requirements.txt file in the same directory to specify the dependencies:

paho-mqtt==1.6.1
pysnmp

4. Create a Docker Compose File

Create a docker-compose.yml file in the same directory:

codeversion: "3"
services:
  apc2mqtt-servers:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - /opt/scripts:/opt/scripts
    command: python /opt/scripts/apc2mqtt.py
    restart: always
    networks:
      - apc2mqtt-net

networks:
  apc2mqtt-net:
    driver: bridge

5. Build and Run the Docker Container

Navigate to the directory containing your Dockerfile and docker-compose.yml and run the following commands:

  1. Build the Docker image:
docker-compose build
  1. Run the Docker container:
docker-compose up -d

Configuration

  1. SNMP Configuration:
    • snmp_ip_address: The IP address of the SNMP-enabled device.
    • community_string: The SNMP community string (usually ‘public’).
  2. MQTT Configuration:
    • mqtt_broker: The IP address of your MQTT broker.
    • mqtt_port: The port your MQTT broker is listening on (default is 1883).
    • mqtt_user: MQTT username.
    • mqtt_password: MQTT password.
    • device_id: A unique identifier for your device (e.g., ‘servers’).
    • debug_mode: Set to True for verbose debug logging.
  3. Polling Interval:
    • sleep_duration: Time in seconds between each status poll and publish cycle.

Integrating with Home Assistant

This integrates with Home Assistant using MQTT discovery. When the script runs, it publishes discovery messages to the MQTT broker. Home Assistant listens for these messages and automatically configures the corresponding entitier.

Handling Multiple PDUs

If you have multiple PDUs, you can change the device_id in the script configuration to ensure each PDU has a unique identifier. This allows Home Assistant to handle multiple instances without conflicts.

device_id = 'it_equitment'

Supported PDUs

This script has only been tested with an APC PDU model AP7900. However, it should be straightforward to update the SNMP OIDs in the script to support other models or brands of PDUs.

Script:

from pysnmp.hlapi import getCmd, setCmd, SnmpEngine, CommunityData, UdpTransportTarget, ContextData, ObjectType, ObjectIdentity, Integer32
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import json
import time

# Configuration
snmp_ip_address = '192.168.1.1'
community_string = 'public'
mqtt_broker = '192.168.1.2'
mqtt_port = 1883
mqtt_user = 'user'
mqtt_password = 'password'
device_id = 'it_equitment'
debug_mode = False

global_port_names = {}

sleep_duration = 25

def debug_print(message):
    if debug_mode:
        print(message)

def fetch_snmp_data(oid):
    errorIndication, errorStatus, errorIndex, varBinds = next(
        getCmd(SnmpEngine(),
               CommunityData(community_string, mpModel=0),
               UdpTransportTarget((snmp_ip_address, 161)),
               ContextData(),
               ObjectType(ObjectIdentity(oid)))
    )
    if errorIndication:
        debug_print(f"SNMP Fetch Error: {errorIndication}")
        return None
    elif errorStatus:
        debug_print(f"SNMP Fetch Error Status: {errorStatus.prettyPrint()}")
        return None
    else:
        for varBind in varBinds:
            return varBind[1].prettyPrint()

def control_port(snmp_ip_address, port_num, state, client):
    oid_base = '.1.3.6.1.4.1.318.1.1.12.3.3.1.1.4'
    full_oid = f"{oid_base}.{port_num}"  
    state_value = 1 if state == 1 else 2
    
    # Log the action being attempted
    debug_message = f"Preparing to send SNMP set command for port {port_num} to {'on' if state == 1 else 'off'} (value: {state_value}). OID: {full_oid}"
    publish_debug(client, debug_message)
    
    try:
        errorIndication, errorStatus, errorIndex, varBinds = next(
            setCmd(SnmpEngine(),
                   CommunityData('private', mpModel=0),
                   UdpTransportTarget((snmp_ip_address, 161)),
                   ContextData(),
                   ObjectType(ObjectIdentity(full_oid), Integer32(state_value)))
        )
        
        if errorIndication:
            publish_debug(client, f"SNMP Error: {errorIndication} for port {port_num}")
            return
        elif errorStatus:
            publish_debug(client, f"SNMP Error: {errorStatus.prettyPrint()} for port {port_num} when attempting to set to {'on' if state == 1 else 'off'}")
            return
        else:
            state_str = 'on' if state == 1 else 'off'
            client.publish(f"apc_pdu_{device_id}/status/ports/{port_num}/state", state_str)
            publish_debug(client, f"SNMP set command sent successfully for port {port_num}, updated MQTT with state '{state_str}'.")
    except Exception as e:
        publish_debug(client, f"Exception sending SNMP set command for port {port_num}: {e}")



def format_mac_address(raw_mac):
    if raw_mac.lower().startswith("0x"):
        raw_mac = raw_mac[2:]
    raw_mac = raw_mac.upper()
    formatted_mac = ":".join(raw_mac[i:i+2] for i in range(0, len(raw_mac), 2))
    return formatted_mac

def publish_status(client, port_num=None):
    global global_serial, global_model, global_mac, global_port_names

    global_serial = 'Unknown or Unable to Connect'
    global_model = 'Unknown or Unable to Connect'
    global_mac = 'Unknown or Unable to Connect'

    serial_temp = fetch_snmp_data('.1.3.6.1.4.1.318.1.1.4.1.5.0')
    model_temp = fetch_snmp_data('.1.3.6.1.4.1.318.1.1.4.1.4.0')
    raw_mac = fetch_snmp_data('.1.3.6.1.2.1.2.2.1.6.2')

    if serial_temp is not None:
        global_serial = serial_temp
    if model_temp is not None:
        global_model = model_temp
    if raw_mac is not None:
        global_mac = format_mac_address(raw_mac)

    system_uptime_oid = '.1.3.6.1.2.1.1.3.0'
    system_uptime_ticks = fetch_snmp_data(system_uptime_oid)
    if system_uptime_ticks is not None:
        system_uptime_readable = convert_time_ticks_to_readable_format(system_uptime_ticks)
        client.publish(f"apc_pdu_{device_id}/status/uptime", system_uptime_readable)
    else:
        client.publish(f"apc_pdu_{device_id}/status/uptime", "Uptime fetch failed")

    port_range = range(1, 9) if port_num is None else [port_num]

    for port_num in port_range:
        try:
            name_oid = f'1.3.6.1.4.1.318.1.1.12.3.5.1.1.2.{port_num}'
            state_oid = f'1.3.6.1.4.1.318.1.1.12.3.5.1.1.4.{port_num}'
            port_name = fetch_snmp_data(name_oid) or "Unknown Port Name"
            port_state = 'on' if fetch_snmp_data(state_oid) == '1' else 'off'
            global_port_names[port_num] = port_name

            client.publish(f"apc_pdu_{device_id}/status/ports/{port_num}/name", port_name)
            client.publish(f"apc_pdu_{device_id}/status/ports/{port_num}/state", port_state)
        except Exception as e:
            print(f"Error updating port {port_num}: {e}")

    # Publish global device information
    client.publish(f"apc_pdu_{device_id}/status/serial", global_serial)
    client.publish(f"apc_pdu_{device_id}/status/model", global_model)
    client.publish(f"apc_pdu_{device_id}/status/mac", global_mac)
    client.publish(f"apc_pdu_{device_id}/status/identifier", device_id)
    
    # Ensure discovery messages are published even if there's an issue
    try:
        publish_discovery_messages(client)
    except Exception as e:
        debug_print(f"Error publishing discovery messages: {e}")

def convert_time_ticks_to_readable_format(time_ticks):
    total_seconds = int(time_ticks) / 100  # Convert TimeTicks to seconds

    days = total_seconds // (24 * 3600)
    total_seconds = total_seconds % (24 * 3600)

    hours = total_seconds // 3600
    total_seconds %= 3600

    minutes = total_seconds // 60

    return f"{int(days)} days, {int(hours)} hours, {int(minutes)} minutes"


def publish_debug(client, message):
    if debug_mode:
        debug_topic = f"apc_pdu_{device_id}/debug/logs"
        client.publish(debug_topic, message)

def on_connect(client, userdata, flags, rc):
    publish_debug(client, f"Connected with result code {rc}")
    for port_num in range(1, 9):
        topic = f"apc_pdu_{device_id}/control/ports/{port_num}"
        client.subscribe(topic)
        publish_debug(client, f"Subscribing to topic: {topic}")


def on_message(client, userdata, msg):
    message = msg.payload.decode('utf-8')
    topic_parts = msg.topic.split('/')
    if len(topic_parts) != 4:
        publish_debug(client, f"Received message on unexpected topic: {msg.topic}")
        return

    expected_prefix = f'apc_pdu_{device_id}'
    received_prefix = topic_parts[0]

    # Debug output for diagnostic purposes
    publish_debug(client, f"Diagnostic: Expecting prefix '{expected_prefix}', received '{received_prefix}'")

    if received_prefix == expected_prefix and topic_parts[1] == 'control' and topic_parts[2] == 'ports':
        try:
            port_num = int(topic_parts[3])
            state = 1 if message.lower() == 'on' else 0  # Assuming '0' is the correct state for 'off'
            control_port(snmp_ip_address, port_num, state, client)
        except ValueError as e:
            publish_debug(client, f"Error processing command for topic {msg.topic}: {e}")
    else:
        publish_debug(client, f"Received message does not match expected structure: {msg.topic}")



def publish_discovery_messages(client):
    global global_serial, global_model, global_mac, global_port_names

    # Publishing for each port
    for port_num in range(1, 9):
        port_name = global_port_names.get(port_num, f"Port {port_num}")
        device_name = f"Port {port_num} - {port_name}"

        port_discovery_message = {
            "name": device_name,
            "state_topic": f"apc_pdu_{device_id}/status/ports/{port_num}/state",
            "command_topic": f"apc_pdu_{device_id}/control/ports/{port_num}",
            "unique_id": f"apc_pdu_port_{port_num}_{global_serial}_{device_id}",
            "icon": "mdi:power-plug",
            "device": {
                "identifiers": [f"apc_pdu_{snmp_ip_address}_{device_id}"],
                "name": f"APC PDU {global_model}",
                "model": global_model,
                "manufacturer": "APC",
            },
            "payload_on": "on",
            "payload_off": "off",
            "value_template": "{{ value if value == 'on' else 'off' }}"
        }

        publish.single(
            f"homeassistant/switch/apc_pdu_{device_id}/port_{port_num}/config",
            payload=json.dumps(port_discovery_message),
            hostname=mqtt_broker,
            port=mqtt_port,
            auth={"username": mqtt_user, "password": mqtt_password},
            retain=True,
        )

    # Serial Number Discovery Message
    serial_discovery_message = {
        "name": "Serial Number",
        "state_topic": f"apc_pdu_{device_id}/status/serial",
        "unique_id": f"apc_pdu_serial_{global_serial}_{device_id}",
        "icon": "mdi:alphabetical",
        "device": {
            "identifiers": [f"apc_pdu_{snmp_ip_address}_{device_id}"],
            "name": f"APC PDU {global_model}",
            "model": global_model,
            "manufacturer": "APC",
        },
    }
    publish.single(
        f"homeassistant/sensor/apc_pdu_{device_id}/serial/config",
        payload=json.dumps(serial_discovery_message),
        hostname=mqtt_broker,
        port=mqtt_port,
        auth={"username": mqtt_user, "password": mqtt_password},
        retain=True,
    )

    # Model Discovery Message
    model_discovery_message = {
        "name": "Model",
        "state_topic": f"apc_pdu_{device_id}/status/model",
        "unique_id": f"apc_pdu_model_{global_serial}_{device_id}",
        "icon": "mdi:information-outline",
        "device": {
            "identifiers": [f"apc_pdu_{snmp_ip_address}_{device_id}"],
            "name": f"APC PDU {global_model}",  
            "model": global_model,
            "manufacturer": "APC",
        },
    }
    publish.single(
        f"homeassistant/sensor/apc_pdu_{device_id}/model/config",
        payload=json.dumps(model_discovery_message),
        hostname=mqtt_broker,
        port=mqtt_port,
        auth={"username": mqtt_user, "password": mqtt_password},
        retain=True,
    )

    # MAC Address Discovery Message
    mac_discovery_message = {
        "name": "MAC Address",
        "state_topic": f"apc_pdu_{device_id}/status/mac",
        "unique_id": f"apc_pdu_mac_{global_serial}_{device_id}",
        "icon": "mdi:ethernet",
        "device": {
            "identifiers": [f"apc_pdu_{snmp_ip_address}_{device_id}"],
            "name": f"APC PDU {global_model}",
            "model": global_model,
            "manufacturer": "APC",
        },
        "value_template": "{{ value }}"
    }
    publish.single(
        f"homeassistant/sensor/apc_pdu_{device_id}/mac/config",
        payload=json.dumps(mac_discovery_message),
        hostname=mqtt_broker,
        port=mqtt_port,
        auth={"username": mqtt_user, "password": mqtt_password},
        retain=True,
    )

    # Identifier Discovery Message
    identifier_discovery_message = {
        "name": "Identifier",
        "state_topic": f"apc_pdu_{device_id}/status/identifier",
        "unique_id": f"apc_pdu_identifier_{global_serial}_{device_id}",
        "icon": "mdi:identifier",
        "device": {
            "identifiers": [f"apc_pdu_{snmp_ip_address}_{device_id}"],
            "name": f"APC PDU {global_model}",
            "model": global_model,
            "manufacturer": "APC",
        },
    }

    # Publishing the Identifier Discovery Message
    publish.single(
        f"homeassistant/sensor/{device_id}/identifier/config",
        payload=json.dumps(identifier_discovery_message),
        hostname=mqtt_broker,
        port=mqtt_port,
        auth={"username": mqtt_user, "password": mqtt_password},
        retain=True,
    )

    # Uptime Discovery Message
    uptime_discovery_message = {
        "name": f"Uptime",
        "state_topic": f"apc_pdu_{device_id}/status/uptime",
        "unique_id": f"apc_pdu_uptime_{global_serial}_{device_id}",
        "icon": "mdi:timer-outline",
        "device": {
            "identifiers": [f"apc_pdu_{snmp_ip_address}_{device_id}"],
            "name": f"APC PDU {global_model}",
            "model": global_model,
            "manufacturer": "APC",
        },
    }

    # Publishing the Uptime Discovery Message
    publish.single(
        f"homeassistant/sensor/{device_id}/uptime/config",
        payload=json.dumps(uptime_discovery_message),
        hostname=mqtt_broker,
        port=mqtt_port,
        auth={"username": mqtt_user, "password": mqtt_password},
        retain=True,
    )

def safe_publish(client, topic, payload):
    try:
        client.publish(topic, payload)
    except Exception as e:
        debug_print(f"Failed to publish MQTT message: {e}")


def connect_with_retry(client, broker, port, max_retries=5, delay=5):
    for attempt in range(max_retries):
        try:
            client.connect(broker, port, 60)
            debug_print("Successfully connected to MQTT broker.")
            return True
        except Exception as e:
            debug_print(f"Failed to connect to MQTT broker: {e}. Retrying in {delay} seconds...")
            time.sleep(delay)
    debug_print("Failed to connect after maximum retries.")
    return False

if __name__ == "__main__":
    client = mqtt.Client()
    client.username_pw_set(mqtt_user, mqtt_password)
    client.on_connect = on_connect
    client.on_message = on_message
    
    if connect_with_retry(client, mqtt_broker, mqtt_port):
        client.loop_start()

        try:
            while True:
                publish_status(client)
                time.sleep(sleep_duration)
        except KeyboardInterrupt:
            debug_print("Script stopped by user.")
        finally:
            client.disconnect()
    else:
        debug_print("Exiting due to connection issues.")

Leave a Reply

Your email address will not be published. Required fields are marked *