#!/usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import concurrent.futures
import ipaddress
import re
import socket
import ssl
import sys
import time
import urllib.parse
import warnings
from pathlib import Path

import lib.args
import lib.base
import lib.disk
import lib.human
import lib.net
import lib.time
import lib.txt
from lib.globals import STATE_CRIT, STATE_OK, STATE_UNKNOWN, STATE_WARN

try:
    from cryptography import x509
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.utils import CryptographyDeprecationWarning
except ImportError:
    print('Python module "cryptography" is not installed.')
    sys.exit(STATE_UNKNOWN)

# System trust stores (for example /etc/pki/tls/certs) ship legacy root CAs whose serial
# number is zero or negative, which RFC 5280 disallows. cryptography only warns about this
# for now, but the warning would otherwise leak onto the plugin's stderr and confuse the
# monitoring system. We still parse and check these certificates, so silence the warning.
warnings.filterwarnings('ignore', category=CryptographyDeprecationWarning)


__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026060804'

DESCRIPTION = """Inspects X.509 certificates and alerts on days remaining until expiry,
hostname mismatch and chain verification failures. Sources via --source: `url` fetches the
certificate from a single TLS endpoint and verifies the chain against the system trust store
by default (override with --ca-file); `file` reads one or many certificate files via glob
expansion (PEM or DER); `scan` discovers the hosts of a subnet (the subnet of the default
interface, a chosen interface, an explicit network in CIDR notation, or an explicit host
list), connects to each one on the ports given by --ports and inspects every certificate it
gets. PEM bundles expand to one item per certificate, so a fullchain.pem produces a row for
the leaf and a row for each intermediate. With --source url the plugin only runs the TLS
handshake and reads the server certificate; no HTTP request is sent. That means it works for
any "TLS from start" service, not only HTTPS: IMAPS (port 993), LDAPS (636), SMTPS (465),
AMQPS (5671), MQTTS (8883), custom TLS ports - just point --url at the right host and port
(`https://mail.example.com:993/` inspects the IMAPS cert). STARTTLS protocols that upgrade an
existing plaintext connection (SMTP submission on 587, IMAP on 143, LDAP on 389) are not
supported. Hostname verification only applies to --source url; chain/trust verification
applies to --source url and --source scan and its failures use --severity (warn or crit),
while a valid self-signed certificate is tolerated. Expired certificates are unconditionally
reported as CRIT. With --source file and --source scan the worst state across all inspected
certificates
drives the plugin state; targets that do not answer within --timeout are skipped. The default
source is `scan`, so without any parameter the plugin scans the default interface's subnet on a
set of common data-center TLS ports (HTTPS, mail, LDAPS, AMQPS, MQTTS and common management
interfaces); see --ports for the full default list."""

# Defaults for the command line parameters defined in parse_args().
# DEFAULT_CRIT and DEFAULT_WARN are Nagios ranges in days; DEFAULT_TIMEOUT is in seconds.
DEFAULT_CRIT = '5:'
DEFAULT_LENGTHY = False
DEFAULT_MAX_WORKERS = 10
# Common TLS ports in a data center: HTTPS (443), SMTPS (465), LDAPS (636),
# FTPS (990), IMAPS (993), POP3S (995), LDAPS global catalog (3269), AMQPS (5671),
# WinRM (5986), Kubernetes API (6443), Proxmox (8006), Vault (8200), HTTPS-alt /
# Tomcat / WildFly / Keycloak (8443), MQTTS (8883), Cockpit (9090), appliance /
# management HTTPS (9443) and Webmin (10000).
# Kept as strings, the same type argparse yields for values given via --ports; the list
# is also joined into the --ports help text.
DEFAULT_PORTS = [
    '443',
    '465',
    '636',
    '990',
    '993',
    '995',
    '3269',
    '5671',
    '5986',
    '6443',
    '8006',
    '8200',
    '8443',
    '8883',
    '9090',
    '9443',
    '10000',
]
DEFAULT_SEVERITY = 'warn'
DEFAULT_SOURCE = 'scan'
DEFAULT_TIMEOUT = 8
DEFAULT_WARN = '14:'

# Values accepted by --severity and --source (passed to argparse as `choices`).
VALID_SEVERITIES = ('crit', 'warn')
VALID_SOURCES = ('file', 'scan', 'url')


