#!/usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import json
import socket
import sys
from collections import namedtuple

import lib.args
import lib.base
import lib.lftest
import lib.net
from lib.globals import STATE_CRIT, STATE_OK, STATE_UNKNOWN, STATE_WARN

try:
    import psutil
except ImportError:
    print('Python module "psutil" is not installed.')
    sys.exit(STATE_UNKNOWN)

# psutil 5.x exposes sconn under psutil._common, psutil 6+ under
# psutil._ntuples. Fall back to a plain namedtuple with the fields
# we touch (status, family, type) if neither is available.
try:
    from psutil._ntuples import sconn
except ImportError:
    try:
        from psutil._common import sconn
    except ImportError:
        sconn = namedtuple(
            'sconn',
            ['fd', 'family', 'type', 'laddr', 'raddr', 'status', 'pid'],
        )


__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026041301'

DESCRIPTION = """Counts system-wide socket connections by type (TCP, TCP6, UDP, UDP6) and state.
Alerts when the total number of connections in a specific state exceeds the configured
thresholds. Useful for detecting connection leaks or applications that do not properly
close sockets."""

DEFAULT_CONN_STATUS = 'all'
DEFAULT_CONN_TYPE = 'all'
DEFAULT_CRIT = None
DEFAULT_WARN = None


def parse_args():
    """Parse command line arguments using argparse."""
    parser = argparse.ArgumentParser(description=DESCRIPTION)

    parser.add_argument(
        '-V',
        '--version',
        action='version',
        version=f'%(prog)s: v{__version__} by {__author__}',
    )

    parser.add_argument(
        '--conn-status',
        help='Filter connections by status. Can be specified multiple times. '
        'Default: %(default)s',
        default=None,  # due to https://bugs.python.org/issue16399, see in main() below
        dest='CONN_STATUS',
        action='append',
        choices=[
            'all',
            'close',
            'close_wait',
            'closing',
            'established',
            'fin_wait1',
            'fin_wait2',
            'last_ack',
            'listen',
            'none',
            'syn_recv',
            'syn_sent',
            'time_wait',
        ],
    )

    parser.add_argument(
        '--conn-type',
        help='Filter connections by family/type. Can be specified multiple times. '
        'Default: %(default)s',
        default=None,  # due to https://bugs.python.org/issue16399, see in main() below
        dest='CONN_TYPE',
        action='append',
        choices=[
            'all',
            'tcp',
            'tcp6',
            'udp',
            'udp6',
        ],
    )

    parser.add_argument(
        '-c',
        '--critical',
        help='CRIT threshold for the number of connections. Default: %(default)s',
        default=DEFAULT_CRIT,
        dest='CRIT',
    )

    parser.add_argument(
        '--test',
        help=lib.args.help('--test'),
        dest='TEST',
        type=lib.args.csv,
    )

    parser.add_argument(
        '-w',
        '--warning',
        help='WARN threshold for the number of connections. Default: %(default)s',
        default=DEFAULT_WARN,
        dest='WARN',
    )

    args, _ = parser.parse_known_args()
    return args


# Protocol string -> (family, type) pairs used to decode fixtures.
# We prefer a string form in the JSON fixtures because the raw
# numeric socket family values (AF_INET=2, AF_INET6=10) and the
# socket type values (SOCK_STREAM=1, SOCK_DGRAM=2) are unfriendly.
_PROTO_TO_FAMILY_TYPE = {
    'tcp': (socket.AF_INET, socket.SOCK_STREAM),
    'udp': (socket.AF_INET, socket.SOCK_DGRAM),
    'tcp6': (getattr(socket, 'AF_INET6', 10), socket.SOCK_STREAM),
    'udp6': (getattr(socket, 'AF_INET6', 10), socket.SOCK_DGRAM),
}


def _load_network_connections_fixture(raw_json):
    """Convert a test fixture into the list of `sconn` namedtuples
    that `psutil.net_connections(kind='inet')` would return. The
    plugin only reads the `status`, `family`, and `type` fields, so
    the fixture just needs those (laddr/raddr/pid/fd default to
    blank values).

    Fixture shape:

        [
          {"proto": "tcp",  "status": "ESTABLISHED"},
          {"proto": "tcp6", "status": "LISTEN"},
          {"proto": "udp",  "status": "NONE"},
          ...
        ]
    """
    data = json.loads(raw_json)
    result = []
    for entry in data:
        family, stype = _PROTO_TO_FAMILY_TYPE[entry['proto']]
        result.append(
            sconn(
                fd=-1,
                family=family,
                type=stype,
                laddr=(),
                raddr=(),
                status=entry['status'],
                pid=None,
            )
        )
    return result


def main():
    """The main function. This is where the magic happens."""

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        sys.exit(STATE_UNKNOWN)

    # due to https://bugs.python.org/issue16399, set the default value here
    if args.CONN_STATUS is None:
        args.CONN_STATUS = DEFAULT_CONN_STATUS
    if args.CONN_TYPE is None:
        args.CONN_TYPE = DEFAULT_CONN_TYPE

    # fetch data
    if args.TEST is None:
        connections = psutil.net_connections(kind='inet')
    else:
        stdout, _, _ = lib.lftest.test(args.TEST)
        connections = _load_network_connections_fixture(stdout)

    stats = {}
    for c in connections:
        if 'all' not in args.CONN_STATUS and c.status.lower() not in args.CONN_STATUS:
            continue
        if (
            'all' not in args.CONN_TYPE
            and lib.net.PROTO_MAP[(c.family, c.type)] not in args.CONN_TYPE
        ):
            continue
        key = f'{lib.net.PROTO_MAP[c.family, c.type]}_{c.status}'
        if key in stats:
            stats[key] += 1
        else:
            stats[key] = 1

    # init some vars
    msg = ''
    perfdata = ''
    state = STATE_OK

    # build the message
    for item in lib.base.sort(stats, reverse=True):
        proto, value = item
        if value:
            conn_state = STATE_OK
            if not lib.base.coe(lib.base.match_range(value, args.CRIT)):
                conn_state = STATE_CRIT
            elif not lib.base.coe(lib.base.match_range(value, args.WARN)):
                conn_state = STATE_WARN
            state = lib.base.get_worst(conn_state, state)
            msg += (
                f'{proto.replace("_", " ")}'
                f': {value}'
                f'{lib.base.state2str(conn_state, prefix=" ")}'
                f', '
            )
        perfdata += lib.base.get_perfdata(
            proto,
            value,
        )
    if not msg:
        msg = (
            f'No connections of type'
            f' "{",".join(args.CONN_TYPE)}"'
            f' in status'
            f' "{",".join(args.CONN_STATUS)}"'
            f' found.  '
        )

    # over and out
    lib.base.oao(msg[:-2], state, perfdata)


if __name__ == '__main__':
    try:
        main()
    except Exception:
        lib.base.cu()
