nft-blackhole-netdev/nft-blackhole.py

367 lines
11 KiB
Python
Executable file

#!/usr/bin/python3
"""Script to create blocking IP in nftables by country and black lists"""
__author__ = "Tomasz Cebula <tomasz.cebula@gmail.com>"
__credits__ = ["Brian Farrell <brian.farrell@me.com>"]
__license__ = "MIT"
__version__ = "1.2.1"
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import os
import re
import ssl
from string import Template
from subprocess import run
import sys
from textwrap import dedent
from urllib.error import HTTPError
import urllib.request
from jinja2 import Environment, FileSystemLoader, select_autoescape
from systemd.journal import JournalHandler
from yaml import safe_load
app_name = 'nft-blackhole'
"""
LOGGING
"""
logger = logging.getLogger(app_name)
# Get logging level from environment variable if set
DEBUG_MODE = bool(os.getenv('NFT_BH_DEBUG_MODE', False))
if DEBUG_MODE:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
LAUNCHED_BY_SYSTEMD = bool(os.getenv('LAUNCHED_BY_SYSTEMD', False))
if LAUNCHED_BY_SYSTEMD:
log_handler = JournalHandler(SYSLOG_IDENTIFIER=app_name)
log_formatter = logging.Formatter('%(levelname)s - %(module)s line %(lineno)d: %(message)s')
else:
log_handler = logging.StreamHandler(stream=sys.stderr)
log_formatter = logging.Formatter(
'%(asctime)s.%(msecs)03d - %(levelname)s - %(module)s line %(lineno)d: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)
"""
urllib config
"""
IGNORE_CERTIFICATE = False
ctx = ssl.create_default_context()
if IGNORE_CERTIFICATE:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
https_handler = urllib.request.HTTPSHandler(context=ctx)
opener = urllib.request.build_opener(https_handler)
opener.addheaders = [('User-agent', 'Mozilla/5.0 (compatible; nft-blackhole/0.1.0; '
'+https://github.com/tomasz-c/nft-blackhole)')]
urllib.request.install_opener(opener)
class Config(object):
"""The Config object holds all configuration values.
The user is able to customize settings in /usr/local/etc/nft-blackhole.yaml.
"""
IP_VERSIONS = ['v4', 'v6']
NFT_BLACKHOLE_CONFIG = '/usr/local/etc/nft-blackhole.yaml'
SET_TEMPLATE = (
'table netdev blackhole {\n\tset ${set_name} {\n\t\ttype ${ip_version}_addr\n'
'\t\tflags interval\n\t\tauto-merge\n\t\telements = { ${ip_list} }\n\t}\n}'
).expandtabs()
def __init__(self):
self._active_ip_versions = list()
self._block_policy = None
self._default_policy = None
self._whitelist = None
self._blacklist = None
self._country_list = None
self._iifname = None
_config = Config._load_config()
self._configure(_config)
self.jinja_env = Environment(
loader=FileSystemLoader("/usr/local/share/nft-blackhole"),
autoescape=select_autoescape(),
trim_blocks=True,
lstrip_blocks=True
)
@property
def iifname(self):
return self._iifname
@iifname.setter
def iifname(self, value):
self._iifname = value
@property
def active_ip_versions(self):
return self._active_ip_versions
@active_ip_versions.setter
def active_ip_versions(self, value):
for ip_v in self.IP_VERSIONS:
if value[ip_v]:
self._active_ip_versions.append(ip_v)
@property
def block_policy(self):
return self._block_policy
@block_policy.setter
def block_policy(self, value):
self._block_policy = value
@property
def default_policy(self):
return self._default_policy
@default_policy.setter
def default_policy(self, value):
self._default_policy = value
@property
def whitelist(self):
return self._whitelist
@whitelist.setter
def whitelist(self, value):
self._whitelist = value
@property
def blacklist(self):
return self._blacklist
@blacklist.setter
def blacklist(self, value):
self._blacklist = value
@property
def country_list(self):
return self._country_list
@country_list.setter
def country_list(self, value):
# Correct incorrect YAML parsing of no (Norway)
# It should be the string 'no', but YAML interprets it as False
# This is a hack due to the lack of YAML 1.2 support by PyYAML
while False in value:
value[value.index(False)] = 'no'
self._country_list = value
def _configure(self, _config):
# IP_VERSIONS is a required config value
if active_ip_versions := _config.get("IP_VERSIONS"):
self.active_ip_versions = active_ip_versions
else:
logger.error("The config file does not specify IP_VERSIONS. Exiting.")
sys.exit(78)
self.default_policy = _config.get("DEFAULT_POLICY", 'accept')
self.block_policy = _config.get("BLOCK_POLICY", 'drop')
self.whitelist = _config.get("WHITELIST")
self.blacklist = _config.get("BLACKLIST")
self.country_list = _config.get("COUNTRY_LIST")
self.iifname = _config.get("IIFNAME")
@classmethod
def _load_config(cls):
try:
with open(cls.NFT_BLACKHOLE_CONFIG, 'r') as stream:
data = safe_load(stream)
except FileNotFoundError:
logger.error("No config file found at /usr/local/etc/nft-blackhole.yaml. Exiting.")
sys.exit(78)
else:
logger.info(f"Config loaded from {cls.NFT_BLACKHOLE_CONFIG}")
return data
def __str__(self):
config = f"""
IP_VERSIONS: {self.active_ip_versions}
BLOCK_POLICY: {self.block_policy}
DEFAULT_POLICY: {self.default_policy}
WHITELIST: {self.whitelist}
BLACKLIST: {self.blacklist}
COUNTRY_LIST: {self.country_list}
IIFNAME: {self.iifname}
"""
return dedent(config)
def stop():
"""Stopping nft-blackhole"""
run(['nft', 'delete', 'table', 'netdev', 'blackhole'], check=True)
def start(config):
"""Starting nft-blackhole"""
nft_template = config.jinja_env.get_template("nft-blackhole.j2")
nft_conf = nft_template.render(
default_policy=config.default_policy,
block_policy=config.block_policy,
iifname=config.iifname,
)
run(['nft', '-f', '-'], input=nft_conf.encode(), check=True)
def get_urls(urls, do_filter=False):
"""Download urls in threads"""
ip_list_aggregated = []
def get_url(url):
logger.info(f"Getting URL: {url}")
try:
response = urllib.request.urlopen(url, timeout=10)
content = response.read().decode('utf-8')
except HTTPError as e:
logger.error(f"HTTP error {e.code} {e.reason} {e.url}")
ip_list = []
else:
if do_filter:
content = re.sub(r'^ *(#.*\n?|\n?)', '', content, flags=re.MULTILINE)
ip_list = content.splitlines()
return ip_list
with ThreadPoolExecutor(max_workers=8) as executor:
do_urls = [executor.submit(get_url, url) for url in urls]
for out in as_completed(do_urls):
ip_list = out.result()
ip_list_aggregated += ip_list
return ip_list_aggregated
def get_blacklist(blacklist):
"""Get blacklists"""
urls = []
for bl_url in blacklist:
urls.append(bl_url)
ips = get_urls(urls, do_filter=True)
return ips
def get_country_ip_list(country_list, ip_version):
"""Get country lists from GitHub @herrbischoff"""
urls = []
for country in country_list:
logger.info(f"Getting blocklist for country: {country}")
url = (
f'https://raw.githubusercontent.com/herrbischoff/country-ip-blocks/'
f'master/ip{ip_version}/{country.lower()}.cidr'
)
urls.append(url)
ips = get_urls(urls)
return ips
def whitelist_sets(config, reload=False):
"""Create whitelist sets"""
for ip_version in config.active_ip_versions:
whitelist = config.whitelist.get(ip_version)
if whitelist:
set_name = f'whitelist-{ip_version}'
set_list = ', '.join(whitelist)
nft_set = (
Template(config.SET_TEMPLATE).substitute(
ip_version=f'ip{ip_version}', set_name=set_name, ip_list=set_list
)
)
if reload:
run(['nft', 'flush', 'set', 'netdev', 'blackhole', set_name], check=True)
if config.whitelist[ip_version]:
run(['nft', '-f', '-'], input=nft_set.encode(), check=True)
def blacklist_sets(config, reload=False):
"""Create blacklist sets"""
for ip_version in config.active_ip_versions:
blacklist = config.blacklist.get(ip_version)
if blacklist:
set_name = f'blacklist-{ip_version}'
ip_list = get_blacklist(config.blacklist[ip_version])
set_list = ', '.join(ip_list)
nft_set = (
Template(config.SET_TEMPLATE).substitute(
ip_version=f'ip{ip_version}', set_name=set_name, ip_list=set_list
)
)
if reload:
run(['nft', 'flush', 'set', 'netdev', 'blackhole', set_name], check=True)
if ip_list:
run(['nft', '-f', '-'], input=nft_set.encode(), check=True)
def country_sets(config, reload=False):
"""Create country sets"""
country_list = config.country_list
if country_list:
for ip_version in config.active_ip_versions:
set_name = f'country-{ip_version}'
ip_list = get_country_ip_list(config.country_list, ip_version)
set_list = ', '.join(ip_list)
nft_set = (
Template(config.SET_TEMPLATE).substitute(
ip_version=f'ip{ip_version}', set_name=set_name, ip_list=set_list
)
)
if reload:
run(['nft', 'flush', 'set', 'netdev', 'blackhole', set_name], check=True)
if ip_list:
run(['nft', '-f', '-'], input=nft_set.encode(), check=True)
def main():
desc = 'Script to blocking IP in nftables by country and black lists'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('action', choices=('start', 'stop', 'restart', 'reload', 'config'),
help='Action to nft-blackhole')
args = parser.parse_args()
action = args.action
config = Config()
if action == 'start':
logger.info("Starting blackhole")
start(config)
whitelist_sets(config)
blacklist_sets(config)
country_sets(config)
elif action == 'stop':
logger.info("Stopping blackhole")
stop()
elif action == 'restart':
logger.info("Re-starting blackhole")
stop()
start(config)
whitelist_sets(config)
blacklist_sets(config)
country_sets(config)
elif action == 'reload':
logger.info("Re-loading blackhole sets")
whitelist_sets(config, reload=True)
blacklist_sets(config, reload=True)
country_sets(config, reload=True)
elif action == 'config':
print(config)
if __name__ == '__main__':
main()