#!/usr/bin/python3

import os
import sys
import time
import unittest
import tempfile
import subprocess


class TestNetworkd(unittest.TestCase):
    '''Integration tests for systemd-networkd DHCP on a veth test interface.

    Each test writes a networkd .network file enabling DHCP for one end of a
    veth pair, runs dnsmasq on the other end, and then checks the resulting
    addresses, the networkctl output and that the if-up.d / if-post-down.d
    hooks installed by setUp() were called.
    '''

    @classmethod
    def setUpClass(cls):
        # enable those, so that net.agent will call if-*.d/
        subprocess.check_call(['systemctl', 'enable', '--quiet', 'systemd-networkd'])

    @classmethod
    def tearDownClass(cls):
        subprocess.check_call(['systemctl', 'disable', '--quiet', 'systemd-networkd'])

    def setUp(self):
        '''Prepare a scratch directory and install the if-*.d test hooks.'''
        self.iface = 'test_eth42'
        self.if_router = 'router_eth42'
        self.dnsmasq = None
        self.workdir_obj = tempfile.TemporaryDirectory()
        # remove the scratch directory explicitly instead of relying on GC
        self.addCleanup(self.workdir_obj.cleanup)
        self.workdir = self.workdir_obj.name
        self.config = '/run/systemd/network/test_eth42.network'
        os.makedirs(os.path.dirname(self.config), exist_ok=True)

        # avoid "Failed to open /dev/tty" errors in containers
        os.environ['SYSTEMD_LOG_TARGET'] = 'journal'

        # create if-*.d hooks; each one logs its invocation into the workdir
        for hook in ('up', 'post-down'):
            hook_path = '/etc/network/if-%s.d/test-networkd' % hook
            with open(hook_path, 'w') as f:
                # do not leave test hooks behind in /etc after the test
                self.addCleanup(self._remove_file, hook_path)
                f.write('''#!/bin/sh -e
L=%s/if-%s.log
if [ -e $L ]; then
echo "ERROR: $L already exists, called twice?" > $L
exit 1
fi
echo "called: args='$@' IFACE=$IFACE" > $L
networkctl status "$IFACE" >> $L''' % (self.workdir, hook))
                os.fchmod(f.fileno(), 0o755)

    def tearDown(self):
        '''Remove interface and config, stop networkd, wait for hook jobs.'''
        try:
            self.shutdown_iface()
        finally:
            # always clean up config and stop networkd, even if removing the
            # interface failed
            if os.path.exists(self.config):
                os.unlink(self.config)
            subprocess.call(['systemctl', 'stop', 'systemd-networkd'])

        # let all networkd-if-up.d@.service jobs settle down
        timeout = 5
        while timeout > 0:
            # decode to str so that the output can go into the failure message
            out = subprocess.check_output(
                ['systemctl', 'list-units', '--no-legend', '-tservice',
                 'networkd-if-up.d@*'], universal_newlines=True).strip()
            if not out:
                break
            time.sleep(1)
            timeout -= 1
        else:
            self.fail('timed out waiting for networkd-if-up.d@.service to settle:\n' + out)

    @staticmethod
    def _remove_file(path):
        '''Delete path, ignoring the case that it is already gone.'''
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass

    @staticmethod
    def _wait_for_file(path, tries=50, interval=0.1):
        '''Poll for path up to tries times; return whether it exists.'''
        while tries > 0 and not os.path.exists(path):
            time.sleep(interval)
            tries -= 1
        return os.path.exists(path)

    def create_iface(self, ipv6=False):
        '''Create test interface with DHCP server behind it

        With ipv6=True the router side additionally gets an IPv6 address and
        dnsmasq is started with router advertisements and a DHCPv6 range.
        '''
        # add veth pair
        subprocess.check_call(['ip', 'link', 'add', 'name', self.iface, 'type',
                               'veth', 'peer', 'name', self.if_router])

        # give our router an IP
        subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
        subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
        if ipv6:
            subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
        subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])

        # add DHCP server
        self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
        lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
        if ipv6:
            extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
        else:
            extra_opts = []
        self.dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
             '--interface=' + self.if_router, '--except-interface=lo',
             '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        try:
            if self.if_router:
                subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
                self.if_router = None
        finally:
            # never leave dnsmasq running, even if deleting the link failed
            if self.dnsmasq:
                self.dnsmasq.kill()
                self.dnsmasq.wait()
                self.dnsmasq = None

    def do_test(self, coldplug=True, ipv6=False):
        '''Bring up the test interface via networkd and verify the result.

        coldplug=True creates the interface before starting networkd,
        coldplug=False starts networkd first (hotplug) and additionally
        checks the if-post-down.d hook after removing the interface.
        ipv6 selects whether a global IPv6 address is expected as well.
        '''
        with open(self.config, 'w') as f:
            f.write('''[Match]
Name=%s
[Network]
DHCP=yes''' % self.iface)

        if coldplug:
            # create interface first, then start networkd
            self.create_iface(ipv6=ipv6)
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        else:
            # start networkd first, then create interface
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
            self.create_iface(ipv6=ipv6)

        subprocess.check_call(['/lib/systemd/systemd-networkd-wait-online',
                               '--interface', self.iface, '--timeout=10'])

        try:
            if ipv6:
                # check iface state and IP 6 address; FIXME: we need to wait a bit
                # longer, as the iface is "configure" already with IPv4
                timeout = 10
                while timeout > 0:
                    out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
                    # out is bytes, so both needles must be bytes as well
                    if b'state UP' in out and b'scope global' in out:
                        break
                    time.sleep(1)
                    timeout -= 1

                self.assertRegex(out, br'inet6 2600::.* scope global .*dynamic')
                self.assertRegex(out, br'inet6 fe80::.* scope link')
            else:
                # should have link-local address on IPv6 only
                out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
                self.assertRegex(out, br'inet6 fe80::.* scope link')
                self.assertNotIn(b'scope global', out)

            # should have IPv4 address
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertIn(b'state UP', out)
            self.assertRegex(out, br'inet 192\.168\.5\.\d+/.* scope global dynamic')

            # check networkctl state
            out = subprocess.check_output(['networkctl'], universal_newlines=True)
            self.assertRegex(out, r'%s\s+ether\s+routable\s+unmanaged' % self.if_router)
            self.assertRegex(out, r'%s\s+ether\s+routable\s+configured' % self.iface)

            out = subprocess.check_output(['networkctl', 'status', self.iface],
                                          universal_newlines=True)
            self.assertRegex(out, r'Type:\s+ether')
            self.assertRegex(out, r'State:\s+routable.*configured')
            self.assertRegex(out, r'Address:\s+192\.168\.5\.\d+')
            if ipv6:
                self.assertRegex(out, '2600::')
            else:
                self.assertNotIn('2600::', out)
            self.assertRegex(out, 'fe80::')
            self.assertRegex(out, r'Gateway:\s+192\.168\.5\.1')
            self.assertRegex(out, r'DNS:\s+192\.168\.5\.1')
        except AssertionError:
            # show dnsmasq log on failure; a missing log must not mask the
            # original assertion
            if os.path.exists(self.dnsmasq_log):
                with open(self.dnsmasq_log) as f:
                    sys.stderr.write('\n\n----- dnsmasq log ----\n%s\n------\n' % f.read())
            raise

        # give if-up.d/ up to 5s to run
        up_log = os.path.join(self.workdir, 'if-up.log')
        down_log = os.path.join(self.workdir, 'if-post-down.log')
        self.assertTrue(self._wait_for_file(up_log),
                        'if-up.d hook did not run within 5s')

        # verify that if-up.d/ script ran
        with open(up_log) as f:
            out = f.read()
        self.assertNotIn('ERROR', out)
        self.assertRegex(out, 'called:.*IFACE=%s' % self.iface)
        self.assertIn(self.config, out)
        self.assertRegex(out, r'State:\s+routable.*configured')

        if os.path.exists(down_log):
            with open(down_log) as f:
                self.fail('Not expecting if-post-down.d to have run already:\n%s' % f.read())
        self.assertFalse(os.path.exists(down_log))

        # verify resolv.conf if it gets dynamically managed
        if os.path.islink('/etc/resolv.conf'):
            with open('/etc/resolv.conf') as f:
                self.assertIn('nameserver 192.168.5.1\n', f.read())

        if not coldplug:
            # check post-down.d hook
            self.shutdown_iface()

            # give if-post-down.d/ up to 5s to run
            self.assertTrue(self._wait_for_file(down_log),
                            'if-post-down.d hook did not run within 5s')

            # verify that if-post-down.d/ script ran
            with open(down_log) as f:
                out = f.read()
            self.assertNotIn('ERROR', out)
            self.assertRegex(out, 'called:.*IFACE=%s' % self.iface)
            # networkctl status should fail
            self.assertNotIn('State:', out)

    def test_coldplug_dhcp_ip4(self):
        self.do_test(coldplug=True, ipv6=False)

    def test_coldplug_dhcp_ip6(self):
        self.do_test(coldplug=True, ipv6=True)

    def test_hotplug_dhcp_ip4(self):
        self.do_test(coldplug=False, ipv6=False)

    def test_hotplug_dhcp_ip6(self):
        self.do_test(coldplug=False, ipv6=True)

if __name__ == '__main__':
    # Report verbosely on stdout (rather than the default stderr) when the
    # file is executed directly.
    stdout_runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=stdout_runner)
