#!/usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import concurrent.futures
import os
import re
import sys

import lib.args
import lib.base
import lib.disk
import lib.lftest
import lib.net
import lib.shell
import lib.ssh
import lib.txt
from lib.globals import STATE_OK, STATE_UNKNOWN, STATE_WARN

__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026060601'

DESCRIPTION = """Runs a full security audit across the hosts of a subnet and reports each host's
hardening posture. From a single management host it discovers the targets (the subnet of the
default interface, a chosen interface, or an explicit host list), connects to each one over SSH,
copies a self-contained copy of the audit tool over, runs a privileged system audit (root via
password-less sudo by default), retrieves the machine-readable report, and removes its temporary
files. A host that does not answer within the connect timeout is skipped. The check is meant to run
at most once per day; the worst per-host result determines the overall state. Security posture is
informational drift rather than a time-critical availability event, so by default only WARNING is
raised."""

DEFAULT_AUDIT_TIMEOUT = 600
DEFAULT_CONNECT_TIMEOUT = 3
DEFAULT_CRIT = ''
DEFAULT_LENGTHY = False
DEFAULT_MAX_WORKERS = 10
DEFAULT_PORT = 22
DEFAULT_WARN = '65:'

# Where lynis writes its output on the target host when run privileged. The
# report is left in place after the audit so an admin can inspect the details.
REMOTE_CUSTOM_PROFILE = '/etc/lynis/custom.prf'
REMOTE_LOG = '/var/log/lynis.log'
REMOTE_PID = '/var/run/lynis.pid'
REMOTE_REPORT = '/var/log/lynis-report.dat'

# Per-run remote work directory. lynis is a script that must be executed, so the
# directory has to live on a partition mounted without "noexec". Hardened hosts
# (exactly the ones being audited) usually mount /var/tmp and /tmp noexec, so the
# candidates are probed in order and the first one that can actually execute a
# file is used. The name is also used to validate the path before any removal.
REMOTE_WORKDIR_NAME = 'linuxfabrik-monitoring-plugins-lynis'
REMOTE_WORKDIR_BASES = ['/var/tmp', '/tmp', '"$HOME"', '/root', '/']


