Skip to content
This repository was archived by the owner on Dec 13, 2018. It is now read-only.

Commit 1394f45

Browse files
committed
do not reconfig haproxy when event is accumulating; restart haproxy when it dies
1 parent a51b58d commit 1394f45

6 files changed

Lines changed: 107 additions & 90 deletions

File tree

haproxy/eventhandler.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import config
1111
import helper.cloud_link_helper
12-
from haproxycfg import run_haproxy, Haproxy
12+
from haproxycfg import add_haproxy_run_task, Haproxy
1313
from utils import get_uuid_from_resource_uri
1414

1515
logger = logging.getLogger("haproxy")
@@ -30,16 +30,16 @@ def on_cloud_event(message):
3030
len(Haproxy.cls_linked_services.intersection(set(event.get("parents", [])))) > 0:
3131
msg = "Docker Cloud Event: %s %s is %s" % (
3232
event["type"], get_uuid_from_resource_uri(event.get("resource_uri", "")), event["state"].lower())
33-
run_haproxy(msg)
33+
add_haproxy_run_task(msg)
3434

3535
# Add/remove services linked to haproxy
3636
if event.get("state", "") == "Success" and config.HAPROXY_SERVICE_URI in event.get("parents", []):
37-
run_haproxy("Docker Cloud Event: New action is executed on the Haproxy container")
37+
add_haproxy_run_task("Docker Cloud Event: New action is executed on the Haproxy container")
3838

3939

4040
def on_websocket_open():
4141
helper.cloud_link_helper.LINKED_CONTAINER_CACHE.clear()
42-
run_haproxy("Websocket open")
42+
add_haproxy_run_task("Websocket open")
4343

4444

4545
def on_websocket_close():
@@ -48,10 +48,7 @@ def on_websocket_close():
4848

4949
def on_user_reload():
5050
Haproxy.cls_cfg = None
51-
if config.LINK_MODE == "legacy":
52-
logger.info("User reload is not supported in legacy link mode")
53-
else:
54-
run_haproxy("User reload")
51+
add_haproxy_run_task("User reload")
5552

5653

5754
def on_cloud_error(e):
@@ -74,24 +71,27 @@ def listen_dockercloud_events():
7471

7572

7673
def listen_docker_events():
77-
try:
78-
74+
while True:
7975
try:
80-
docker = docker_client()
81-
except:
82-
docker = docker_client(os.environ)
83-
84-
docker.ping()
85-
for event in docker.events(decode=True):
86-
logger.debug(event)
87-
attr = event.get("Actor", {}).get("Attributes")
88-
compose_project = attr.get("com.docker.compose.project", "")
89-
compose_service = attr.get("com.docker.compose.service", "")
90-
container_name = attr.get("name", "")
91-
event_action = event.get("Action", "")
92-
service = "%s_%s" % (compose_project, compose_service)
93-
if service in Haproxy.cls_linked_services and event_action in ["start", "die"]:
94-
msg = "Docker event: container %s %s" % (container_name, event_action)
95-
run_haproxy(msg)
96-
except APIError as e:
97-
logger.info("Docker API error: %s" % e)
76+
try:
77+
docker = docker_client()
78+
except:
79+
docker = docker_client(os.environ)
80+
81+
docker.ping()
82+
for event in docker.events(decode=True):
83+
logger.debug(event)
84+
attr = event.get("Actor", {}).get("Attributes")
85+
compose_project = attr.get("com.docker.compose.project", "")
86+
compose_service = attr.get("com.docker.compose.service", "")
87+
container_name = attr.get("name", "")
88+
event_action = event.get("Action", "")
89+
service = "%s_%s" % (compose_project, compose_service)
90+
if service in Haproxy.cls_linked_services and event_action in ["start", "die"]:
91+
msg = "Docker event: container %s %s" % (container_name, event_action)
92+
add_haproxy_run_task(msg)
93+
except APIError as e:
94+
logger.info("Docker API error: %s" % e)
95+
96+
time.sleep(1)
97+
add_haproxy_run_task("Reconnect docker events")

haproxy/haproxycfg.py

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import copy
22
import logging
3+
import time
34
from collections import OrderedDict
45

6+
import gevent
57
from compose.cli.docker_client import docker_client
68

79
import config
@@ -19,10 +21,30 @@
1921

2022
logger = logging.getLogger("haproxy")
2123

