Skip to content
176 changes: 119 additions & 57 deletions check_sensorProbe2plus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
# ------------------------------------------------------------------------------
# check_sensorProbe2plus.py - A check plugin for AKCP SensorProbe2+.
# Copyright (C) 2017 NETWAYS GmbH, www.netways.de
# Authors: Noah Hilverling <noah.hilverling@netways.de>
# Jennifer Mourek <jennifer.mourek@netways.de>
#
# Version: 1.0
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
Expand All @@ -24,9 +20,11 @@

import argparse
import sys
from pysnmp.entity.rfc3413.oneliner import cmdgen
import typing
from enum import Enum, IntEnum

from pysnmp.entity.rfc3413.oneliner import cmdgen


# Translate the OID indexes to keywords
class Types(IntEnum):
Expand Down Expand Up @@ -67,7 +65,7 @@ class NagiosState(Enum):
22: "Power",
24: "Fuel",
26: "Tank sender",
27: "Door"
27: "Door",
}


Expand Down Expand Up @@ -106,21 +104,36 @@ def print_status_message(sensor_states, perf_data):

result_message = ""
if len(sensor_states["WARNING"]) > 0 and len(sensor_states["CRITICAL"]) > 0:
result_message = "CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for %d sensor%s (%s) " \
"and state WARNING for %d sensor%s (%s)" % (
len(sensor_states["CRITICAL"]),
"s" if len(sensor_states["CRITICAL"]) > 1 else "",
critical_name_string,
len(sensor_states["WARNING"]),
"s" if len(sensor_states["WARNING"]) > 1 else "",
warning_name_string
)
result_message = (
"CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for %d sensor%s (%s) "
"and state WARNING for %d sensor%s (%s)"
% (
len(sensor_states["CRITICAL"]),
"s" if len(sensor_states["CRITICAL"]) > 1 else "",
critical_name_string,
len(sensor_states["WARNING"]),
"s" if len(sensor_states["WARNING"]) > 1 else "",
warning_name_string,
)
)
elif len(sensor_states["WARNING"]) > 0:
result_message = "WARNING sensorProbe2plus: Sensor reports state WARNING for %d sensor%s (%s)" % (
len(sensor_states["WARNING"]), "s" if len(sensor_states["WARNING"]) > 1 else "", warning_name_string)
result_message = (
"WARNING sensorProbe2plus: Sensor reports state WARNING for %d sensor%s (%s)"
% (
len(sensor_states["WARNING"]),
"s" if len(sensor_states["WARNING"]) > 1 else "",
warning_name_string,
)
)
elif len(sensor_states["CRITICAL"]) > 0:
result_message = "CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for %d sensor%s (%s)" % (
len(sensor_states["CRITICAL"]), "s" if len(sensor_states["CRITICAL"]) > 1 else "", critical_name_string)
result_message = (
"CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for %d sensor%s (%s)"
% (
len(sensor_states["CRITICAL"]),
"s" if len(sensor_states["CRITICAL"]) > 1 else "",
critical_name_string,
)
)
else:
result_message = "OK sensorProbe2plus: Sensor reports that everything is fine"

Expand All @@ -143,13 +156,29 @@ def print_status_message(sensor_states, perf_data):
port = 0

# Arguments for the CLI command
parser = argparse.ArgumentParser(description='Check plugin for AKCP SensorProbe2+')
parser = argparse.ArgumentParser(description="Check plugin for AKCP SensorProbe2+")
parser.add_argument("-V", "--version", action="store_true")
parser.add_argument("-v", "--verbose", action="count", default=0, help="increase output verbosity (-v or -vv)")
parser.add_argument("-p", "--port", help="port of the sensors to check (shows all if not set)", type=int, default=0)
required = parser.add_argument_group('required arguments')
required.add_argument("-H", "--hostname", help="host of the sensor probe", required=True)
required.add_argument("-C", "--community", help="read community of the sensor probe", required=True)
parser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="increase output verbosity (-v or -vv)",
)
parser.add_argument(
"-p",
"--port",
help="port of the sensors to check (shows all if not set)",
type=int,
default=0,
)
required = parser.add_argument_group("required arguments")
required.add_argument(
"-H", "--hostname", help="host of the sensor probe", required=True
)
required.add_argument(
"-C", "--community", help="read community of the sensor probe", required=True
)

args = parser.parse_args()

Expand All @@ -174,26 +203,35 @@ def print_status_message(sensor_states, perf_data):
perfData = []

# Root for sensor dictionary tree
sensorPorts = {}
sensorPorts: dict[int, typing.Any] = {}

# Root for sensor OIDs
sensorsOID = (1, 3, 6, 1, 4, 1, 3854, 3, 5)

generator = cmdgen.CommandGenerator()
communityData = cmdgen.CommunityData(community)
transport = cmdgen.UdpTransportTarget((hostname, 161))
command = getattr(generator, 'nextCmd')
command = getattr(generator, "nextCmd")

errorIndication, errorStatus, errorIndex, result = command(communityData, transport, sensorsOID)
errorIndication, errorStatus, errorIndex, result = command(
communityData, transport, sensorsOID
)