def parse_args():
    """Parse command line arguments using argparse."""
    parser = argparse.ArgumentParser(description=DESCRIPTION)

    parser.add_argument(
        '-V',
        '--version',
        action='version',
        version=f'%(prog)s: v{__version__} by {__author__}',
    )

    parser.add_argument(
        '--always-ok',
        help=lib.args.help('--always-ok'),
        dest='ALWAYS_OK',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '--audit-timeout',
        help='Seconds to wait for the remote audit of a single host to finish. '
        'Default: %(default)s (seconds)',
        dest='AUDIT_TIMEOUT',
        type=int,
        default=DEFAULT_AUDIT_TIMEOUT,
    )

    parser.add_argument(
        '--configfile',
        help='SSH: Alternative per-user configuration file. If a configuration '
        'file is given on the command line, the system-wide '
        'configuration file (`/etc/ssh/ssh_config`) will be ignored. The '
        'default for the per-user configuration file is `~/.ssh/config`. If '
        'set to `none`, no configuration files will be read.',
        dest='CONFIGFILE',
    )

    parser.add_argument(
        '--connect-timeout',
        help='Seconds to wait for the SSH connection to a target host before '
        'skipping it. '
        'Default: %(default)s (seconds)',
        dest='CONNECT_TIMEOUT',
        type=int,
        default=DEFAULT_CONNECT_TIMEOUT,
    )

    # Threshold parameters: use type=str (not int/float) to support Nagios
    # range expressions like "65:", "~:50", "@10:20".
    # In main(), pass _operator='range' to lib.base.get_state().
    parser.add_argument(
        '-c',
        '--critical',
        help='CRIT threshold for the per-host hardening index (0-100). '
        'Supports Nagios ranges. '
        'Empty by default, because a daily hardening scan is posture drift, '
        'not a time-critical event that should page someone at night. '
        'Default: %(default)s (no critical)',
        dest='CRIT',
        default=DEFAULT_CRIT,
    )

    parser.add_argument(
        '--disable-pseudo-terminal',
        help='SSH: Disable pseudo-terminal allocation.',
        dest='DISABLE_PSEUDO_TERMINAL',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '-H',
        '--host',
        help='Target host to audit. Overrides subnet auto-discovery. '
        'Can be specified multiple times. '
        'If not specified, the subnet of the default interface is scanned.',
        dest='HOST',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--identity',
        help='SSH: File from which the identity (private key) for public '
        'key authentication is read. You can also specify a public key '
        'file to use the corresponding private key that is loaded in '
        'ssh-agent(1) when the private key file is not present locally. '
        'The default is `~/.ssh/id_dsa`, `~/.ssh/id_ecdsa`, '
        '`~/.ssh/id_ecdsa_sk`, `~/.ssh/id_ed25519`, `~/.ssh/id_ed25519_sk` and '
        '`~/.ssh/id_rsa`. Identity files may also be specified on a per-'
        'host basis in the configuration file. It is possible to have '
        'multiple --identity options (and multiple identities specified in configuration '
        'files). If no certificates have been explicitly specified by the '
        'CertificateFile directive, ssh will also try to load '
        'certificate information from the filename obtained by appending '
        '`-cert.pub` to identity filenames.',
        dest='IDENTITY',
        action='append',
    )

    parser.add_argument(
        '--interface',
        help='Network interface whose subnet is scanned. '
        'Ignored when --host is given. '
        'If not specified, the default interface (the one carrying the '
        'default route) is used.',
        dest='INTERFACE',
        default=None,
    )

    parser.add_argument(
        '--ipv4',
        help='SSH: Forces ssh to use IPv4 addresses only.',
        dest='IPV4',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '--ipv6',
        help='SSH: Forces ssh to use IPv6 addresses only.',
        dest='IPV6',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '--lengthy',
        help=lib.args.help('--lengthy'),
        dest='LENGTHY',
        action='store_true',
        default=DEFAULT_LENGTHY,
    )

    parser.add_argument(
        '--lynis-auditor',
        help='Name of the auditor to record in the report (lynis `--auditor`). '
        'If not specified, lynis uses its own default.',
        dest='LYNIS_AUDITOR',
        default=None,
    )

    parser.add_argument(
        '--lynis-option',
        help='Additional raw option to pass to the remote `lynis audit system` '
        'call, for options that have no dedicated parameter here. '
        'Can be specified multiple times. '
        'Example: `--lynis-option=--no-plugins`',
        dest='LYNIS_OPTION',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--lynis-profile',
        help='Profile file to use for the audit (lynis `--profile`). The path '
        'is resolved on the target host. '
        'If not specified, lynis uses the bundled default profile.',
        dest='LYNIS_PROFILE',
        default=None,
    )

    parser.add_argument(
        '--lynis-skip-test',
        help='Lynis test ID to skip on every target (injected as `skip-test` '
        'into the pushed profile), for fleet-wide exceptions controlled from the '
        'monitoring configuration. '
        'Can be specified multiple times. '
        "Host-specific exceptions belong in the target's own "
        '`/etc/lynis/custom.prf` instead. '
        'Example: `--lynis-skip-test=MAIL-8818`',
        dest='LYNIS_SKIP_TEST',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--lynis-source',
        help='Path to a self-contained lynis directory (the directory that '
        'contains the `lynis` executable next to its `include`, `db` and '
        '`plugins` subdirectories). This is the copy that gets pushed to and '
        'run on every target host. '
        'If not specified, a self-contained copy is assembled from the local '
        'lynis installation.',
        dest='LYNIS_SOURCE',
        default=None,
    )

    parser.add_argument(
        '--lynis-test',
        help='Only run these lynis tests (lynis `--tests`). '
        'Can be specified multiple times. '
        'If not specified, all tests are run. '
        'Example: `--lynis-test=SSH-7408 --lynis-test=KRNL-5820`',
        dest='LYNIS_TEST',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--lynis-test-category',
        help='Only run lynis tests of these categories (lynis '
        '`--tests-from-category`). '
        'Can be specified multiple times. '
        'If not specified, all categories are run. '
        'Example: `--lynis-test-category=security`',
        dest='LYNIS_TEST_CATEGORY',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--lynis-test-group',
        help='Only run lynis tests of these groups (lynis `--tests-from-group`). '
        'Can be specified multiple times. '
        'If not specified, all groups are run. '
        'Example: `--lynis-test-group=ssh --lynis-test-group=kernel`',
        dest='LYNIS_TEST_GROUP',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--max-workers',
        help='Maximum number of hosts to audit in parallel. Default: %(default)s',
        dest='MAX_WORKERS',
        type=int,
        default=DEFAULT_MAX_WORKERS,
    )

    parser.add_argument(
        '--network',
        help='Network in CIDR notation to scan for targets via auto-discovery. '
        'Can be specified multiple times. '
        'Takes precedence over --interface. '
        'Example: `--network=192.0.2.0/24`',
        dest='NETWORK',
        action='append',
        default=None,
    )

    parser.add_argument(
        '-p',
        '--password',
        help='SSH: Password authentication. NOT RECOMMENDED. Requires `sshpass`. '
        'If you need to use password-based SSH login, run this plugin only on trusted hosts. '
        '`ps` will expose the SSH password.',
        dest='PASSWORD',
        default=None,
    )

    parser.add_argument(
        '--port',
        help='SSH: Port to connect to on the remote host. This can be specified on '
        'a per-host basis in the configuration file. Default: %(default)s',
        dest='PORT',
        default=DEFAULT_PORT,
    )

    parser.add_argument(
        '--quiet',
        help='SSH: Quiet mode. Causes most warning and diagnostic messages to be '
        'suppressed.',
        dest='QUIET',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '--ssh-option',
        help='SSH: Can be used to give options in the format used in the configuration file. '
        'This is useful for specifying options for which there '
        'is no separate command-line flag. For full details of the options, '
        'and their possible values, see ssh_config(5). Can be specified multiple times.',
        dest='SSH_OPTION',
        action='append',
    )

    parser.add_argument(
        '--test',
        help=lib.args.help('--test'),
        dest='TEST',
        type=lib.args.csv,
    )

    parser.add_argument(
        '-u',
        '--username',
        help='SSH: Username. '
        'If not specified, ssh determines the user from `~/.ssh/config` or '
        'falls back to the current local user.',
        dest='USERNAME',
        default=None,
    )

    parser.add_argument(
        '--verbose',
        help=lib.args.help('--verbose') + ' Default: %(default)s',
        dest='VERBOSE',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '-w',
        '--warning',
        help='WARN threshold for the per-host hardening index (0-100). '
        'Supports Nagios ranges. '
        'The default alerts when the index drops below 65. '
        'Default: %(default)s',
        dest='WARN',
        default=DEFAULT_WARN,
    )

    args, _ = parser.parse_known_args()
    return args


