@@ -14,13 +14,8 @@ class ThreadMySQL(ThreadBase):
14
14
""" Polls mysql and inserts data into queue """
15
15
is_running = True
16
16
connection = None
17
- reconnect_attempt = 0
18
17
recovery_attempt = 0
19
18
reconnect_delay = 5
20
- max_reconnect = 30
21
- max_recovery = 10
22
- die_on_max_reconnect = True
23
- die_on_max_recovery = True
24
19
stats_checks = {}
25
20
check_lastrun = {}
26
21
@@ -37,6 +32,9 @@ def configure(self, config_dict):
37
32
self .username = config_dict .get ('mysql' ).get ('username' , 'root' )
38
33
self .password = config_dict .get ('mysql' ).get ('password' , '' )
39
34
35
+ self .max_reconnect = int (config_dict .get ('mysql' ).get ('max_reconnect' , 30 ))
36
+ self .max_recovery = int (config_dict .get ('mysql' ).get ('max_recovery' , 10 ))
37
+
40
38
#Set the stats checks for MySQL
41
39
for stats_type in config_dict .get ('mysql' ).get ('stats_types' ).split (',' ):
42
40
if config_dict .get ('mysql' ).get ('query_' + stats_type ) and \
@@ -56,11 +54,23 @@ def configure(self, config_dict):
56
54
return self .host , self .port , self .sleep_interval
57
55
58
56
def setup_connection (self ):
59
- try :
57
+ connection_attempt = 0
58
+
59
+ while self .max_reconnect == 0 or connection_attempt <= self .max_reconnect :
60
+ try :
60
61
self .connection = mdb .connect (host = self .host , user = self .username , port = self .port , passwd = self .password )
61
62
return self .connection
62
- except Exception :
63
- self .reconnect ()
63
+ except Exception :
64
+ pass
65
+
66
+ # If we got here, connection failed
67
+ connection_attempt += 1
68
+ time .sleep (self .reconnect_delay )
69
+ print ('Attempting reconnect #{0}...' .format (connection_attempt ))
70
+
71
+ # If we get out of the while loop, we've passed max_reconnect
72
+ raise ThreadMySQLMaxReconnectException
73
+
64
74
65
75
def stop (self ):
66
76
""" Stop running this thread and close connection """
@@ -132,18 +142,9 @@ def _preprocess(self, check_type, column_names, rows):
132
142
133
143
return executing_class .process (rows , * extra_args )
134
144
135
- def reconnect (self ):
136
- if self .die_on_max_reconnect and self .reconnect_attempt >= self .max_reconnect :
137
- raise ThreadMySQLMaxReconnectException
138
-
139
- self .reconnect_attempt += 1
140
- print ('Attempting reconnect #{0}...' .format (self .reconnect_attempt ))
141
- time .sleep (self .reconnect_delay )
142
- self .setup_connection ()
143
-
144
145
def recover_errors (self , ex ):
145
146
"""Decide whether we should continue."""
146
- if self .die_on_max_recovery and self .recovery_attempt >= self .max_recovery :
147
+ if self .max_recovery > 0 and self .recovery_attempt >= self .max_recovery :
147
148
print ("Giving up after {} consecutive errors" .format (self .recovery_attempt ))
148
149
raise
149
150
@@ -162,10 +163,8 @@ def run(self):
162
163
self .setup_connection ()
163
164
164
165
while self .is_running :
165
- if self .connection .open :
166
- self .reconnect_attempt = 0
167
- else :
168
- self .reconnect ()
166
+ if not self .connection .open :
167
+ self .setup_connection ()
169
168
170
169
try :
171
170
self ._run ()
0 commit comments