def parse_args():
    """Parse command line arguments using argparse.

    Returns the argparse namespace. Every option is stored under an upper-case `dest`
    (for example `args.CA_FILE`, `args.TIMEOUT`). Options that can be given multiple
    times (--ca-file, --exclude, --host, --network, --ports) use `action='append'` with
    `default=None`, so they are `None` - not an empty list - when they are not specified.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)

    parser.add_argument(
        '-V',
        '--version',
        action='version',
        version=f'%(prog)s: v{__version__} by {__author__}',
    )

    parser.add_argument(
        '--always-ok',
        help=lib.args.help('--always-ok'),
        dest='ALWAYS_OK',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '--ca-file',
        help='Path to a CA bundle in PEM format, trusted for chain verification in addition '
        'to the system trust store. '
        'Can be specified multiple times to combine several bundles. '
        'Applies to --source=url and --source=scan. '
        'Example: `--ca-file=/etc/pki/ca-trust/source/anchors/internal.pem`',
        dest='CA_FILE',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--client-cert',
        help='Path to a client certificate in PEM format for mutual TLS.',
        dest='CLIENT_CERT',
    )

    parser.add_argument(
        '--client-key',
        help='Path to the client certificate private key in PEM format.',
        dest='CLIENT_KEY',
    )

    # --critical and --warning are kept as plain strings here; they accept several
    # notations (Nagios range, percentage, duration), see the help texts
    parser.add_argument(
        '-c',
        '--critical',
        help='CRIT threshold for the time remaining until the certificate expires. '
        'Accepts a Nagios range in days (`5:`), a percentage of the total validity period '
        '(`10%%`, CRIT when less than 10%% of the lifetime is left), or a duration with a '
        'unit (`3d`, `12h`, `2W`, `1M`; CRIT when less time than that is left). '
        'Examples: `5:` `10%%` `3d`. '
        'Default: %(default)s',
        dest='CRIT',
        default=DEFAULT_CRIT,
    )

    parser.add_argument(
        '--exclude',
        help='IP address or hostname to skip during a scan. Matched against both the '
        'discovered target address and, for --host, the given name. '
        'Only applies to --source=scan. '
        'Can be specified multiple times. '
        'Example: `--exclude=192.0.2.1 --exclude=192.0.2.254`',
        dest='EXCLUDE',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--filename',
        help='Path to a certificate file or a glob pattern matching multiple certificate '
        'files. Required when --source=file. Files are read as PEM or DER (autodetected); '
        'when a PEM bundle contains multiple certificates (typical for fullchain.pem or a '
        'CA bundle), each certificate becomes its own row in the output. Globs follow '
        'Python conventions: `*` matches one path segment, `**` matches across '
        'directories. Always quote the pattern in shells so that the shell does not '
        'expand the wildcard before the plugin sees it. '
        "Example: `--filename='/etc/ssl/certs/*.pem'`. "
        "Recursive example: `--filename='/etc/letsencrypt/live/**/cert.pem'`",
        dest='FILENAME',
    )

    parser.add_argument(
        '-H',
        '--host',
        help='Target host to scan. Overrides subnet auto-discovery. '
        'Only applies to --source=scan. '
        'Can be specified multiple times. '
        'If not specified, the subnet of the default interface is scanned. '
        'Example: `--host=mail.example.com --host=192.0.2.10`',
        dest='HOST',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--insecure',
        help='Skip chain and hostname verification entirely. The certificate is still '
        'fetched and inspected, but the chain verdict is reported as "verification skipped".',
        dest='INSECURE',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '--interface',
        help='Network interface whose subnet is scanned. '
        'Only applies to --source=scan. Ignored when --host or --network is given. '
        'If not specified, the default interface (the one carrying the default route) '
        'is used.',
        dest='INTERFACE',
        default=None,
    )

    parser.add_argument(
        '--lengthy',
        help=lib.args.help('--lengthy'),
        dest='LENGTHY',
        action='store_true',
        default=DEFAULT_LENGTHY,
    )

    parser.add_argument(
        '--max-workers',
        help='Maximum number of targets to scan in parallel. '
        'Only applies to --source=scan. Default: %(default)s',
        dest='MAX_WORKERS',
        type=int,
        default=DEFAULT_MAX_WORKERS,
    )

    parser.add_argument(
        '--network',
        help='Network in CIDR notation to scan for targets via auto-discovery. '
        'Only applies to --source=scan. Takes precedence over --interface. '
        'Can be specified multiple times. '
        'Example: `--network=192.0.2.0/24`',
        dest='NETWORK',
        action='append',
        default=None,
    )

    # values stay strings (single port or `start-end` range); parse_ports() converts them
    parser.add_argument(
        '--ports',
        help='TCP port to probe on every scanned target. A range is written `start-end`. '
        'Only applies to --source=scan. '
        'Can be specified multiple times. '
        f'If not specified, a set of common data-center TLS ports is probed '
        f'({", ".join(DEFAULT_PORTS)}). '
        'Example: `--ports=443 --ports=993 --ports=8000-8100`',
        dest='PORTS',
        action='append',
        default=None,
    )

    parser.add_argument(
        '--severity',
        help='Severity assigned to chain/trust verification failures (--source=url and '
        '--source=scan) and to hostname mismatches (--source=url only). '
        'A valid self-signed certificate is always tolerated. '
        'Defaults to %(default)s so that operators running internal CAs are not paged '
        'by trust issues that are expected in their environment. '
        'Set to `crit` to enforce strict trust.',
        dest='SEVERITY',
        choices=VALID_SEVERITIES,
        default=DEFAULT_SEVERITY,
    )

    parser.add_argument(
        '--sni-hostname',
        help='SNI hostname sent during the TLS handshake and used for hostname verification. '
        'Useful when --url points at an IP address or a load balancer that needs an explicit '
        'SNI. Default uses the hostname from --url.',
        dest='SNI_HOSTNAME',
    )

    parser.add_argument(
        '--source',
        help='Where the certificates are fetched from. '
        '`url` fetches one from a TLS endpoint (requires --url). '
        '`file` reads one or many from local files (requires --filename, supports glob '
        'patterns). '
        '`scan` discovers the hosts of a subnet (via --host, --network, --interface or the '
        'default interface) and probes each one on --ports. '
        '`p12` and `jks` are reserved for future expansion. '
        'Default: %(default)s',
        dest='SOURCE',
        choices=VALID_SOURCES,
        default=DEFAULT_SOURCE,
    )

    parser.add_argument(
        '--timeout',
        help=lib.args.help('--timeout') + ' Default: %(default)s (seconds)',
        dest='TIMEOUT',
        type=int,
        default=DEFAULT_TIMEOUT,
    )

    parser.add_argument(
        '--url',
        help='URL of the TLS endpoint to inspect. Required when --source=url. '
        'Example: `https://www.example.com/`',
        dest='URL',
    )

    parser.add_argument(
        '--verbose',
        help=lib.args.help('--verbose') + ' Default: %(default)s',
        dest='VERBOSE',
        action='store_true',
        default=False,
    )

    parser.add_argument(
        '-w',
        '--warning',
        help='WARN threshold for the time remaining until the certificate expires. '
        'Accepts a Nagios range in days (`14:`), a percentage of the total validity period '
        '(`25%%`, WARN when less than 25%% of the lifetime is left), or a duration with a '
        'unit (`14d`, `12h`, `2W`, `1M`; WARN when less time than that is left). '
        'Examples: `14:` `25%%` `14d`. '
        'Default: %(default)s',
        dest='WARN',
        default=DEFAULT_WARN,
    )

    # parse_known_args() returns (namespace, leftover arguments) and does not abort on
    # unknown parameters; the leftovers are deliberately discarded
    args, _ = parser.parse_known_args()
    return args


def get_cert_from_url(args):
    """Open a TLS connection, capture the peer certificate chain in DER form, the negotiated
    TLS version and the wall-clock connect + handshake time, and decide the chain/hostname
    verdict.

    Two connections are made: a verifying one (skipped with --insecure) that only yields
    the chain/hostname verdict, and a non-verifying one that always reads the certificate,
    so that an untrusted or mismatching certificate can still be inspected for expiry.

    Reads `args.URL`, `args.SNI_HOSTNAME`, `args.INSECURE`, `args.CA_FILE` (list or None),
    `args.CLIENT_CERT`, `args.CLIENT_KEY` and `args.TIMEOUT`.

    Returns (success, result). On failure result is an error message. On success result is
    a dict with the keys `chain_ders`, `chain_ok`, `chain_reason`, `handshake_seconds`,
    `host`, `port`, `sni` and `tls_version`; `chain_ders` holds the leaf followed by every
    intermediate the server sent (Python 3.13+ via ssl.get_unverified_chain(); on older
    Python only the leaf is available).
    """
    try:
        parsed = urllib.parse.urlsplit(args.URL)
        host = parsed.hostname
        # .port raises ValueError for a non-numeric or out-of-range port
        port = parsed.port or 443
    except ValueError as e:
        return False, f'Cannot parse --url "{args.URL}": {e}'
    if not host:
        return False, f'Cannot parse a hostname from --url "{args.URL}"'
    server_hostname = args.SNI_HOSTNAME or host

    chain_ok = True
    chain_reason = ''

    if not args.INSECURE:
        try:
            # the given CA bundle(s) are trusted in addition to the system trust store;
            # --ca-file is an append option and therefore None when not specified
            secure_ctx = ssl.create_default_context()
            for ca_file in args.CA_FILE or []:
                secure_ctx.load_verify_locations(cafile=ca_file)
            secure_ctx.minimum_version = ssl.TLSVersion.TLSv1_2
            if args.CLIENT_CERT:
                secure_ctx.load_cert_chain(args.CLIENT_CERT, keyfile=args.CLIENT_KEY)
            with (
                socket.create_connection((host, port), timeout=args.TIMEOUT) as sock,
                secure_ctx.wrap_socket(sock, server_hostname=server_hostname),
            ):
                pass
        except ssl.SSLCertVerificationError as e:
            chain_ok = False
            chain_reason = e.verify_message or str(e)
        except ssl.SSLError as e:
            chain_ok = False
            chain_reason = str(e)
        except OSError as e:
            # covers socket.timeout as well, which is a subclass of OSError
            return False, f'Cannot connect to {host}:{port}: {e}'

    insecure_ctx = ssl.create_default_context()
    insecure_ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # check_hostname has to be switched off before verify_mode may become CERT_NONE
    insecure_ctx.check_hostname = False
    insecure_ctx.verify_mode = ssl.CERT_NONE
    if args.CLIENT_CERT:
        try:
            insecure_ctx.load_cert_chain(args.CLIENT_CERT, keyfile=args.CLIENT_KEY)
        except OSError as e:
            # ssl.SSLError (bad PEM, key mismatch) is a subclass of OSError, too
            return False, f'Cannot load --client-cert "{args.CLIENT_CERT}": {e}'

    try:
        t0 = time.monotonic()
        with (
            socket.create_connection((host, port), timeout=args.TIMEOUT) as sock,
            insecure_ctx.wrap_socket(sock, server_hostname=server_hostname) as tls_sock,
        ):
            handshake_seconds = time.monotonic() - t0
            cert_der = tls_sock.getpeercert(True)
            tls_version = tls_sock.version()
            # full peer chain (leaf + intermediates) on Python 3.13+; the method returns a
            # list of DER-encoded bytes. Older Python only exposes the leaf via getpeercert.
            chain_ders = [cert_der]
            get_chain = getattr(tls_sock, 'get_unverified_chain', None)
            if get_chain is not None:
                try:
                    chain = get_chain()
                    if chain:
                        chain_ders = list(chain)
                except (ssl.SSLError, ValueError, OSError):
                    # best effort: fall back to the leaf-only chain
                    pass
    except ssl.SSLError as e:
        # must be handled before OSError, because ssl.SSLError derives from OSError
        return False, f'TLS handshake failed for {host}:{port}: {e}'
    except OSError as e:
        return False, f'Cannot connect to {host}:{port}: {e}'

    if not cert_der:
        return False, f'No server certificate received from {host}:{port}'

    return True, {
        'chain_ders': chain_ders,
        'chain_ok': chain_ok,
        'chain_reason': chain_reason,
        'handshake_seconds': handshake_seconds,
        'host': host,
        'port': port,
        'sni': server_hostname,
        'tls_version': tls_version,
    }


def verbose(args, msg):
    """Write `msg` to stdout, but only if the plugin was called with --verbose.

    Debugging aid that lets the admin follow the progress of a long multi-host scan.
    Without --verbose the call does nothing.
    """
    if not args.VERBOSE:
        return
    print(msg)


def parse_ports(specs):
    """Turn the --ports values into a sorted list of unique port numbers.

    Every entry of `specs` is a string holding either one port (`443`) or an inclusive
    range (`8000-8100`); a reversed range (`10-8`) is accepted and normalized. Blank
    entries are ignored.

    Returns (True, sorted list of ints) on success, or (False, error message) when an
    entry is not numeric, a port is outside 1-65535, or no port is left at all.
    """
    collected = set()
    for raw in specs:
        spec = raw.strip()
        if not spec:
            continue
        first, dash, last = spec.partition('-')
        try:
            if dash:
                low, high = int(first), int(last)
            else:
                low = high = int(spec)
        except ValueError:
            return False, f'Invalid port "{spec}" in --ports'
        if low > high:
            low, high = high, low
        # report the first port of the range that is not a valid TCP port
        offending = None
        if low < 1:
            offending = low
        elif high > 65535:
            offending = max(low, 65536)
        if offending is not None:
            return False, f'Port {offending} in --ports is out of range (1-65535)'
        collected.update(range(low, high + 1))
    if collected:
        return True, sorted(collected)
    return False, '--ports did not yield any port'


def discover_hosts(args):
    """Return (success, list of target hosts) for --source=scan. --host wins, then
    --network (CIDR), then --interface, then the default interface subnet. Addresses and
    names listed in --exclude are removed. Mirrors the discovery logic of the lynis check.

    Reads `args.HOST`, `args.NETWORK`, `args.INTERFACE` and `args.EXCLUDE`. On failure
    the second element is an error message: either the one reported by the lib.net helper,
    or a note that no target is left after applying --exclude.
    """
    if args.HOST is not None:
        hosts = list(args.HOST)
    elif args.NETWORK is not None:
        verbose(args, f'Discovering hosts on {", ".join(args.NETWORK)}...')
        hosts = []
        for cidr in args.NETWORK:
            ok, result = lib.net.cidr_to_hosts(cidr)
            if not ok:
                return False, result
            hosts.extend(result)
    else:
        scope = args.INTERFACE if args.INTERFACE else 'the default interface'
        verbose(args, f'Discovering hosts on {scope}...')
        ok, result = lib.net.get_subnet_hosts(args.INTERFACE)
        if not ok:
            return False, result
        hosts = result

    # --exclude is an append option and therefore None when it was not specified
    excludes = set(args.EXCLUDE or [])
    hosts = [host for host in hosts if host not in excludes]
    if not hosts:
        return False, 'No target hosts to scan after applying --exclude.'
    return True, hosts


# OpenSSL verify codes for a self-signed certificate (leaf, or a self-signed root in the
# chain). A valid self-signed certificate is cryptographically as sound as a publicly
# trusted one, so a subnet scan tolerates it instead of flagging every appliance/IPMI cert.
# 18 = X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT, 19 = X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN;
# compared against ssl.SSLCertVerificationError.verify_code.
_SELF_SIGNED_VERIFY_CODES = (18, 19)


def _grab_cert_insecure(host, port, server_hostname, args):
    """Open a TLS connection without any verification and return (cert_der, tls_version),
    or (None, None) on a connection/TLS error. Used to read the certificate for expiry
    reporting even when trust verification failed or was skipped.
    """
    ctx = ssl.create_default_context()
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    if args.CLIENT_CERT:
        ctx.load_cert_chain(args.CLIENT_CERT, keyfile=args.CLIENT_KEY)
    try:
        with (
            socket.create_connection((host, port), timeout=args.TIMEOUT) as sock,
            ctx.wrap_socket(sock, server_hostname=server_hostname) as tls_sock,
        ):
            return tls_sock.getpeercert(True), tls_sock.version()
    except (OSError, ssl.SSLError):
        return None, None


def get_cert_from_target(host, port, args):
    """Probe host:port, verify the certificate chain against the system trust store (plus
    any --ca-file) and capture the peer certificate. Hostname verification is intentionally
    not done: a subnet scan hits IP addresses whose certificates legitimately do not match.
    A valid self-signed certificate is tolerated (verdict "self-signed"); any other trust
    failure becomes "unverified (...)" and drives the state via --severity. --insecure skips
    verification entirely.

    Reads `args.INSECURE`, `args.CA_FILE` (list or None), `args.CLIENT_CERT`,
    `args.CLIENT_KEY` and `args.TIMEOUT`.

    Returns (True, dict) on success, with the keys `cert_der`, `chain_verdict` and
    `tls_version`, or (False, None) when the target does not answer or speaks no TLS, so
    non-responding hosts are skipped.
    """
    # SNI must be a hostname; for a bare IP literal no SNI is sent (RFC 6066)
    try:
        ipaddress.ip_address(host)
        server_hostname = None
    except ValueError:
        server_hostname = host

    cert_der = None
    tls_version = None
    chain_verdict = 'verification skipped'

    if not args.INSECURE:
        try:
            ctx = ssl.create_default_context()
            # --ca-file is an append option and therefore None when not specified
            for ca_file in args.CA_FILE or []:
                ctx.load_verify_locations(cafile=ca_file)
            ctx.minimum_version = ssl.TLSVersion.TLSv1_2
            ctx.check_hostname = False  # an IP scan never matches the certificate names
            if args.CLIENT_CERT:
                ctx.load_cert_chain(args.CLIENT_CERT, keyfile=args.CLIENT_KEY)
            with (
                socket.create_connection((host, port), timeout=args.TIMEOUT) as sock,
                ctx.wrap_socket(sock, server_hostname=server_hostname) as tls_sock,
            ):
                cert_der = tls_sock.getpeercert(True)
                tls_version = tls_sock.version()
                chain_verdict = 'verified'
        except ssl.SSLCertVerificationError as e:
            if e.verify_code in _SELF_SIGNED_VERIFY_CODES:
                chain_verdict = 'self-signed'
            else:
                chain_verdict = f'unverified ({e.verify_message or e})'
        except ssl.SSLError as e:
            # has to come before OSError, because ssl.SSLError derives from OSError
            chain_verdict = f'unverified ({e})'
        except (socket.timeout, OSError):
            return False, None  # target down, filtered, or not listening

    # read the certificate when trust verification failed or was skipped; a target that
    # answered the verified handshake already yielded the certificate above
    if cert_der is None:
        cert_der, tls_version = _grab_cert_insecure(host, port, server_hostname, args)
    if not cert_der:
        return False, None
    return True, {
        'cert_der': cert_der,
        'chain_verdict': chain_verdict,
        'tls_version': tls_version,
    }


def scan_targets(args, hosts, ports):
    """Probe the cross product of `hosts` and `ports` with a thread pool and collect one
    certificate item per answering target.

    An item looks like a file-source item with an additional `host` key; its label is
    `host:port` and its chain verdict is the result of the trust check done by
    get_cert_from_target() (no hostname verification). Targets that do not answer are
    left out. At most `args.MAX_WORKERS` probes run at the same time.
    """

    def probe(target):
        host, port = target
        verbose(args, f'{host}:{port}: probing TLS connection...')
        reachable, info = get_cert_from_target(host, port, args)
        if not reachable:
            verbose(args, f'{host}:{port}: not reachable, skipping')
            return None
        verbose(args, f'{host}:{port}: reachable ({info["chain_verdict"]})')
        return {
            'cert_der': info['cert_der'],
            'chain_verdict': info['chain_verdict'],
            'handshake_seconds': None,
            'host': host,
            'label': f'{host}:{port}',
            'tls_version': info['tls_version'],
        }

    targets = []
    for host in hosts:
        for port in ports:
            targets.append((host, port))

    # never start more threads than there are targets, and always at least one
    if targets:
        pool_size = max(1, min(args.MAX_WORKERS, len(targets)))
    else:
        pool_size = 1

    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
        # map() yields the results in input order, which keeps the output table stable
        return [found for found in pool.map(probe, targets) if found is not None]


def _looks_like_cert(data):
    """Return True if `data` has a recognisable certificate envelope. PEM certificates carry
    the `BEGIN CERTIFICATE` marker; DER certificates always start with the ASN.1 SEQUENCE
    tag (`0x30`). Files that match neither (private keys, OpenSSL trust bundles, text
    files) are not certificates and should be skipped silently when expanding a glob.
    """
    if b'-----BEGIN CERTIFICATE-----' in data:
        return True
    return data[:1] == b'\x30'


def _load_all_certs(data):
    """Parse every certificate from `data` and return a list of DER-encoded bytes. Should
    only be called on data that already passed `_looks_like_cert()`. Raises `ValueError`
    when the envelope looks like a certificate (PEM marker present or DER prefix detected)
    but the content cannot be parsed.

    PEM bundles can carry multiple `BEGIN CERTIFICATE`/`END CERTIFICATE` blocks (typical
    for `fullchain.pem` or system CA bundles); each block becomes its own list entry.
    DER files always carry exactly one certificate.
    """
    certs = []
    if b'-----BEGIN CERTIFICATE-----' in data:
        # walk the PEM block by block so a single corrupt block fails the whole call
        # rather than silently dropping the rest of the bundle
        marker_begin = b'-----BEGIN CERTIFICATE-----'
        marker_end = b'-----END CERTIFICATE-----'
        cursor = 0
        while True:
            start = data.find(marker_begin, cursor)
            if start < 0:
                break
            end = data.find(marker_end, start)
            if end < 0:
                raise ValueError(
                    'truncated PEM bundle: BEGIN CERTIFICATE without matching END',
                )
            block = data[start : end + len(marker_end)]
            cert = x509.load_pem_x509_certificate(block)
            certs.append(cert.public_bytes(serialization.Encoding.DER))
            cursor = end + len(marker_end)
    else:
        cert = x509.load_der_x509_certificate(data)
        certs.append(cert.public_bytes(serialization.Encoding.DER))
    return certs


def get_certs_from_files(args):
    """Expand the --filename glob and load every certificate from the matching files.

    Returns (success, items). On success items is a list of dicts shaped like the items
    of the other sources (`cert_der`, `chain_verdict`, `handshake_seconds`, `label`,
    `tls_version`); the label is the file path and the connection-related keys are None.
    A PEM bundle contributes one item per certificate, so `fullchain.pem` gives a leaf
    row plus one row per intermediate.

    Matches that are not regular files, and files that do not look like a certificate,
    are skipped silently - that keeps recursive globs safe when they hit private keys or
    trust bundles. A file that cannot be read, or that looks like a certificate but does
    not parse, aborts with (False, error message), as does a glob without any usable
    certificate.
    """
    pattern = Path(args.FILENAME)
    # glob relative to the anchor ('/' for absolute paths, '' for relative ones)
    root = Path(pattern.anchor)
    candidates = sorted(root.glob(str(pattern.relative_to(pattern.anchor))))

    items = []
    file_count = 0
    for candidate in candidates:
        if not candidate.is_file():
            continue
        file_count += 1
        try:
            raw = candidate.read_bytes()
        except OSError as e:
            return False, f'Cannot read {candidate}: {e}'
        if not _looks_like_cert(raw):
            continue
        try:
            ders = _load_all_certs(raw)
        except (ValueError, TypeError) as e:
            return False, f'Cannot parse certificate from {candidate}: {e}'
        items.extend(
            {
                'cert_der': der,
                'chain_verdict': None,
                'handshake_seconds': None,
                'label': str(candidate),
                'tls_version': None,
            }
            for der in ders
        )

    if items:
        return True, items
    if file_count == 0:
        return False, f'No files match "{args.FILENAME}"'
    return (
        False,
        f'No parseable certificates among {file_count} file(s) matching "{args.FILENAME}"',
    )


def collect_items(args):
    """Return (success, items). Each item has the keys consumed by main(): `cert_der`,
    `label`, `chain_verdict`, `tls_version`, `handshake_seconds`. Wraps the per-source
    fetchers so main() can iterate uniformly over the URL, file and scan sources.

    On failure the second element is an error message: a required parameter is missing,
    the fetcher failed, or the source is unknown.
    """
    if args.SOURCE == 'url':
        if not args.URL:
            return False, '--url is required when --source=url'
        ok, info = get_cert_from_url(args)
        if not ok:
            return False, info
        if args.INSECURE:
            chain_verdict = 'verification skipped'
        elif info['chain_ok']:
            chain_verdict = 'verified'
        else:
            chain_verdict = f'unverified ({info["chain_reason"]})'
        # one item per certificate the server sent: the leaf carries the chain verdict, the
        # handshake metrics and the TLS version; the intermediates are checked for expiry
        # only, so a soon-to-expire intermediate still drives the plugin state
        items = []
        for index, cert_der in enumerate(info['chain_ders']):
            if index == 0:
                items.append(
                    {
                        'cert_der': cert_der,
                        'chain_verdict': chain_verdict,
                        'handshake_seconds': info['handshake_seconds'],
                        'label': info['host'],
                        'tls_version': info['tls_version'],
                    }
                )
            else:
                items.append(
                    {
                        'cert_der': cert_der,
                        'chain_verdict': None,
                        'handshake_seconds': None,
                        'label': f'{info["host"]} (chain {index})',
                        'tls_version': None,
                    }
                )
        return True, items
    if args.SOURCE == 'file':
        if not args.FILENAME:
            return False, '--filename is required when --source=file'
        return get_certs_from_files(args)
    if args.SOURCE == 'scan':
        # `scan` is the default source: resolve the ports (--ports is None when not
        # given) and the target hosts, then probe every host/port combination
        ok, ports = parse_ports(args.PORTS or DEFAULT_PORTS)
        if not ok:
            return False, ports
        ok, hosts = discover_hosts(args)
        if not ok:
            return False, hosts
        items = scan_targets(args, hosts, ports)
        if not items:
            # same contract as the file source: nothing to inspect is reported as failure
            return False, 'No TLS certificates found on any of the scanned targets.'
        return True, items
    return False, f'Source "{args.SOURCE}" is not implemented yet'


def parse_cert(cert_der):
    """Decode a DER-encoded X.509 certificate and return its interesting fields as a flat
    dict: issuer/subject CN, key type and size, RSA exponent, SANs, OCSP Must-Staple flag,
    validity dates (naive datetimes), serial number (hex), SHA-256 fingerprint and the
    signature algorithm name.
    """
    cert = x509.load_der_x509_certificate(cert_der)

    def common_name(name):
        # first CN attribute, or an empty string when the name carries none
        matches = name.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if not matches:
            return ''
        return matches[0].value

    public_key = cert.public_key()
    key_type = type(public_key).__name__.replace('PublicKey', '')
    # not every key type exposes a key size
    key_size = getattr(public_key, 'key_size', None)
    # RSA only: extract the public exponent. Standard value is 65537; unusual values
    # (3, 17) are a configuration smell.
    try:
        rsa_exponent = public_key.public_numbers().e
    except AttributeError:
        rsa_exponent = None

    # Subject Alternative Names; entries without a `.value` are ignored
    sans = []
    try:
        san_values = cert.extensions.get_extension_for_oid(
            x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME,
        ).value
    except x509.ExtensionNotFound:
        san_values = ()
    for general_name in san_values:
        try:
            sans.append(str(general_name.value))
        except AttributeError:
            continue

    # OCSP Must-Staple per RFC 7633: TLS Feature extension (OID 1.3.6.1.5.5.7.1.24)
    # carries feature codes; 5 = status_request (Must-Staple). cryptography exposes the
    # codes as TLSFeatureType enum members; .value contains the integer.
    try:
        tls_features = cert.extensions.get_extension_for_oid(
            x509.ObjectIdentifier('1.3.6.1.5.5.7.1.24'),
        ).value
    except x509.ExtensionNotFound:
        must_staple = False
    else:
        must_staple = any(
            getattr(feature, 'value', feature) == 5 for feature in tls_features
        )

    # cryptography >=42 returns tz-aware UTC datetimes via *_utc; older releases (e.g. the
    # one shipped on RHEL8) only have the naive variants. Strip tzinfo for downstream
    # arithmetic against `lib.time.now(as_type='datetime')`, which is naive UTC.
    if hasattr(cert, 'not_valid_after_utc'):
        not_before = cert.not_valid_before_utc.replace(tzinfo=None)
        not_after = cert.not_valid_after_utc.replace(tzinfo=None)
    else:
        not_before = cert.not_valid_before
        not_after = cert.not_valid_after

    fields = {
        'issuer_cn': common_name(cert.issuer),
        'key_size': key_size,
        'key_type': key_type,
        'must_staple': must_staple,
        'not_after': not_after,
        'not_before': not_before,
        'rsa_exponent': rsa_exponent,
        'sans': sans,
        'serial_hex': f'{cert.serial_number:X}',
        'sha256_fingerprint': cert.fingerprint(hashes.SHA256()).hex(':').upper(),
        # cryptography exposes no public getter for the signature algorithm's OID name;
        # `_name` is the documented way to read it.
        'sig_algorithm': cert.signature_algorithm_oid._name,  # pylint: disable=protected-access
        'subject_cn': common_name(cert.subject),
    }
    return fields


def _format_days_left(days_left):
    """Render the days-remaining counter as a human-friendly phrase. Negative values mean
    the certificate is already past its `notAfter` date, so we say "expired N days ago"
    rather than the unintuitive "-N days left".
    """
    if days_left < 0:
        return f'expired {-days_left} days ago'
    if days_left == 0:
        return 'expires today'
    return f'{days_left}d left'


# A single duration token: an integer or decimal number immediately followed by exactly
# one unit letter out of Y, M, W, D, d, h, m, s - for example 3d, 12h, 2W, 1M, 1Y or 1.5h.
# The pattern only recognizes the shape of the token; converting it to seconds is left to
# lib.human.human2seconds() (see resolve_threshold()).
# NOTE(review): the unit letters are case-sensitive here (`M` and `m` are both accepted
# as distinct units) - presumably months vs. minutes; confirm against lib.human.
_DURATION_RE = re.compile(r'^\d+(\.\d+)?[YMWDdhms]$')


def resolve_threshold(threshold, total_seconds):
    """Translate a --warning/--critical value into a Nagios range measured in days.

    Keeping everything in days means the day-based range comparison and the days
    perfdata can be used as they are. `threshold` is the raw command line value,
    `total_seconds` the full validity period of the certificate in seconds.

    Accepted forms (surrounding whitespace is ignored):

    - an empty value: no threshold, returned as an empty string
    - `N%`: N percent of the certificate's validity period, converted to days; if the
      part in front of `%` is not a number, the value is returned as it is
    - one duration token (`3d`, `12h`, `2W`, `1M`): converted to days with
      lib.human.human2seconds(), which knows the unit letters
    - everything else: taken to be a Nagios range in days already (`14:`, `@5:10`) and
      returned as it is

    Percent and duration values come back as `<n>:`, which alerts as soon as fewer than
    `<n>` days remain ("less time left than the threshold").
    """
    spec = threshold.strip()
    if not spec:
        return ''

    if spec.endswith('%'):
        try:
            percent = float(spec[:-1])
        except ValueError:
            return spec
        days = percent / 100.0 * total_seconds / 86400.0
        # `:g` drops trailing zeros, so 25% of 60 days reads "15:", not "15.000000:"
        return f'{days:g}:'

    if _DURATION_RE.match(spec) is None:
        return spec

    days = lib.human.human2seconds(spec) / 86400.0
    return f'{days:g}:'


def _evaluate_item(item, args, now, severity_state):
    """Rate a single certificate item.

    Parses `item['cert_der']`, resolves `args.WARN` / `args.CRIT` against this
    certificate's validity period and rates the time left until `notAfter`. The function
    does not depend on where the item came from, so the url, file and scan sources all
    go through the same logic.

    Returns the tuple `(cert, days_left, item_state, warn, crit)`: the parsed certificate
    fields, the floored number of days left, the state of this item and the resolved
    warning and critical ranges in days.
    """
    cert = parse_cert(item['cert_der'])
    not_after = cert['not_after']
    validity_seconds = (not_after - cert['not_before']).total_seconds()
    remaining = not_after - now
    seconds_left = remaining.total_seconds()

    warn = resolve_threshold(args.WARN, validity_seconds)
    crit = resolve_threshold(args.CRIT, validity_seconds)

    # the comparison runs on fractional days, otherwise sub-day thresholds (12h) would
    # never match
    fractional_days_left = seconds_left / 86400.0
    item_state = lib.base.get_state(
        fractional_days_left, warn, crit, _operator='range'
    )

    # an expired certificate is critical, whatever the thresholds say
    if seconds_left < 0:
        item_state = STATE_CRIT

    # an unverified chain raises the state to at least the configured severity
    verdict = item['chain_verdict']
    if verdict and verdict.startswith('unverified'):
        item_state = lib.base.get_worst(item_state, severity_state)

    # `.days` floors the timedelta; this value is for display
    return cert, remaining.days, item_state, warn, crit


def _build_lengthy_table(item, cert):
    """Build the two-column Field/Value table that --lengthy prints for one certificate.

    `cert` holds the parsed certificate fields, `item` the collected item the certificate
    belongs to. The TLS version and the chain verdict are only listed when the item
    carries them (value is not None).
    """
    # public key: type, plus size if known, plus the RSA exponent if there is one
    key_label = cert['key_type']
    if cert['key_size']:
        key_label = f'{cert["key_type"]} {cert["key_size"]}'
    if cert['rsa_exponent'] is not None:
        key_label += f' (e={cert["rsa_exponent"]})'

    if cert['sans']:
        sans_label = ', '.join(cert['sans'])
    else:
        sans_label = '-'

    fields = [
        ('Subject CN', cert['subject_cn'] or '(no CN)'),
        ('Issuer CN', cert['issuer_cn'] or '(no CN)'),
        ('Serial', cert['serial_hex']),
        ('Signature Algorithm', cert['sig_algorithm']),
        ('Public Key', key_label),
        ('SANs', sans_label),
        ('Not Before', cert['not_before'].isoformat() + 'Z'),
        ('Not After', cert['not_after'].isoformat() + 'Z'),
        ('SHA-256 Fingerprint', cert['sha256_fingerprint']),
        ('OCSP Must-Staple', 'yes' if cert['must_staple'] else 'no'),
    ]
    # connection details that not every item carries
    for field, key in (('TLS Version', 'tls_version'), ('Chain', 'chain_verdict')):
        if item[key] is not None:
            fields.append((field, item[key]))

    rows = [{'k': field, 'v': value} for field, value in fields]
    return lib.base.get_table(rows, ['k', 'v'], header=['Field', 'Value'])


def main():
    """Run the check: collect certificates, rate each one and emit the plugin result.

    Steps, in order:

    1. Parse the command line. A `SystemExit` raised by `parse_args()` is turned into an
       exit with `STATE_UNKNOWN`.
    2. Collect the items to inspect: for `--source scan` via `parse_ports()`,
       `discover_hosts()` and `scan_targets()`, for every other source via
       `collect_items()`.
    3. Rate every item with `_evaluate_item()` and fold the per-item states into the
       overall state with `lib.base.get_worst()`.
    4. Build the headline, the perfdata, the summary table (only when more than one row
       is shown) and the optional `--lengthy` blocks.
    5. Hand message, state and perfdata to `lib.base.oao()`.
    """

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        # whatever exit code parse_args() asked for, the plugin exits with UNKNOWN
        sys.exit(STATE_UNKNOWN)

    # set default values for append parameters that were not specified
    if args.CA_FILE is None:  # case 1: always treated as a list
        args.CA_FILE = []
    if args.EXCLUDE is None:  # case 1: always treated as a list
        args.EXCLUDE = []
    if args.PORTS is None:  # case 2: fall back to the default port list
        args.PORTS = list(DEFAULT_PORTS)
    # args.HOST and args.NETWORK stay None
    # (case 3: None means "not set", which selects subnet auto-discovery)

    # fetch data
    # NOTE(review): lib.base.coe() presumably unwraps a (success, result) tuple and ends
    # the plugin on failure - confirm in lib.base
    if args.SOURCE == 'scan':
        ports = lib.base.coe(parse_ports(args.PORTS))
        hosts = lib.base.coe(discover_hosts(args))
        verbose(args, f'{len(hosts)} host(s) x {len(ports)} port(s) to scan')
        items = scan_targets(args, hosts, ports)
        # count by host (like the lynis check), not by host/port: a /24 has 254 hosts,
        # not 254 x len(ports) "targets". responded = hosts that returned a certificate.
        total_hosts = len(hosts)
        responded_hosts = len({item['host'] for item in items})
    else:
        items = lib.base.coe(collect_items(args))
        # host counters only exist for the scan source
        total_hosts = None
        responded_hosts = None

    # init some vars
    # state that an unverified chain is rated with (see _evaluate_item())
    severity_state = STATE_CRIT if args.SEVERITY == 'crit' else STATE_WARN
    now = lib.time.now(as_type='datetime')
    state = STATE_OK
    # the following lists are filled in parallel, one entry per item, so one index
    # addresses the same certificate in all of them
    days_left_values = []
    warn_ranges = []
    crit_ranges = []
    summary_table = []
    lengthy_blocks = []
    handshake_seconds = None
    # url keeps a single-line headline even when the server sends a chain (leaf plus
    # intermediates); file collapses a single certificate to a one-liner; scan and a
    # multi-certificate file use the table form
    single = args.SOURCE == 'url' or (args.SOURCE == 'file' and len(items) == 1)

    # analyze data
    for item in items:
        cert, days_left, item_state, warn, crit = _evaluate_item(
            item, args, now, severity_state
        )
        state = lib.base.get_worst(state, item_state)
        days_left_values.append(days_left)
        warn_ranges.append(warn)
        crit_ranges.append(crit)
        # if several items carry a handshake time, the last one wins
        if item['handshake_seconds'] is not None:
            handshake_seconds = item['handshake_seconds']

        summary_table.append(
            {
                # shorten the path for the File column so long cert-directory paths stay
                # readable in the table; the --lengthy block below keeps the full path
                'label': lib.disk.shorten_path(item['label']),
                'subject_cn': cert['subject_cn'] or '(no CN)',
                'days_left': _format_days_left(days_left),
                'state': lib.base.state2str(item_state, empty_ok=False),
            }
        )
        if args.LENGTHY:
            lengthy_blocks.append((item['label'], _build_lengthy_table(item, cert)))

    # the worst (soonest-expiring) certificate drives the headline and the perfdata
    # thresholds; for a chain that means a near-expiry intermediate surfaces here too.
    # `list.index()` returns the first match, so on a tie the earliest item is picked;
    # worst_idx is None when no certificate was collected at all.
    worst_idx = (
        days_left_values.index(min(days_left_values)) if days_left_values else None
    )

    # build the message
    # NOTE(review): the `single` branch and the multi-certificate file branch index
    # summary_table with worst_idx and therefore need at least one item; only the scan
    # branch handles an empty `items`. Presumably collect_items()/coe() never return an
    # empty list - verify.
    if single:
        worst = summary_table[worst_idx]
        chain = items[0][
            'chain_verdict'
        ]  # the leaf's verdict represents the connection
        # state2str goes at the end of the line so [WARNING] / [CRITICAL] sits next to the
        # reason (chain unverified, expired, etc.) rather than always next to days_left
        msg = f'{worst["subject_cn"]}, {worst["days_left"]}'
        if chain is not None:
            msg += f', chain {chain}'
        msg += lib.base.state2str(state, prefix=' ')
    elif args.SOURCE == 'scan':
        hosts_word = lib.txt.pluralize('host', total_hosts)
        if items:
            worst_subject = summary_table[worst_idx]['subject_cn']
            certs_word = lib.txt.pluralize('certificate', len(items))
            msg = (
                f'{responded_hosts}/{total_hosts} {hosts_word} responded'
                f', {len(items)} {certs_word}'
                f', worst {_format_days_left(min(days_left_values))} ({worst_subject})'
                f'{lib.base.state2str(state, prefix=" ")}'
            )
        else:
            # nothing answered: no certificate to report on, the state stays as
            # initialized
            msg = f'0/{total_hosts} {hosts_word} responded'
    else:  # file source with more than one certificate
        worst_subject = summary_table[worst_idx]['subject_cn']
        msg = (
            f'{len(items)} certificates in {args.FILENAME} checked'
            f', worst {_format_days_left(min(days_left_values))} ({worst_subject})'
            f'{lib.base.state2str(state, prefix=" ")}'
        )

    # perfdata: the days left of the soonest-expiring certificate together with the
    # thresholds resolved for exactly that certificate, then the scan counters or, for
    # the other sources, the handshake time if one was measured
    perfdata = ''
    if days_left_values:
        perfdata += lib.base.get_perfdata(
            'cert_days_left',
            min(days_left_values),
            uom='d',
            warn=warn_ranges[worst_idx],
            crit=crit_ranges[worst_idx],
        )
    if args.SOURCE == 'scan':
        perfdata += lib.base.get_perfdata('hosts_total', total_hosts, _min=0)
        perfdata += lib.base.get_perfdata('hosts_responded', responded_hosts, _min=0)
        perfdata += lib.base.get_perfdata('certs_found', len(items), _min=0)
    elif handshake_seconds is not None:
        perfdata += lib.base.get_perfdata(
            'tls_handshake_time',
            round(handshake_seconds, 4),
            uom='s',
            _min=0,
        )

    # build table output
    # the one-line (`single`) form carries everything in the headline, so the summary
    # table is only appended in the multi-row form
    if not single and summary_table:
        label_header = 'Target' if args.SOURCE == 'scan' else 'File'
        msg += '\n\n' + lib.base.get_table(
            summary_table,
            ['label', 'subject_cn', 'days_left', 'state'],
            header=[label_header, 'Subject CN', 'Status', 'State'],
        )
    if args.LENGTHY:
        for label, block in lengthy_blocks:
            # `lib.base.get_table` already ends with a newline, so a single `\n` here
            # produces exactly one blank line between blocks. With more than one
            # certificate (a chain or a bundle) each block is labeled so the rows can be
            # told apart; a lone certificate needs no label.
            if len(items) > 1:
                msg += f'\n{label}\n{block}'
            else:
                msg += f'\n\n{block}'

    # over and out
    lib.base.oao(msg, state, perfdata, always_ok=args.ALWAYS_OK)


if __name__ == '__main__':
    try:
        main()
    except Exception:
        # Top-level boundary: every exception that main() did not handle itself ends up
        # here. `SystemExit` is not a subclass of `Exception`, so the regular sys.exit()
        # calls pass through untouched.
        # NOTE(review): lib.base.cu() presumably reports the traceback and exits with
        # UNKNOWN - confirm in lib.base
        lib.base.cu()