def verbose(args, msg):
    """Print a progress message when --verbose is set, so the admin can see what
    the plugin is doing during a long multi-host run. Only used for debugging.
    """
    if args.VERBOSE:
        print(msg)


def build_ssh_opts(args):
    """Assemble the common ssh/scp option string via lib.ssh, adding the
    non-interactive options this plugin needs (BatchMode, LogLevel,
    ConnectTimeout) on top of the by-ssh-compatible parameters.
    """
    return lib.ssh.build_options(
        configfile=args.CONFIGFILE,
        identity=args.IDENTITY,
        ssh_option=args.SSH_OPTION,
        ipv4=args.IPV4,
        ipv6=args.IPV6,
        quiet=args.QUIET,
        batch_mode=True,
        connect_timeout=args.CONNECT_TIMEOUT,
        log_level='ERROR',
    )


def ssh_exec(host, args, ssh_opts, remote, timeout=None):
    """Run `remote` on `host` over SSH (thin wrapper around lib.ssh.run that
    fills in this plugin's connection parameters).
    """
    return lib.ssh.run(
        host,
        remote,
        username=args.USERNAME,
        port=args.PORT,
        options=ssh_opts,
        disable_pseudo_terminal=args.DISABLE_PSEUDO_TERMINAL,
        password=args.PASSWORD,
        timeout=timeout,
    )


