#!/usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import fnmatch
import re
import sys

import lib.args
import lib.base
import lib.human
import lib.lftest
import lib.shell
import lib.time
import lib.txt
from lib.globals import STATE_OK, STATE_UNKNOWN, STATE_WARN

__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026050801'

# Program description handed to argparse. `--ignore` values are matched with
# `fnmatch` (shell-style globs), not with regular expressions, so the text
# says "glob patterns" to agree with the `--ignore` help and the matching code.
DESCRIPTION = """Checks for failed systemd units. Alerts when any unit is in a failed state. Specific
units can be excluded from the check via --ignore with glob patterns. When no unit
is currently failed, reports the most recent system unit-failed event from the current
boot's journal so operators see at a glance how long the host has been clean since the
last reboot."""

# Default for `--ignore`: no unit is excluded unless the caller asks for it.
DEFAULT_IGNORE = []


def parse_args():
    """Build the command line parser and return the parsed arguments.

    Unknown arguments are tolerated (`parse_known_args`) and silently
    discarded; only the recognised options end up in the returned namespace
    (`ALWAYS_OK`, `IGNORE`, `TEST`).
    """
    cli = argparse.ArgumentParser(description=DESCRIPTION)

    version_banner = f'%(prog)s: v{__version__} by {__author__}'
    cli.add_argument('-V', '--version', action='version', version=version_banner)

    cli.add_argument(
        '--always-ok',
        action='store_true',
        default=False,
        dest='ALWAYS_OK',
        help=lib.args.help('--always-ok'),
    )

    # Repeatable option; every occurrence appends one glob pattern.
    ignore_help = (
        'Unit name to exclude from the check. '
        'Can be specified multiple times. '
        'Supports glob patterns according to https://docs.python.org/3/library/fnmatch.html. '
        'Example: `--ignore "dhcpd.service"`. '
        'Default: %(default)s'
    )
    cli.add_argument(
        '--ignore',
        action='append',
        default=DEFAULT_IGNORE,
        dest='IGNORE',
        help=ignore_help,
    )

    cli.add_argument(
        '--test',
        dest='TEST',
        help=lib.args.help('--test'),
        type=lib.args.csv,
    )

    known, _unknown = cli.parse_known_args()
    return known


def _unescape_systemd_name(name):
    """Replace systemd's `\\xNN` hex escapes with the original character so
    unit names are human-readable.
    """
    return re.sub(
        r'\\x([0-9a-fA-F]{2})',
        lambda m: bytes.fromhex(m.group(1)).decode('utf-8', errors='replace'),
        name,
    )


_LAST_FAILED_LINE = re.compile(
    r"^(\d+(?:\.\d+)?)\s+\S+\s+systemd\[\d+\]:\s+(.+?):\s+"
    r"Failed with result\s+'([^']+)'"
)


def get_last_failed_unit():
    """Look up the newest system-scope unit failure of the current boot.

    Returns a tuple `(unit_name, epoch_seconds, reason)`. All three items
    are `None` when journalctl cannot be run, exits non-zero, prints
    nothing, or prints no line in the expected format.

    Design notes:

    * Only `_PID=1` entries are queried, so failures of user-session
      units do not leak into this system-scope check.
    * Entries are selected by message text instead of CODE_FUNC or
      MESSAGE_ID, because those structured fields are not reliably set
      on every systemd version (RHEL 9 / Rocky 9 with systemd 252 lack
      them on most unit-failed entries). The English text is safe to
      hardcode: systemd's `unit_log_failure()` writes this format string
      verbatim into the journal, independent of the system locale.
    * `--boot=0` limits the scan to the current boot. That keeps it fast
      no matter how large the journal is, and "last failed since reboot"
      is more actionable than "last failed ever in the retained journal".
    * The output format is short-unix (epoch.microseconds prefix) and the
      last matching line is picked client-side, because RHEL 9 / Rocky 9
      (systemd 252) misorder `--grep` against `--lines=1` and emit no
      JSON for some `--output=json --grep ...` combinations.
    """
    nothing = (None, None, None)

    journal_cmd = (
        'journalctl --no-pager --output=short-unix --boot=0 '
        '_PID=1 --grep="Failed with result"'
    )
    ok, outcome = lib.shell.shell_exec(journal_cmd)
    if not ok:
        return nothing

    stdout, _stderr, retc = outcome
    if retc != 0 or not stdout:
        return nothing

    # Walk the journal output from newest to oldest; the first parsable
    # line is the most recent failure.
    candidates = map(_LAST_FAILED_LINE.match, reversed(stdout.splitlines()))
    hit = next((m for m in candidates if m), None)
    if hit is None:
        return nothing

    timestamp_str, raw_unit, reason = hit.groups()
    try:
        epoch = float(timestamp_str)
    except ValueError:
        return nothing

    return (
        _unescape_systemd_name(raw_unit),
        epoch,
        f"Failed with result '{reason}'",
    )


