Skip to content

Commit 0baf3c6

Browse files
author
Jesse Pollak
committed
Merge pull request #6 from jessepollak/fix-thread-safety
Add `async_buffers` to fix thread safety issues
2 parents b6a89c7 + 8ef7b71 commit 0baf3c6

File tree

2 files changed

+48
-7
lines changed

2 files changed

+48
-7
lines changed

mixpanel_async/async_buffered_consumer.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import absolute_import
22
import json
3+
import copy
34
import threading
45
from datetime import datetime, timedelta
56
from mixpanel import BufferedConsumer as SynchronousBufferedConsumer
@@ -78,6 +79,8 @@ def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
7879
self.flush_after = flush_after
7980
self.flush_first = flush_first
8081

82+
self._async_buffers = copy.deepcopy(self._buffers)
83+
8184
if not self.flush_first:
8285
self.last_flushed = datetime.now()
8386
else:
@@ -106,7 +109,7 @@ def _should_flush(self, endpoint=None):
106109
full = False
107110

108111
if endpoint:
109-
full = len(self._buffers[endpoint]) >= self._max_size
112+
full = len(self._async_buffers[endpoint]) >= self._max_size
110113

111114
# always flush the first event
112115
stale = self.last_flushed is None
@@ -145,10 +148,10 @@ def send(self, endpoint, json_message):
145148
:type json_message: str
146149
:raises: MixpanelException
147150
'''
148-
if endpoint not in self._buffers:
149-
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(self._buffers.keys()))
151+
if endpoint not in self._async_buffers:
152+
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(self._async_buffers.keys()))
150153

151-
buf = self._buffers[endpoint]
154+
buf = self._async_buffers[endpoint]
152155
buf.append(json_message)
153156

154157
should_flush = self._should_flush(endpoint)
@@ -186,6 +189,8 @@ def flush(self, endpoint=None, async=True):
186189
with self.flush_lock:
187190
if self._flush_thread_is_free():
188191

192+
self.transfer_buffers(endpoint=endpoint)
193+
189194
self.flushing_thread = FlushThread(self, endpoint=endpoint)
190195
self.flushing_thread.start()
191196

@@ -206,7 +211,8 @@ def flush(self, endpoint=None, async=True):
206211
flushing = False
207212

208213
else:
209-
self._sync_flush()
214+
self.transfer_buffers(endpoint=endpoint)
215+
self._sync_flush(endpoint=endpoint)
210216
flushing = True
211217

212218
if flushing:
@@ -215,6 +221,25 @@ def flush(self, endpoint=None, async=True):
215221
return flushing
216222

217223

224+
def transfer_buffers(self, endpoint=None):
225+
"""
226+
Transfer events from the `_async_buffers` where they are stored to the
227+
`_buffers` where they will be flushed from by the flushing thread.
228+
229+
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
230+
that is about to be flushed
231+
"""
232+
if endpoint:
233+
keys = [endpoint]
234+
else:
235+
keys = self._async_buffers.keys()
236+
237+
for key in keys:
238+
buf = self._async_buffers[key]
239+
while buf:
240+
self._buffers[key].append(buf.pop(0))
241+
242+
218243
def _flush_endpoint(self, endpoint, async=True):
219244
# we override flush with endpoint so as to keep all the
220245
# threading logic in one place, while still allowing individual

tests/async_buffered_consumer_test.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
try:
1616
from mock import Mock, patch, DEFAULT
1717
except ImportError:
18-
print 'mixpanel-python requires the mock package to run the test suite'
19-
raise
18+
raise Exception(
19+
"""
20+
mixpanel-python-async requires the mock package to run the test suite.
21+
Please run:
22+
23+
$ pip install mock
24+
""")
2025

2126
from mixpanel_async import AsyncBufferedConsumer
2227

@@ -129,6 +134,17 @@ def test_endpoint_events_get_flushed_instantly_with_max_size_1(self, sync_flush)
129134

130135
sync_flush.assert_called_once_with(endpoint=self.ENDPOINT)
131136

137+
def test_does_not_drop_events(self):
138+
self.consumer = AsyncBufferedConsumer(flush_first=True)
139+
send_patch = patch.object(self.consumer._consumer, 'send').start()
140+
141+
self.send_event()
142+
self.send_event()
143+
144+
self.wait_for_threads()
145+
146+
send_patch.assert_called_once_with(self.ENDPOINT, '[{"test": true}]')
147+
self.assertEqual(self.consumer._async_buffers[self.ENDPOINT], [self.JSON])
132148

133149
def send_event(self, endpoint=None):
134150
endpoint = endpoint or self.ENDPOINT

0 commit comments

Comments
 (0)