def scp_push(host, args, ssh_opts, local, remote, recursive=False):
    """Copy `local` to `remote` on `host` (thin wrapper around lib.ssh.scp)."""
    return lib.ssh.scp(
        host,
        local,
        remote,
        username=args.USERNAME,
        port=args.PORT,
        options=ssh_opts,
        password=args.PASSWORD,
        timeout=args.AUDIT_TIMEOUT,
        recursive=recursive,
    )


def push_lynis(host, args, ssh_opts, base_dir, remote_dir):
    """Copy the local lynis tree to `remote_dir`. Prefer rsync when it is
    available locally (faster for many files; run via sudo so the files land
    root-owned), and fall back to scp -r otherwise or when rsync fails (for
    example when the target has no rsync). Returns (success, (out, err, retc)).
    """
    if lib.shell.which('rsync'):
        verbose(args, f'{host}: pushing lynis via rsync...')
        success, result = lib.ssh.rsync(
            host,
            base_dir,
            remote_dir,
            username=args.USERNAME,
            port=args.PORT,
            options=ssh_opts,
            password=args.PASSWORD,
            timeout=args.AUDIT_TIMEOUT,
            sudo=True,
        )
        if success and result[2] == 0:
            return (True, result)
        verbose(args, f'{host}: rsync unavailable or failed, falling back to scp')
    verbose(args, f'{host}: pushing lynis via scp...')
    return scp_push(host, args, ssh_opts, base_dir, remote_dir, recursive=True)


def build_lynis_command(args):
    """Assemble the remote `./lynis audit system ...` invocation. --cronjob
    runs unattended (quick, quiet, no colors); --usecwd pins the pushed copy so
    a lynis installation already present on the target is ignored.
    """
    # --plugin-dir is given as an absolute path ($PWD is expanded on the target,
    # where the command runs from inside the extracted lynis directory), because
    # lynis resolves a relative plugin directory against its current working
    # directory and aborts once it changes directory during the scan.
    parts = ['./lynis audit system --cronjob --usecwd --plugin-dir "$PWD/plugins"']
    if args.LYNIS_PROFILE:
        parts.append(f'--profile "{args.LYNIS_PROFILE}"')
    if args.LYNIS_AUDITOR:
        parts.append(f'--auditor "{args.LYNIS_AUDITOR}"')
    if args.LYNIS_TEST:
        parts.append(f'--tests "{",".join(args.LYNIS_TEST)}"')
    if args.LYNIS_TEST_GROUP:
        parts.append(f'--tests-from-group "{",".join(args.LYNIS_TEST_GROUP)}"')
    if args.LYNIS_TEST_CATEGORY:
        parts.append(f'--tests-from-category "{",".join(args.LYNIS_TEST_CATEGORY)}"')
    parts.extend(args.LYNIS_OPTION)
    return ' '.join(parts)


def probe_host(host, args, ssh_opts):
    """Attempt an SSH connection to `host` and run a trivial command. Returns:
    - 'ok'   : connected and authenticated, ready to audit
    - 'auth' : host answered on SSH but authentication failed (wrong user/key);
               typical when auto-discovery probes a raw IP that has no matching
               `~/.ssh/config` host alias
    - 'down' : host did not answer (no SSH, filtered, or connect timeout)
    """
    verbose(args, f'{host}: probing SSH connection...')
    success, result = ssh_exec(host, args, ssh_opts, 'true')
    if success and result[2] == 0:
        verbose(args, f'{host}: reachable')
        return 'ok'
    stderr = result[1] if success else str(result)
    if 'permission denied' in stderr.lower():
        verbose(
            args, f'{host}: SSH reachable but authentication failed, skipping audit'
        )
        return 'auth'
    verbose(args, f'{host}: not reachable, skipping')
    return 'down'


