Add initial version

This commit is contained in:
Jonny007-MKD 2025-05-15 14:17:22 +01:00
commit 9f08367de6

436
check_iperf3.py Executable file
View file

@ -0,0 +1,436 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module provides a Nagios plugin to run and evaluate network performance tests using iperf3.
The script executes an iperf3 test, evaluates the results based on user-defined thresholds, and
outputs a formatted Nagios status message along with performance data. The script supports both
TCP and UDP tests, bidirectional testing, and customizable thresholds for throughput and
retransmissions.
Key functions in this module:
- `run_iperf3`: Executes an iperf3 test and returns the parsed JSON output.
- `determine_result`: Analyzes the iperf3 output and determines the Nagios status code and message.
- `build_perfdata`: Constructs the Nagios performance data string based on iperf3 results.
- `check_iperf3`: Orchestrates the test execution, result evaluation, and Nagios output generation.
Command-line interface (CLI):
- The script expects command-line arguments to configure the iperf3 test. These arguments are parsed
into an `options` object, which is then passed through the testing and evaluation functions.
Usage:
Run this module as a standalone script to initiate a network performance test:
python check_iperf3.py --remote <host> --port <port> --rate-critical <rate_critical> ...
The `options` object is populated with the user-specified parameters, and the `check_iperf3`
function is invoked to execute the test, evaluate the results, and output Nagios-compatible
status and performance data.
Example:
python check_iperf3.py --remote 192.168.1.1 --port 5201 --rate-critical 1000000000 ...
This would run an iperf3 test against the specified remote host and port, evaluating the throughput
against the provided critical and warning thresholds, and outputting the result in Nagios format.
"""
from optparse import OptionParser, OptionGroup
import subprocess
import json
import sys
from typing import Tuple, Optional, List, Union, NoReturn
def nagexit(exit_code: int, statuslines: Union[str, List[str]], perfdata: Optional[List[str]]=None) -> NoReturn:
"""
Exits this Nagios plugin with the appropriate status message and performance data.
This function formats and prints a Nagios-compatible status line and exits the
script with the provided exit code. The exit code must be one of the standard
Nagios status codes:
0 - OK
1 - WARNING
2 - CRITICAL
3 - UNKNOWN
Args:
exit_code (int): The exit code indicating the plugin result. Must be 0, 1, 2, or 3.
statuslines (str | List[str]): The human-readable status message(s). If a list
is provided, each item will be printed on a new line.
perfdata (List, optional): A list of performance data strings to be appended
in Nagios performance data format. Defaults to None.
Returns:
NoReturn: This function does not return; it exits the program using sys.exit().
"""
status_code = {0: 'OK', 1: 'WARNING',
2: 'CRITICAL', 3: 'UNKNOWN'}[exit_code]
status = statuslines if isinstance(statuslines, str) else "\n".join(statuslines)
perf = "|" + ("; ".join(perfdata)) if perfdata else ""
if status:
print(f"{status_code}: {status}{perf}")
else:
print(f"{status_code}{perf}")
sys.exit(exit_code)
def make_iperf_cmdline(options) -> List[str]:
"""
Constructs the command-line arguments for an iperf3 network performance test.
This function generates a list of command-line arguments for running `iperf3`
based on the provided options. It supports TCP/UDP, bidirectional or reverse
tests, and can specify either a data limit in bytes or a time duration.
Args:
options: An object containing the following attributes:
- remote (str): The target hostname or IP address of the iperf3 server.
- port (int): The port number to connect to on the server.
- bidir (bool): If True, run a bidirectional test (overrides downstream).
- downstream (bool): If True, run a reverse test (server to client).
- udp (bool): If True, use UDP instead of TCP.
- bytes (str): Optional. Amount of data to transfer (e.g., "10M").
- time (int): Duration of the test in seconds (used if `bytes` is not set).
Returns:
List[str]: A list of command-line arguments for iperf3.
"""
params = [
"iperf3",
"--json",
"--client", options.remote,
"--port", str(options.port),
"--connect-timeout", "10000",
]
if options.bidir:
params.append("--bidir")
elif options.downstream:
params.append("--reverse")
if options.udp:
params.append("--udp")
if options.bytes:
params += ["--bytes", options.bytes]
else:
params += ["--time", str(options.time)]
return params
def run_iperf3(options) -> Union[dict, NoReturn]:
"""
Executes an iperf3 test using the given options and returns the parsed JSON result.
This function builds the iperf3 command line using `make_iperf_cmdline(options)`,
executes the command, and returns the resulting output as a parsed JSON dictionary.
If the iperf3 executable is not found, or the command fails, it exits the program
with an appropriate Nagios status using `nagexit`.
Args:
options: An object containing iperf3 options. Must be compatible with
`make_iperf_cmdline()` (i.e., have attributes like `remote`, `port`,
`bidir`, `downstream`, `udp`, `bytes`, and `time`).
Returns:
dict: The parsed JSON output from iperf3 if the command succeeds.
Exits:
Exits with code 3 (UNKNOWN) if iperf3 is not found or if the command fails.
"""
try:
params = make_iperf_cmdline(options)
raw_data = subprocess.check_output(
params,
stderr=subprocess.STDOUT,
universal_newlines=True
)
return json.loads(raw_data)
except OSError as e:
if e.errno == 2:
nagexit(3, "cannot find iperf3")
except subprocess.CalledProcessError as e:
nagexit(3, e.output)
nagexit(3, "what happended?")
def determine_result(options, json_data) -> Tuple[int, Optional[List[str]]]:
"""
Evaluates iperf3 test results against performance thresholds and determines the Nagios status.
This function checks the iperf3 output (`json_data`) for errors and compares metrics like
transfer rate and retransmissions against critical and warning thresholds provided in `options`.
If an error is found in the iperf3 output, the function exits with a Nagios UNKNOWN state.
Args:
options: An object with threshold attributes used for evaluation.
Expected attributes include:
- rate_warn (int): Warning threshold for bits per second.
- rate_crit (int): Critical threshold for bits per second.
- retrans_warn (int): Warning threshold for retransmissions (TCP only).
- retrans_crit (int): Critical threshold for retransmissions (TCP only).
- udp (bool): Whether the test was run over UDP (retransmissions are ignored if True).
json_data (dict): Parsed JSON output from an iperf3 test.
Returns:
Tuple[int, Optional[List[str]]]:
- An integer exit code for Nagios (0 = OK, 1 = WARNING, 2 = CRITICAL).
- A list of status messages if thresholds were breached, or an empty list if all checks
passed.
Exits:
Exits with code 3 (UNKNOWN) if an "error" key is present in the iperf3 output.
"""
rc = 0
statuslines = []
if "error" in json_data:
nagexit(3, json_data["error"])
if options.rate_crit and options.rate_warn:
if json_data["end"]["sum_sent"]["bits_per_second"] <= options.rate_crit:
rc = max(rc, 2)
statuslines.append("transfer rate below critical threshold")
elif json_data["end"]["sum_sent"]["bits_per_second"] <= options.rate_warn:
rc = max(rc, 1)
statuslines.append("transfer rate below warning threshold")
if not options.udp and options.retrans_crit and options.retrans_warn:
if json_data["end"]["sum_sent"]["retransmits"] >= options.retrans_crit:
rc = max(rc, 2)
statuslines.append("retransmissions over critical threshold")
elif json_data["end"]["sum_sent"]["retransmits"] >= options.retrans_warn:
rc = max(rc, 1)
statuslines.append("retransmissions over warning threshold")
return rc, statuslines
def build_single_perfdata(key: str, value, limit_warn=None, limit_crit=None) -> str:
"""
Builds a single Nagios performance data string for a metric.
This function formats a key-value pair into a Nagios-compatible performance data
string, optionally including warning and critical thresholds.
Args:
key (str): The name of the metric (e.g., 'throughput').
value: The current value of the metric. Typically numeric.
limit_warn (optional): The warning threshold for the metric. Defaults to None.
limit_crit (optional): The critical threshold for the metric. Defaults to None.
Returns:
str: A formatted performance data string suitable for use in Nagios plugins,
e.g., "'throughput'=500;400;600".
"""
if limit_warn is None and limit_crit is None:
return f"'{key}'={value}"
return f"'{key}'={value};{limit_warn or ''};{limit_crit or ''}"
def bits_per_second(bps: float) -> str:
"""
Formats a bits-per-second value as a string with a 'b' suffix.
If the value is greater than 1000, it is rounded to the nearest whole number
before formatting.
Args:
bps (float): The bits-per-second value to format.
Returns:
str: The formatted string with a 'b' suffix, e.g., "950.1234b" or "1200b".
"""
if bps > 1000:
bps = round(bps)
return f"{bps}b"
def packets(pkts: int) -> str:
"""
Formats a packet count as a string with the 'packets' suffix.
Args:
pkts (int): The number of packets.
Returns:
str: A string representing the packet count, e.g., "123packets".
"""
return f"{pkts}packets"
def percent(prcnt: float) -> str:
"""
Formats a float value as a percentage string.
Args:
prcnt (float): The percentage value to format.
Returns:
str: The formatted percentage string with a '%' suffix, e.g., "99.5%".
"""
return f"{prcnt}%"
def build_perfdata(options, json_data: dict) -> List[str]:
"""
Constructs a list of Nagios performance data strings based on iperf3 test results.
This function extracts key metrics from the iperf3 JSON output, including per-interval
throughput, average throughput (upstream and/or downstream), retransmissions (for TCP),
and CPU utilization. It uses `build_single_perfdata()` to format each data point for
inclusion in a Nagios plugin's performance data output.
Args:
options: An object containing test configuration and threshold values. Expected attributes:
- bidir (bool): Whether the test was bidirectional.
- downstream (bool): Whether to use reverse (download) direction.
- udp (bool): Whether the test used UDP (retransmissions ignored).
- rate_warn (int): Warning threshold for throughput (in bits/sec).
- rate_crit (int): Critical threshold for throughput.
- retrans_warn (int): Warning threshold for retransmissions (TCP only).
- retrans_crit (int): Critical threshold for retransmissions.
json_data (dict): Parsed iperf3 output containing measurement results.
Returns:
List[str]: A list of performance data strings formatted for Nagios output.
"""
perfdata = [
build_single_perfdata(
f"bps{i+1}", bits_per_second(intv['sum']['bits_per_second']))
for i, intv in enumerate(json_data["intervals"])
]
json_end = json_data['end']
bps_avg_up = json_end['sum_sent']['bits_per_second'] if options.bidir or not options.downstream else None
bps_avg_down = json_end['sum_sent_bidir_reverse']['bits_per_second'] if options.bidir else json_data[
'end']['sum_sent']['bits_per_second'] if options.downstream else None
if not options.udp:
retrans_sum_up = json_end['sum_sent']['retransmits'] if options.bidir or not options.downstream else None
retrans_sum_down = json_end['sum_sent_bidir_reverse']['retransmits'] if options.bidir else json_data[
'end']['sum_sent']['retransmits'] if options.downstream else None
if bps_avg_up is not None:
perfdata.append(build_single_perfdata("bps_avg_up", bits_per_second(
bps_avg_up), options.rate_warn, options.rate_crit))
if bps_avg_down is not None:
perfdata.append(build_single_perfdata("bps_avg_down", bits_per_second(
bps_avg_down), options.rate_warn, options.rate_crit))
if not options.udp:
if retrans_sum_up is not None:
perfdata.append(build_single_perfdata("retrans_sum_up", packets(
retrans_sum_up), options.retrans_warn, options.retrans_crit))
if retrans_sum_down is not None:
perfdata.append(build_single_perfdata("retrans_sum_down", packets(
retrans_sum_down), options.retrans_warn, options.retrans_crit))
perfdata.append(build_single_perfdata("local_cpu", percent(
json_end['cpu_utilization_percent']['host_total'])))
perfdata.append(build_single_perfdata("remote_cpu", percent(
json_end['cpu_utilization_percent']['remote_total'])))
return perfdata
def check_iperf3(options):
"""
Executes an iperf3 test, evaluates the results, and outputs Nagios status and performance data.
This function orchestrates the process of running an iperf3 test with the specified options,
parsing the JSON output, and evaluating the results against user-defined thresholds. Based on
the evaluation, it outputs a Nagios status message (OK, WARNING, CRITICAL, or UNKNOWN) and
performance data. It then exits the program with the appropriate exit code.
Args:
options: An object containing configuration options for the iperf3 test. Expected attributes
include:
- remote (str): The target iperf3 server's hostname or IP address.
- port (int): The port number to use for the test.
- bidir (bool): Whether the test should be bidirectional.
- downstream (bool): Whether the test should use reverse mode (server to client).
- udp (bool): Whether the test should use UDP instead of TCP.
- rate_warn (int): Warning threshold for throughput (in bits/sec).
- rate_crit (int): Critical threshold for throughput.
- retrans_warn (int): Warning threshold for retransmissions (TCP only).
- retrans_crit (int): Critical threshold for retransmissions (TCP only).
- bytes (str): Optional. Data to transfer (e.g., '10M').
- time (int): Duration of the test in seconds.
Exits:
Exits the program with the appropriate Nagios status code (0 for OK, 1 for WARNING,
2 for CRITICAL, 3 for UNKNOWN) and prints the relevant status message and performance data.
"""
json_data = run_iperf3(options)
rc, statuslines = determine_result(options, json_data)
perfdata = build_perfdata(options, json_data)
nagexit(rc, statuslines, perfdata)
if __name__ == "__main__":
DESC = "%prog is used to run an iperf3 check against a given host."
parser = OptionParser(description=DESC, version="%prog version 0.2")
gen_opts = OptionGroup(parser, "Generic options")
thres_opts = OptionGroup(parser, "Threshold options")
parser.add_option_group(gen_opts)
parser.add_option_group(thres_opts)
# transfer rate
thres_opts.add_option("-w", "--rate-warning", dest="rate_warn",
type="int", metavar="BITS", action="store",
help="Defines the transfer rate's warning threshold")
thres_opts.add_option("-c", "--rate-critical", dest="rate_crit",
type="int", metavar="BITS", action="store",
help="Defines the transfer rate's critical threshold")
# retransmits
thres_opts.add_option("-W", "--retransmit-warning", dest="retrans_warn",
type="int", metavar="RETRANS", action="store",
help="Defines the retransmission warning threshold")
thres_opts.add_option("-C", "--retransmit-critical", dest="retrans_crit",
type="int", metavar="RETRANS", action="store",
help="Defines the retransmission critical threshold")
# -r / --remote
gen_opts.add_option("-r", "--remote", dest="remote",
type="string", action="store",
help="iperf3 server to connect to")
# -p / --port
gen_opts.add_option("-p", "--port", dest="port",
type="int", action="store", default=5201,
help="iperf3 server port to connect to [default: %default]")
# -d / --downstream
gen_opts.add_option("-d", "--downstream", dest="downstream",
action="store_true", default=False,
help="measure downstream instead of upstream")
# --bidir
gen_opts.add_option("--bidir", dest="bidir",
action="store_true", default=False,
help="test in both directions (normal and reverse), with both the client and server sending and receiving data simultaneously")
# -u / --udp
gen_opts.add_option("-u", "--udp", dest="udp",
action="store_true", default=False,
help="use UDP rather than TCP")
# -t / --time
gen_opts.add_option("-t", "--time", dest="time",
type="int", action="store", default=10,
help="time in seconds to transmit for [default: %default]")
# -n / --bytes
gen_opts.add_option("-n", "--bytes", dest="bytes",
type="string", action="store",
help="number of bytes to transmit (instead of --time)")
(opts, args) = parser.parse_args()
if not opts.remote or opts.time <= 0:
parser.print_help()
sys.exit(3)
check_iperf3(opts)