24+
tasks = None
2225

23-
def run_haproxy(msg=None):
24-
haproxy = Haproxy(config.LINK_MODE, msg)
25-
haproxy.update()
26+
27+
def add_haproxy_run_task(msg=None):
28+
logger.info("=> Add task: %s", msg)
29+
gevent.spawn(tasks.put, (config.LINK_MODE, msg))
30+
31+
32+
def run_haproxy():
33+
while True:
34+
delay = 1
35+
mode, msg = tasks.get()
36+
time.sleep(delay)
37+
while not tasks.empty():
38+
if mode != "cloud":
39+
delay = 0.1
40+
logger.info("=> Task accumulated, skip: %s", msg)
41+
mode, msg = tasks.get()
42+
time.sleep(delay)
43+
continue
44+
logger.info("=> Executing task: %s", msg)
45+
46+
haproxy = Haproxy(config.LINK_MODE)
47+
haproxy.update()
2648

2749

2850
class Haproxy(object):
@@ -32,10 +54,8 @@ class Haproxy(object):
3254
cls_certs = []
3355
cls_ca_certs = []
3456

35-
def __init__(self, link_mode="", msg=""):
57+
def __init__(self, link_mode=""):
3658
logger.info("==========BEGIN==========")
37-
if msg:
38-
logger.info(msg)
3959

4060
self.link_mode = link_mode
4161
self.ssl_bind_string = None
@@ -91,7 +111,7 @@ def _init_new_links():
91111
links, Haproxy.cls_linked_services = NewLinkHelper.get_new_links(docker, haproxy_container)
92112

93113
if ADDITIONAL_SERVICES:
94-
additional_links, additional_services= NewLinkHelper.get_additional_links(docker, ADDITIONAL_SERVICES)
114+
additional_links, additional_services = NewLinkHelper.get_additional_links(docker, ADDITIONAL_SERVICES)
95115
if additional_links and additional_services:
96116
links.update(additional_links)
97117
Haproxy.cls_linked_services.update(additional_services)
@@ -118,22 +138,17 @@ def update(self):
118138
logger.info("Internal error: Specs is not initialized")
119139

120140
def _update_haproxy(self, cfg):
121-
if self.link_mode in ["cloud", "new"]:
122-
if Haproxy.cls_cfg != cfg:
123-
logger.info("HAProxy configuration:\n%s" % cfg)
124-
Haproxy.cls_cfg = cfg
125-
if save_to_file(HAPROXY_CONFIG_FILE, cfg):
126-
Haproxy.cls_process = UpdateHelper.run_reload(Haproxy.cls_process)
127-
elif self.ssl_updated:
128-
logger.info("SSL certificates have been changed")
129-
Haproxy.cls_process = UpdateHelper.run_reload(Haproxy.cls_process)
130-
else:
131-
logger.info("HAProxy configuration remains unchanged")
132-
logger.info("===========END===========")
133-
elif self.link_mode in ["legacy"]:
141+
if Haproxy.cls_cfg != cfg:
134142
logger.info("HAProxy configuration:\n%s" % cfg)
143+
Haproxy.cls_cfg = cfg
135144
if save_to_file(HAPROXY_CONFIG_FILE, cfg):
136-
UpdateHelper.run_once()
145+
Haproxy.cls_process = UpdateHelper.run_reload(Haproxy.cls_process)
146+
elif self.ssl_updated:
147+
logger.info("SSL certificates have been changed")
148+
Haproxy.cls_process = UpdateHelper.run_reload(Haproxy.cls_process)
149+
else:
150+
logger.info("HAProxy configuration remains unchanged")
151+
logger.info("===========END===========")
137152

138153
def _config_ssl(self):
139154
ssl_bind_string = ""
@@ -198,8 +213,8 @@ def _config_global_section():
198213
if NBPROC > 1:
199214
statements.append("nbproc %s" % NBPROC)
200215
statements.append("stats bind-process %s" % NBPROC)
201-
for x in range(1, NBPROC+1):
202-
statements.append("cpu-map %s %s" % (x, x-1))
216+
for x in range(1, NBPROC + 1):
217+
statements.append("cpu-map %s %s" % (x, x - 1))
203218

204219
statements.extend(ConfigHelper.config_ssl_bind_options(SSL_BIND_OPTIONS))
205220
statements.extend(ConfigHelper.config_ssl_bind_ciphers(SSL_BIND_CIPHERS))