def prepare_lynis_source(source):
    """Return (success, result). On success result is (base_dir, tmp_dir):
    `base_dir` is a self-contained lynis directory ready to be archived, and
    `tmp_dir` is a directory to clean up afterwards (or None if `base_dir`
    points at an existing tree that must not be removed).

    If `source` is given it is used as-is. Otherwise a self-contained copy is
    assembled from the local installation (`lynis` executable plus the
    `include`, `db`, `plugins` directories under /usr/share/lynis and the
    default profile), because a packaged install splits the executable from its
    data directories.
    """
    if source:
        if not lib.disk.file_exists(os.path.join(source, 'lynis')):
            return (
                False,
                f'--lynis-source "{source}" is not a self-contained lynis '
                'directory (no ./lynis executable found).',
            )
        return (True, (source, None))

    # autodetect a self-contained tree
    for candidate in ('/usr/local/lynis', '/opt/lynis'):
        if lib.disk.file_exists(
            os.path.join(candidate, 'lynis')
        ) and lib.disk.dir_exists(os.path.join(candidate, 'include')):
            return (True, (candidate, None))

    # assemble from a packaged installation
    executable = lib.shell.which('lynis')
    share = '/usr/share/lynis'
    if not executable or not lib.disk.dir_exists(os.path.join(share, 'include')):
        return (
            False,
            'No local lynis installation found. Install lynis or pass --lynis-source.',
        )

    success, result = lib.disk.make_temp_dir(
        prefix='linuxfabrik-monitoring-plugins-lynis-'
    )
    if not success:
        return (False, result)
    tmp_dir = result
    base_dir = os.path.join(tmp_dir, 'lynis')
    success, error = lib.disk.mkdir(base_dir)
    if not success:
        return (False, error)
    success, error = lib.disk.copy_file(executable, os.path.join(base_dir, 'lynis'))
    if not success:
        return (False, error)
    for subdir in ('include', 'db', 'plugins'):
        src = os.path.join(share, subdir)
        if lib.disk.dir_exists(src):
            success, error = lib.disk.copy_dir(src, os.path.join(base_dir, subdir))
            if not success:
                return (False, error)
    profile = '/etc/lynis/default.prf'
    if lib.disk.file_exists(profile):
        success, error = lib.disk.copy_file(
            profile, os.path.join(base_dir, 'default.prf')
        )
        if not success:
            return (False, error)
    return (True, (base_dir, tmp_dir))


def extract_error(text):
    """Reduce noisy remote output (SSH login banner, ASCII art, generic chatter)
    to the single most relevant error line, so the plugin output stays readable.
    Falls back to a generic message if nothing error-like is found.
    """
    error_regex = re.compile(
        r'(fatal|error|denied|cannot|no such|not found|timed out|timeout'
        r'|failed|sudo:|permission)',
        re.IGNORECASE,
    )
    candidates = [
        line.strip()
        for line in text.splitlines()
        if line.strip() and error_regex.search(line)
    ]
    if candidates:
        return candidates[-1][:200]
    return 'audit produced no report'


