-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocker-alias-daemon.py
More file actions
82 lines (63 loc) · 2.23 KB
/
docker-alias-daemon.py
File metadata and controls
82 lines (63 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os.path
import sys
import time
import traceback
import click
import daemoniker
from psutil import pid_exists
from lib.config import INIConfig
from lib.shim_binary import ShimBinaryManager, collect_defined_shim_binaries
# Fallback PID file location: "docker-alias.pid" inside the directory reported
# by INIConfig().get_config_dir(). Evaluated once, when the module is imported.
DEFAULT_PID_FILE = os.path.join(
    INIConfig().get_config_dir(),
    "docker-alias.pid",
)
def get_pid_file() -> str:
    """Return the PID file path used by the daemon.

    The DOCKER_ALIAS_PID_FILE environment variable takes precedence when it
    is set; otherwise DEFAULT_PID_FILE is returned.
    """
    override = os.environ.get("DOCKER_ALIAS_PID_FILE")
    if override is None:
        return DEFAULT_PID_FILE
    return override
class Daemon:
    """Endless loop that keeps shim binaries in line with the INI config.

    Every pass builds a fresh INIConfig. When the config reports itself as
    disabled, all shim binaries are removed; otherwise the shim binaries
    collected from the config are handed to ShimBinaryManager.sync().
    """

    # Seconds to wait between two passes, and before retrying a failed pass.
    _POLL_INTERVAL_SECONDS = 10

    def __init__(self) -> None:
        self._shim_binary_manager = ShimBinaryManager()

    def run(self) -> None:
        """Run the synchronisation loop; never returns normally.

        Errors derived from Exception are printed with their traceback and
        the loop carries on after a pause. Anything else (for example
        KeyboardInterrupt) propagates to the caller.
        """
        while True:
            try:
                ini_config = INIConfig()
                if not ini_config.is_enabled():
                    self._shim_binary_manager.remove_all()
                    time.sleep(self._POLL_INTERVAL_SECONDS)
                    continue

                defined_shim_binaries = collect_defined_shim_binaries(ini_config)
                self._shim_binary_manager.sync(defined_shim_binaries)

                if not ini_config.is_enabled():
                    # Handle case where configuration toggled while syncing:
                    # undo the sync and start the next pass right away.
                    self._shim_binary_manager.remove_all()
                    continue

                time.sleep(self._POLL_INTERVAL_SECONDS)
            except Exception:
                traceback.print_exc()
                # Pause before retrying. Without this a failure that persists
                # across passes would make the loop spin at full speed,
                # printing a traceback on every iteration.
                time.sleep(self._POLL_INTERVAL_SECONDS)
@click.group()
def cli() -> None:
    # Root command group. It does nothing by itself; sub-commands attach to it
    # through @cli.command(...). Documented with comments instead of a
    # docstring because click would show a docstring as --help text.
    return None
@cli.command("start")
@click.option("--no-daemon", is_flag=True, flag_value=True, default=False)
def start(no_daemon: bool) -> None:
    # Start the synchronisation loop.
    #
    # With --no-daemon the loop runs in the foreground until interrupted.
    # Otherwise the process is daemonized through daemoniker, using the PID
    # file returned by get_pid_file().
    #
    # Comments are used instead of a docstring so that the --help output click
    # generates for this command stays exactly as it was.
    if no_daemon:
        try:
            Daemon().run()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to end a foreground run.
            pass
        return

    with daemoniker.Daemonizer() as (_, daemonizer):
        try:
            is_parent, *_ = daemonizer(get_pid_file())
            # Per the daemoniker Daemonizer documentation, daemonizer()
            # returns in the launching process (is_parent true) as well as in
            # the daemon. Only the daemon may enter the endless loop;
            # otherwise "start" would never return and two loops would run
            # side by side. The launching process simply leaves the block.
            if not is_parent:
                Daemon().run()
        except SystemExit as e:
            if str(e) == 'Unable to acquire PID file.':
                with open(get_pid_file()) as f:
                    pid = int(f.read())
                if pid_exists(pid):
                    # A daemon with the recorded PID is alive: nothing to do.
                    sys.exit(0)
                # The recorded process is gone, so the PID file is stale.
                os.remove(get_pid_file())
@cli.command("stop")
def stop() -> None:
    # Send SIGINT to the daemon through its PID file, or print a warning when
    # no PID file exists. Comments are used instead of a docstring so that the
    # --help output click generates for this command is not altered.
    pid_file = get_pid_file()
    if os.path.isfile(pid_file):
        daemoniker.send(pid_file, daemoniker.SIGINT)
        return
    print("Warning: docker-alias was not running!")
# Script entry point: hand control over to the click command group.
if __name__ == "__main__":
    cli()