haproxy/helper/update_helper.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,6 @@
77
logger = logging.getLogger("haproxy")
88

99

10-
def run_once():
11-
logger.info("Launching HAProxy")
12-
p = subprocess.Popen(HAPROXY_RUN_COMMAND)
13-
logger.info("HAProxy has been launched(PID: %s)", str(p.pid))
14-
logger.info("===========END===========")
15-
p.wait()
16-
17-
1810
def run_reload(old_process):
1911
if old_process:
2012
# Reload haproxy

haproxy/main.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,31 @@
11
from gevent import monkey
2+
23
monkey.patch_all()
34

45
import logging
56
import os
67
import sys
78
import signal
89
import gevent
9-
10+
import time
1011
import dockercloud
1112
from compose.cli.docker_client import docker_client
13+
from gevent import queue
1214

1315
import config
1416
from config import DEBUG, PID_FILE, HAPROXY_CONTAINER_URI, HAPROXY_SERVICE_URI, API_AUTH
1517
from eventhandler import on_user_reload, listen_docker_events, listen_dockercloud_events
1618
from haproxy import __version__
17-
from haproxycfg import run_haproxy
19+
import haproxycfg
20+
from haproxycfg import add_haproxy_run_task, run_haproxy, Haproxy
1821
from utils import save_to_file
1922

2023
dockercloud.user_agent = "dockercloud-haproxy/%s" % __version__
2124

2225
logger = logging.getLogger("haproxy")
2326

27+
haproxycfg.tasks = queue.Queue()
28+
2429

2530
def create_pid_file():
2631
pid = str(os.getpid())
@@ -35,32 +40,50 @@ def main():
3540
logging.getLogger("python-dockercloud").setLevel(logging.DEBUG)
3641

3742
config.LINK_MODE = check_link_mode(HAPROXY_CONTAINER_URI, HAPROXY_SERVICE_URI, API_AUTH)
43+
3844
gevent.signal(signal.SIGUSR1, on_user_reload)
3945
gevent.signal(signal.SIGTERM, sys.exit)
4046

47+
gevent.spawn(run_haproxy)
48+
4149
pid = create_pid_file()
4250
logger.info("dockercloud/haproxy PID: %s" % pid)
4351

4452
if config.LINK_MODE == "cloud":
45-
listen_dockercloud_events()
53+
gevent.spawn(listen_dockercloud_events)
54+
4655
elif config.LINK_MODE == "new":
47-
run_haproxy("Initial start")
48-
while True:
49-
listen_docker_events()
50-
run_haproxy("Reconnect docker events")
56+
add_haproxy_run_task("Initial start")
57+
gevent.spawn(listen_docker_events)
5158

5259
elif config.LINK_MODE == "legacy":
53-
run_haproxy()
60+
add_haproxy_run_task("legacy link start")
61+
62+
while True:
63+
time.sleep(5)
64+
if Haproxy.cls_process:
65+
if is_process_running(Haproxy.cls_process):
66+
continue
67+
Haproxy.cls_cfg = None
68+
add_haproxy_run_task("haproxy %s died , restart" % Haproxy.cls_process.pid)
69+
70+
71+
def is_process_running(p):
72+
try:
73+
os.kill(p.pid, 0)
74+
return True
75+
except OSError:
76+
return False
5477

5578