def audit_host(host, args, ssh_opts, base_dir, lynis_cmd):
    """Push a self-contained lynis copy to `host`, run a privileged audit, fetch
    the machine-readable report and remove the per-run work directory. The report
    and log are left on the host for the admin to inspect.
    Returns (success, report_text_or_errormessage).
    """
    short_timeout = args.CONNECT_TIMEOUT + 30

    # create a unique per-run work directory on the first candidate partition
    # that can actually execute a file (not mounted noexec). mktemp guarantees a
    # fresh, non-empty path; the path is echoed for the steps below and gets
    # validated before any recursive removal.
    # Best-practice preference order, then every mounted filesystem from
    # /proc/mounts as a fallback, so unusual layouts are still covered. The
    # first candidate that is both writable (mktemp succeeds) and executable
    # (the probe script runs) wins.
    bases = ' '.join(REMOTE_WORKDIR_BASES)
    make_workdir = (
        f'candidates="{bases}";'
        ' mounts=$(cut -d" " -f2 /proc/mounts 2>/dev/null);'
        ' for base in $candidates $mounts; do'
        f' dir=$(mktemp --directory "$base/{REMOTE_WORKDIR_NAME}.XXXXXXXX" 2>/dev/null)'
        ' || continue;'
        ' if printf "#!/bin/sh\\n" > "$dir/.exectest" && chmod +x "$dir/.exectest"'
        ' && "$dir/.exectest" >/dev/null 2>&1;'
        ' then rm --force "$dir/.exectest"; echo "$dir"; exit 0; fi;'
        ' rm --force "$dir/.exectest" 2>/dev/null; rmdir "$dir" 2>/dev/null;'
        ' done; exit 1'
    )
    success, result = ssh_exec(
        host, args, ssh_opts, make_workdir, timeout=short_timeout
    )
    if not success or result[2] != 0:
        return (
            False,
            'no executable (non-noexec) work directory found among '
            f'{", ".join(REMOTE_WORKDIR_BASES)}',
        )
    workdir = result[0].strip().splitlines()[-1].strip() if result[0].strip() else ''
    if not workdir.startswith('/') or f'/{REMOTE_WORKDIR_NAME}.' not in workdir:
        return (False, f'unexpected remote work directory: {workdir!r}')

    # push the self-contained lynis copy into the work directory (rsync if
    # available, else scp -r; neither needs `tar` on the target)
    success, result = push_lynis(host, args, ssh_opts, base_dir, f'{workdir}/lynis')
    if not success:
        return (False, f'push failed: {result}')
    _, push_stderr, push_retc = result
    if push_retc != 0:
        return (False, f'push failed: {push_stderr.strip() or f"retc {push_retc}"}')

    # run the audit. Single files are removed explicitly (never a recursive
    # wipe): the stale PID file would make lynis --cronjob refuse to run, and the
    # previous report/log are cleared so a failing run cannot leave a stale report
    # behind that looks like a fresh success. Because --usecwd makes lynis look
    # for profiles in the current directory only, the target's own custom.prf is
    # staged into the work directory so per-host suppressions (skip-test=...) are
    # honored. chown/chmod run via sudo so they work whether the files were
    # copied in root-owned (rsync --rsync-path) or user-owned (scp); chown also
    # silences lynis' ownership security check.
    # fleet-wide --lynis-skip-test entries are appended to the pushed default.prf
    # (double quotes only: the whole remote command is single-quoted by lib.ssh).
    # Host-specific skips come from the target's own custom.prf staged below;
    # lynis merges both profiles.
    skip_inject = ''
    if args.LYNIS_SKIP_TEST:
        skips = ' '.join(f'"{t}"' for t in args.LYNIS_SKIP_TEST)
        skip_inject = f' ; printf "skip-test=%s\\n" {skips} >> ./default.prf'

    remote_audit = (
        f'sudo --non-interactive rm --force {REMOTE_PID} {REMOTE_REPORT} {REMOTE_LOG}'
        f' ; cd "{workdir}/lynis" || exit 1'
        f' ; sudo --non-interactive cp --force {REMOTE_CUSTOM_PROFILE} ./custom.prf'
        ' 2>/dev/null'
        f'{skip_inject}'
        ' ; sudo --non-interactive chown --recursive 0:0 .'
        ' && sudo --non-interactive chmod +x ./lynis'
        f' && sudo --non-interactive {lynis_cmd}'
    )
    verbose(args, f'{host}: running audit (this can take a few minutes)...')
    success, result = ssh_exec(
        host, args, ssh_opts, remote_audit, timeout=args.AUDIT_TIMEOUT
    )
    audit_stderr = result[1] if success else str(result)

    # fetch the report (root-owned, so cat it via sudo instead of scp)
    verbose(args, f'{host}: fetching report...')
    success, result = ssh_exec(
        host,
        args,
        ssh_opts,
        f'sudo --non-interactive cat {REMOTE_REPORT}',
        timeout=short_timeout,
    )
    report_text = result[0] if success else ''

    # remove the per-run work directory. The path comes from mktemp and was
    # validated above; the shell `case` guard repeats that check so a recursive
    # removal can only ever hit our own work directory, never an empty or
    # unexpected path. Report and log stay on the host for the admin.
    verbose(args, f'{host}: cleaning up...')
    ssh_exec(
        host,
        args,
        ssh_opts,
        f'case "{workdir}" in */{REMOTE_WORKDIR_NAME}.*)'
        f' sudo --non-interactive rm --recursive --force "{workdir}" ;; esac',
        timeout=short_timeout,
    )

    if 'hardening_index=' not in report_text:
        return (False, extract_error(audit_stderr))
    return (True, report_text)


