Source code for stakkr.docker_actions

# coding: utf-8
"""Docker functions to get info about containers."""

from docker.errors import NotFound, NullResource
__st__ = {'cts_info': dict(), 'running_cts': 0}


[docs]def add_container_to_network(container: str, network: str): """Attach a container to a network.""" if _container_in_network(container, network) is True: return False docker_network = get_client().networks.get(network) docker_network.connect(container) return True
[docs]def block_ct_ports(service: str, ports: list, project_name: str) -> tuple: """Run iptables commands to block a list of port on a specific container.""" try: container = get_client().containers.get(get_ct_item(service, 'id')) except (LookupError, NullResource): return False, '{} is not started, no port to block'.format(service) _, iptables = container.exec_run(['which', 'iptables']) iptables = iptables.decode().strip() if iptables == '': return True, "Can't block ports on {}, is iptables installed ?".format(service) _allow_contact_subnet(project_name, container) # Now for each port, add an iptable rule for port in ports: rule = ['OUTPUT', '-p', 'tcp', '--dport', str(port), '-j', 'REJECT'] try: container.exec_run([iptables, '-D'] + rule) finally: container.exec_run([iptables, '-A'] + rule) ports_list = ', '.join(map(str, ports)) return False, 'Blocked ports {} on container {}'.format(ports_list, service)
[docs]def check_cts_are_running(project_name: str): """Throw an error if cts are not running.""" get_running_containers(project_name) if not __st__['running_cts']: raise SystemError('Have you started stakkr with the start action ?')
[docs]def container_running(container: str): """Return True if the container is running else False.""" try: return get_api_client().inspect_container(container)['State']['Running'] except (NotFound, NullResource): return False
[docs]def create_network(network: str): """Create a Network.""" if network_exists(network): return False return get_client().networks.create(network, driver='bridge').id
[docs]def get_api_client(): """Return the API client or initialize it.""" if 'api_client' not in __st__: from docker import APIClient, utils params = utils.kwargs_from_env() base_url = None if 'base_url' not in params else params['base_url'] tls = None if 'tls' not in params else params['tls'] __st__['api_client'] = APIClient(base_url=base_url, tls=tls) return __st__['api_client']
[docs]def get_client(): """Return the client or initialize it.""" if 'client' not in __st__: from docker import client __st__['client'] = client.from_env() return __st__['client']
[docs]def get_ct_item(compose_name: str, item_name: str): """Get a value from a container, such as name or IP.""" if 'cts_info' not in __st__: raise LookupError('Before getting an info from a ct, run check_cts_are_running()') for _, ct_data in __st__['cts_info'].items(): if ct_data['compose_name'] == compose_name: return ct_data[item_name] return ''
[docs]def get_ct_name(container: str): """Return the system name of a container, generated by docker-compose.""" ct_name = get_ct_item(container, 'name') if ct_name == '': raise LookupError('{} does not seem to be started ...'.format(container)) return ct_name
[docs]def get_network_name(project_name: str): """Find the full network name.""" try: guessed_network_name = '{}_stakkr'.format(project_name).lower() network = get_client().networks.get(guessed_network_name) except NotFound: raise RuntimeError("Couldn't identify network (check your project name)") return network.name
[docs]def get_subnet(project_name: str): """Find the subnet of the current project.""" network_name = get_network_name(project_name) network_info = get_client().networks.get(network_name).attrs return network_info['IPAM']['Config'][0]['Subnet'].split('/')[0]
[docs]def get_switch_ip(): """Find the main docker daemon IP to add routes.""" import socket cmd = r"""/bin/sh -c "ip addr show hvint0 | grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}'" """ res = get_client().containers.run( 'alpine', remove=True, tty=True, privileged=True, network_mode='host', pid_mode='host', command=cmd) ip_addr = res.strip().decode() try: socket.inet_aton(ip_addr) return ip_addr except socket.error: raise ValueError('{} is not a valid ip, check docker is running')
[docs]def get_running_containers(project_name: str) -> tuple: """Get the number of running containers and theirs details for the current stakkr instance.""" from requests import exceptions filters = { 'name': '{}_'.format(project_name), 'status': 'running', 'network': '{}_stakkr'.format(project_name).lower()} try: cts = get_client().containers.list(filters=filters) except exceptions.ConnectionError: raise exceptions.ConnectionError('Make sure docker is installed and running') __st__['cts_info'] = dict() for container in cts: container_info = _extract_container_info(project_name, container.id) __st__['cts_info'][container_info['name']] = container_info __st__['running_cts'] = len(cts) return __st__['running_cts'], __st__['cts_info']
[docs]def get_running_containers_names(project_name: str) -> list: """Get a list of compose names of running containers for the current stakkr instance.""" cts = get_running_containers(project_name)[1] return sorted([ct_data['compose_name'] for docker_name, ct_data in cts.items()])
[docs]def guess_shell(container: str) -> str: """By searching for binaries, guess what could be the primary shell available.""" container = get_client().containers.get(container) cmd = 'which -a bash sh' _, shells = container.exec_run(cmd, stdout=True, stderr=False) shells = shells.splitlines() if b'/bin/bash' in shells: return '/bin/bash' if b'/bin/sh' in shells: return '/bin/sh' raise EnvironmentError('Could not find a shell for that container')
[docs]def network_exists(network: str): """Return True if a network exists in docker, else False.""" try: get_client().networks.get(network) return True except NotFound: return False
def _allow_contact_subnet(project_name: str, container: str) -> bool: _, iptables = container.exec_run(['which', 'iptables']) iptables = iptables.decode().strip() if iptables == '': return False subnet = get_subnet(project_name) + '/24' # Allow internal network try: container.exec_run([iptables, '-D', 'OUTPUT', '-d', subnet, '-j', 'ACCEPT']) finally: container.exec_run([iptables, '-A', 'OUTPUT', '-d', subnet, '-j', 'ACCEPT']) return True def _extract_container_info(project_name: str, ct_id: str): """Get a hash of info about a container : name, ports, image, ip ...""" try: ct_data = get_api_client().inspect_container(ct_id) except NotFound: return None cts_info = { 'id': ct_id, 'name': ct_data['Name'].lstrip('/'), 'compose_name': ct_data['Config']['Labels']['com.docker.compose.service'], 'ports': _extract_host_ports(ct_data), 'image': ct_data['Config']['Image'], 'traefik_host': _get_traefik_host(ct_data['Config']['Labels']), 'ip': _get_ip_from_networks(project_name, ct_data['NetworkSettings']['Networks']), 'running': ct_data['State']['Running'] } return cts_info def _extract_host_ports(config: list): ports = [] for _, host_ports in config['HostConfig']['PortBindings'].items(): ports += [host_port['HostPort'] for host_port in host_ports] return ports def _get_ip_from_networks(project_name: str, networks: list): """Get the ip of a network.""" network_settings = {} if '{}_stakkr'.format(project_name) in networks: network_settings = networks['{}_stakkr'.format(project_name)] return network_settings['IPAddress'] if 'IPAddress' in network_settings else '' def _container_in_network(container: str, expected_network: str): """Return True if a container is in a network else false. Used by add_container_to_network.""" try: ct_data = get_api_client().inspect_container(container) except NotFound: raise LookupError('Container {} does not seem to exist'.format(container)) for connected_network in ct_data['NetworkSettings']['Networks'].keys(): if connected_network == expected_network: return True return False def _get_traefik_host(labels: list): if 'traefik.frontend.rule' not in labels: return 'No traefik rule' rules = labels['traefik.frontend.rule'].split(':') return rules[1]