def main():
    """Run the check and hand the result to `lib.base.oao()`.

    Lists failed units via `systemctl --state=failed` (or takes fixture
    data when `--test` is given), drops every unit whose name matches one
    of the `--ignore` glob patterns and reports STATE_WARN if at least one
    unit remains. When nothing is failed and the check does not run in
    test mode, the most recent unit-failed journal event of the current
    boot is appended to the OK message.
    """

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        sys.exit(STATE_UNKNOWN)

    # fetch data
    if args.TEST is None:
        # get the values the normal way
        stdout, _stderr, retc = lib.base.coe(
            lib.shell.shell_exec('systemctl --state=failed --no-pager --no-legend')
        )
    else:
        # do not call the command, put in test data
        stdout, _stderr, retc = lib.lftest.test(args.TEST)

    if retc != 0:
        lib.base.cu('systemctl was unable to return any data.')

    msg = 'Everything is ok.'
    table = ''
    state = STATE_OK

    table_data = []
    for line in stdout.splitlines():
        columns = line.split()
        if not columns:
            # blank line: nothing to parse
            continue
        # systemctl may prefix a line with a `*` marker column. Decide this
        # per line, so a marker on one line does not shift the columns of
        # later lines that have none.
        offset = 1 if line.startswith('*') else 0
        name = columns[offset]
        if any(fnmatch.fnmatchcase(name, pattern) for pattern in args.IGNORE):
            continue
        table_data.append(
            {
                'unit': name,
                'load': columns[1 + offset],
                'active': columns[2 + offset],
                'sub': columns[3 + offset],
                # the description is free text and may contain spaces
                'description': ' '.join(columns[4 + offset :]),
            }
        )

    count = len(table_data)
    if count > 0:
        state = STATE_WARN
        unit_names = ', '.join(row['unit'] for row in table_data)
        msg = f'{count} failed {lib.txt.pluralize("unit", count)}: {unit_names}\n'
        table = lib.base.get_table(
            table_data,
            ['unit', 'load', 'active', 'sub', 'description'],
            ['unit', 'load', 'active', 'sub', 'description'],
        )

    # When the system is currently clean, surface the most recent unit-failed
    # journal event so the admin sees how long the host has been quiet.
    # Skipped in --test mode because the journal is host-state, not fixture data.
    if state == STATE_OK and args.TEST is None:
        unit, epoch, message = get_last_failed_unit()
        if unit and epoch:
            # clamp to 0 in case the journal timestamp is ahead of our clock
            delta = max(0, lib.time.now() - epoch)
            ago = lib.human.seconds2human(delta)
            ts = lib.time.epoch2iso(epoch)
            # defensive: drop a trailing '.' should the message carry one, so
            # the sentence below reads cleanly
            message = message.rstrip('.') if message else ''
            msg = (
                f'Everything is ok. Last failed: `{unit}`'
                + (f' with message "{message}"' if message else '')
                + f' at {ts} ({ago} ago)'
            )

    perfdata = lib.base.get_perfdata(
        'systemd-units-failed',
        count,
        warn=1,
        _min=0,
    )

    # over and out
    lib.base.oao(f'{msg}\n\n{table}', state, perfdata, always_ok=args.ALWAYS_OK)


# Script entry point: run the check only when executed directly.
if __name__ == '__main__':
    try:
        main()
    except Exception:
        # Last-resort handler: any uncaught error is handed to lib.base.cu()
        # (presumably prints the traceback and exits UNKNOWN - confirm in lib.base).
        lib.base.cu()