def parse_report(text):
    """Parse a lynis report.dat (key=value, with repeated warning[]/suggestion[]
    entries) into a dict.
    """
    data = {
        'hardening_index': None,
        'warnings': [],
        'suggestions': [],
        'lynis_version': '',
        'os_name': '',
        'hostname': '',
        'ipv4_addresses': [],
        'ipv6_addresses': [],
    }
    for line in text.splitlines():
        key, sep, value = line.partition('=')
        if not sep:
            continue
        if key == 'hardening_index':
            try:
                data['hardening_index'] = int(value)
            except ValueError:
                pass
        elif key == 'warning[]':
            data['warnings'].append(value)
        elif key == 'suggestion[]':
            data['suggestions'].append(value)
        elif key == 'network_ipv4_address[]':
            data['ipv4_addresses'].append(value)
        elif key == 'network_ipv6_address[]':
            data['ipv6_addresses'].append(value)
        elif key in ('lynis_version', 'os_name', 'hostname'):
            data[key] = value
    return data


def report_hostname(report):
    """Return the target's short hostname from its own lynis report (client-side,
    as the audited host sees itself).
    """
    return report['hostname']


def report_ip(report):
    """Return the target's primary, non-loopback IP address from its own lynis
    report (client-side). Prefers IPv4 over IPv6; empty string if none.
    """
    for ip in report['ipv4_addresses']:
        if not ip.startswith('127.'):
            return ip
    for ip in report['ipv6_addresses']:
        if ip != '::1' and not ip.lower().startswith('fe80'):
            return ip
    return ''