5679
def check_link_mode(container_uri, service_uri, api_auth):
5780
if container_uri and service_uri and api_auth:
5881
if container_uri and service_uri:
5982
if api_auth:
60-
logger.info("dockercloud/haproxy %s has access to the Docker Cloud API - will reload list of backends" \
83+
logger.info("dockercloud/haproxy %s has access to the Docker Cloud API - will reload list of backends"
6184
" in real-time" % __version__)
6285
else:
63-
logger.info("dockercloud/haproxy %s is unable to access the Docker cloud API - you might want to" \
86+
logger.info("dockercloud/haproxy %s is unable to access the Docker cloud API - you might want to"
6487
" give an API role to this service for automatic backend reconfiguration" % __version__)
6588
return "cloud"
6689
else:

tests/unit/test_eventhandler.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ def tearDown(self):
2121
Haproxy.cls_linked_services = self.linked_services
2222
config.HAPROXY_SERVICE_URI = self.service_uri
2323

24-
@mock.patch("haproxy.eventhandler.run_haproxy")
25-
def test_event_reload_in_service_opterations(self, mock_run_haproxy):
24+
@mock.patch("haproxy.eventhandler.add_haproxy_run_task")
25+
def test_event_reload_in_service_opterations(self, mock_add_haproxy_run_task):
2626
not_triggered_event_01 = '{}'
2727
not_triggered_event_02 = '{"state": "In progress", "type": "container", "parents": ["/svc/a/"]}'
2828
not_triggered_event_03 = '{"state": "Pending", "type": "container", "parents": ["/svc/a/"]}'
@@ -43,7 +43,7 @@ def test_event_reload_in_service_opterations(self, mock_run_haproxy):
4343
on_cloud_event(not_triggered_event_08)
4444
on_cloud_event(not_triggered_event_09)
4545

46-
self.assertEqual(0, mock_run_haproxy.call_count)
46+
self.assertEqual(0, mock_add_haproxy_run_task.call_count)
4747

4848
triggered_event_01 = '{"state": "Stopped", "type": "container", "parents": ["/svc/a/"]}'
4949
triggered_event_02 = '{"state": "Started", "type": "container", "parents": ["/svc/a/"]}'
@@ -61,10 +61,10 @@ def test_event_reload_in_service_opterations(self, mock_run_haproxy):
6161
on_cloud_event(triggered_event_06)
6262
on_cloud_event(triggered_event_07)
6363

64-
self.assertEqual(7, mock_run_haproxy.call_count)
64+
self.assertEqual(7, mock_add_haproxy_run_task.call_count)
6565

66-
@mock.patch("haproxy.eventhandler.run_haproxy")
67-
def test_event_reload_in_link_opterations(self, mock_run_haproxy):
66+
@mock.patch("haproxy.eventhandler.add_haproxy_run_task")
67+
def test_event_reload_in_link_opterations(self, mock_add_haproxy_run_task):
6868
not_triggered_event_01 = '{}'
6969
not_triggered_event_02 = '{"state": "Failed", "parents": ["/svc/a/"]}'
7070
not_triggered_event_03 = '{"state": "Success", "parents": ["/svc/a/"]}'
@@ -74,11 +74,11 @@ def test_event_reload_in_link_opterations(self, mock_run_haproxy):
7474
on_cloud_event(not_triggered_event_02)
7575
on_cloud_event(not_triggered_event_03)
7676
on_cloud_event(not_triggered_event_04)
77-
self.assertEqual(0, mock_run_haproxy.call_count)
77+
self.assertEqual(0, mock_add_haproxy_run_task.call_count)
7878

7979
triggered_event_01 = '{"state": "Success", "parents": ["/svc/uuid/"]}'
8080
triggered_event_02 = '{"state": "Success", "parents": ["/svc/a/", "/svc/uuid/"]}'
8181

8282
on_cloud_event(triggered_event_01)
8383
on_cloud_event(triggered_event_02)
84-
self.assertEqual(2, mock_run_haproxy.call_count)
84+
self.assertEqual(2, mock_add_haproxy_run_task.call_count)

tests/unit/test_haproxycfg.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,6 @@ def test_update_haproxy_cfg_no_updates(self, mock_init, mock_save, mock_run_relo
8383
self.assertFalse(mock_save.called)
8484
self.assertFalse(mock_run_reload.called)
8585

86-
@mock.patch("haproxy.haproxycfg.UpdateHelper.run_once")
87-
@mock.patch("haproxy.haproxycfg.UpdateHelper.run_reload")
88-
@mock.patch("haproxy.haproxycfg.save_to_file")
89-
@mock.patch.object(haproxycfg.Haproxy, '_initialize')
90-
def test_update_haproxy_cfg_only_once(self, mock_init, mock_save, mock_run_reload, mock_run_once):
91-
haproxy = Haproxy()
92-
haproxy.link_mode = "legacy"
93-
cfg = {"key": "value"}
94-
haproxy._update_haproxy(cfg)
95-
mock_save.assert_called_with(HAPROXY_CONFIG_FILE, cfg)
96-
self.assertFalse(mock_run_reload.called)
97-
self.assertTrue(mock_run_once)
98-
9986

10087
class HaproxyConfigSSLTestCase(unittest.TestCase):
10188
@mock.patch.object(haproxycfg.Haproxy, '_config_ssl_cacerts')

0 commit comments

Comments
 (0)