# Check if an exception occurred
if errorIndication:
print("%s sensorProbe2plus: %s" % (NagiosState.UNKNOWN.name, errorIndication))
mostImportantState = NagiosState.UNKNOWN
elif errorStatus:
print(('%s sensorProbe2plus: %s at %s' % (NagiosState.CRITICAL.name,
errorStatus.prettyPrint(),
errorIndex and result[int(errorIndex)-1] or '?')))
print(
(
"%s sensorProbe2plus: %s at %s"
% (
NagiosState.CRITICAL.name,
errorStatus.prettyPrint(),
errorIndex and result[int(errorIndex) - 1] or "?",
)
)
)
mostImportantState = NagiosState.CRITICAL
else:
# Sort results
Expand All @@ -205,7 +243,7 @@ def print_status_message(sensor_states, perf_data):
valueIndex = int(oid[11])
try:
Types(valueIndex)
except ValueError as err:
except ValueError:
continue

# Get sensor category
Expand Down Expand Up @@ -233,11 +271,19 @@ def print_status_message(sensor_states, perf_data):

# Check if there is no sensor on the given port
if len(sensorPorts) < 1:
print("%s sensorProbe2plus: There is no sensor on the given port" % NagiosState.UNKNOWN.name)
print(
"%s sensorProbe2plus: There is no sensor on the given port"
% NagiosState.UNKNOWN.name
)
sys.exit(NagiosState.UNKNOWN.value)

# Sensor names sorted by state
namesByState = {"OK": [], "WARNING": [], "CRITICAL": [], "UNKNOWN": []}
namesByState: dict[str, list] = {
"OK": [],
"WARNING": [],
"CRITICAL": [],
"UNKNOWN": [],
}

# Iterate through sensors
for sensorPort, sensorIndexes in sensorPorts.items():
Expand All @@ -246,7 +292,7 @@ def print_status_message(sensor_states, perf_data):
state = convert_state_to_nagios(valueIndexes[Types.STATE])

# Redetermines most important state
if hasattr(state, 'value') and state.value > mostImportantState.value:
if hasattr(state, "value") and state.value > mostImportantState.value:
mostImportantState = state
else:
mostImportantState = NagiosState.UNKNOWN
Expand All @@ -260,44 +306,60 @@ def print_status_message(sensor_states, perf_data):
value = 0 if state == NagiosState.OK else 1

# Status message for sensor
stateMessages.append(
"%s %s" % (state.name, valueIndexes[Types.NAME]))
stateMessages.append("%s %s" % (state.name, valueIndexes[Types.NAME]))

# Add performance data to performance data array
perfData.append("'%s'=%s;" % (valueIndexes[Types.NAME], value))
else:
# Convert temperatures into right format
if valueIndexes[Types.UNIT] == "C":
valueIndexes[Types.VALUE] = float(valueIndexes[Types.VALUE]) / 10
valueIndexes[Types.LOW_CRITICAL] = float(valueIndexes[Types.LOW_CRITICAL]) / 10
valueIndexes[Types.LOW_WARNING] = float(valueIndexes[Types.LOW_WARNING]) / 10
valueIndexes[Types.HIGH_WARNING] = float(valueIndexes[Types.HIGH_WARNING]) / 10
valueIndexes[Types.HIGH_CRITICAL] = float(valueIndexes[Types.HIGH_CRITICAL]) / 10
valueIndexes[Types.LOW_CRITICAL] = (
float(valueIndexes[Types.LOW_CRITICAL]) / 10
)
valueIndexes[Types.LOW_WARNING] = (
float(valueIndexes[Types.LOW_WARNING]) / 10
)
valueIndexes[Types.HIGH_WARNING] = (
float(valueIndexes[Types.HIGH_WARNING]) / 10
)
valueIndexes[Types.HIGH_CRITICAL] = (
float(valueIndexes[Types.HIGH_CRITICAL]) / 10
)

# Status message for sensor
stateMessage = '%s %s sensor "%s": %s%s' % (state.name,
valueIndexes[Types.CATEGORY],
valueIndexes[Types.NAME],
valueIndexes[Types.VALUE],
valueIndexes[Types.UNIT])
stateMessage = '%s %s sensor "%s": %s%s' % (
state.name,
valueIndexes[Types.CATEGORY],
valueIndexes[Types.NAME],
valueIndexes[Types.VALUE],
valueIndexes[Types.UNIT],
)

# Add thresholds to verbose sensor messages
if verbose > 1:
stateMessage += " (%s:%s/%s:%s)" % (valueIndexes[Types.LOW_WARNING],
valueIndexes[Types.HIGH_WARNING],
valueIndexes[Types.LOW_CRITICAL],
valueIndexes[Types.HIGH_CRITICAL])
stateMessage += " (%s:%s/%s:%s)" % (
valueIndexes[Types.LOW_WARNING],
valueIndexes[Types.HIGH_WARNING],
valueIndexes[Types.LOW_CRITICAL],
valueIndexes[Types.HIGH_CRITICAL],
)

stateMessages.append(stateMessage)

# Add performance data to performance data array
perfData.append("'%s'=%s%s;%s:%s;%s:%s" % (valueIndexes[Types.NAME],
valueIndexes[Types.VALUE],
valueIndexes[Types.UNIT],
valueIndexes[Types.LOW_WARNING],
valueIndexes[Types.HIGH_WARNING],
valueIndexes[Types.LOW_CRITICAL],
valueIndexes[Types.HIGH_CRITICAL]))
perfData.append(
"'%s'=%s%s;%s:%s;%s:%s"
% (
valueIndexes[Types.NAME],
valueIndexes[Types.VALUE],
valueIndexes[Types.UNIT],
valueIndexes[Types.LOW_WARNING],
valueIndexes[Types.HIGH_WARNING],
valueIndexes[Types.LOW_CRITICAL],
valueIndexes[Types.HIGH_CRITICAL],
)
)

# Print first line of output
print_status_message(namesByState, perfData)
Expand Down