def main():
    """The main function. This is where the magic happens."""

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        sys.exit(STATE_UNKNOWN)

    # set default values for append parameters that were not specified
    if args.LYNIS_OPTION is None:  # case 1: default to empty list
        args.LYNIS_OPTION = []
    if args.LYNIS_SKIP_TEST is None:  # case 1: default to empty list
        args.LYNIS_SKIP_TEST = []
    if args.LYNIS_TEST is None:  # case 1: default to empty list
        args.LYNIS_TEST = []
    if args.LYNIS_TEST_CATEGORY is None:  # case 1: default to empty list
        args.LYNIS_TEST_CATEGORY = []
    if args.LYNIS_TEST_GROUP is None:  # case 1: default to empty list
        args.LYNIS_TEST_GROUP = []
    # args.HOST, args.IDENTITY, args.NETWORK, args.SSH_OPTION stay None
    # (case 3: None means "not set", which is different from an empty list)

    # fetch data
    results = []
    if args.TEST is None:
        if args.HOST is not None:
            targets = args.HOST
        elif args.NETWORK is not None:
            verbose(args, f'Discovering hosts on {", ".join(args.NETWORK)}...')
            targets = []
            for cidr in args.NETWORK:
                targets.extend(lib.base.coe(lib.net.cidr_to_hosts(cidr)))
        else:
            scope = args.INTERFACE if args.INTERFACE else 'the default interface'
            verbose(args, f'Discovering hosts on {scope}...')
            targets = lib.base.coe(lib.net.get_subnet_hosts(args.INTERFACE))
        verbose(args, f'{len(targets)} target(s) to check')
        ssh_opts = build_ssh_opts(args)
        lynis_cmd = build_lynis_command(args)

        verbose(args, 'Preparing local lynis copy...')
        base_dir, tmp_dir = lib.base.coe(prepare_lynis_source(args.LYNIS_SOURCE))

        def process_host(host):
            """Probe and (if reachable and authenticated) audit one host.
            Returns a (host, success, result) tuple, or None to skip the host.
            """
            status = probe_host(host, args, ssh_opts)
            if status == 'down':
                return None
            if status == 'auth':
                # host is up on SSH but we could not authenticate; surface it
                # instead of silently skipping, so wrong credentials are visible
                return (host, False, 'SSH authentication failed')
            success, result = audit_host(host, args, ssh_opts, base_dir, lynis_cmd)
            if success:
                report = parse_report(result)
                verbose(args, f'{host}: hardening index {report["hardening_index"]}')
            else:
                verbose(args, f'{host}: audit failed ({result})')
            return (host, success, result)

        try:
            # Audit hosts in parallel. map() preserves the input order, so the
            # output table is deterministic regardless of completion order.
            workers = max(1, min(args.MAX_WORKERS, len(targets))) if targets else 1
            with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
                for entry in pool.map(process_host, targets):
                    if entry is not None:
                        results.append(entry)
        finally:
            if tmp_dir is not None and lib.disk.dir_exists(tmp_dir):
                lib.disk.rm_dir(tmp_dir)
    else:
        # do not connect, put in test data
        stdout, _, _ = lib.lftest.test(args.TEST)
        targets = ['203.0.113.1']
        results = [('203.0.113.1', True, stdout)]

    # init some vars
    msg = ''
    state = STATE_OK
    perfdata = ''
    table_data = []
    audited = 0
    total_warnings = 0
    total_suggestions = 0

    # analyze data
    # The state marker goes into the last table column on purpose: IcingaWeb
    # replaces "[WARNING]" with an icon and would otherwise break a monospace
    # table where the marked column is not the last one.
    profile_label = args.LYNIS_PROFILE if args.LYNIS_PROFILE else 'default'
    for host, success, result in results:
        if not success:
            # no report, so no client-side hostname/IP; fall back to the target
            host_state = STATE_UNKNOWN
            state = lib.base.get_worst(state, host_state)
            table_data.append(
                {
                    'hostreport': f'{host}:-',
                    'ip': '-',
                    'warnings': '-',
                    'suggestions': '-',
                    'index': '-',
                    'profile': '-',
                    'state': f'{result}{lib.base.state2str(host_state, prefix=" ")}',
                }
            )
            continue
        audited += 1
        report = parse_report(result)
        index = report['hardening_index']
        total_warnings += len(report['warnings'])
        total_suggestions += len(report['suggestions'])
        # hostname and IP come from the target's own report (client-side), not
        # from a lookup on the management host
        hostname = report_hostname(report) or host
        ip = report_ip(report) or '-'
        host_state = lib.base.get_state(index, args.WARN, args.CRIT, _operator='range')
        # Every lynis warning counts: a warning is a concrete finding, not noise.
        # To accept one, suppress it in the host's lynis profile (skip-test=...),
        # not in this plugin. See the plugin README.
        if report['warnings']:
            host_state = lib.base.get_worst(host_state, STATE_WARN)
        state = lib.base.get_worst(state, host_state)
        table_data.append(
            {
                'hostreport': f'{hostname}:{REMOTE_REPORT}',
                'ip': ip,
                'warnings': len(report['warnings']),
                'suggestions': len(report['suggestions']),
                'index': index,
                'profile': profile_label,
                'state': lib.base.state2str(host_state),
            }
        )

    # build the message
    msg += f'{audited}/{len(targets)} {lib.txt.pluralize("host", len(targets))} audited'
    perfdata += lib.base.get_perfdata('hosts_total', len(targets), _min=0)
    perfdata += lib.base.get_perfdata('hosts_audited', audited, _min=0)
    perfdata += lib.base.get_perfdata('warnings', total_warnings, _min=0)
    perfdata += lib.base.get_perfdata('suggestions', total_suggestions, _min=0)

    # build table output
    # State always sits in the last column: IcingaWeb replaces "[WARNING]" with
    # an icon and would otherwise break a monospace table.
    if table_data:
        if args.LENGTHY:
            keys = [
                'hostreport',
                'ip',
                'warnings',
                'suggestions',
                'index',
                'profile',
                'state',
            ]
            headers = [
                'Host:Report',
                'IP',
                'Warn',
                'Sugg',
                'HIdx',
                'Profile',
                'State',
            ]
        else:
            keys = ['hostreport', 'warnings', 'index', 'state']
            headers = ['Host:Report', 'Warn', 'HIdx', 'State']
        msg += '\n\n' + lib.base.get_table(table_data, keys, header=headers)

    # over and out
    lib.base.oao(msg, state, perfdata, always_ok=args.ALWAYS_OK)


if __name__ == '__main__':
    try:
        main()
    except Exception:
        lib.base.cu()
