Skip to content

Commit 616e25a

Browse files
author
Jesse Pollak
committed
Merge branch 'master' of github.com:jessepollak/mixpanel-python-async
2 parents 73f1d50 + 4bcf3d7 commit 616e25a

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

mixpanel_async/async_buffered_consumer.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import threading
55
from datetime import datetime, timedelta
66
from mixpanel import BufferedConsumer as SynchronousBufferedConsumer
7+
from mixpanel import MixpanelException
78

89
class FlushThread(threading.Thread):
910
'''
@@ -44,18 +45,18 @@ class AsyncBufferedConsumer(SynchronousBufferedConsumer):
4445
4546
Because AsyncBufferedConsumer holds events until the `flush_after` timeout
4647
or an endpoint queue hits the size of _max_queue_size, you should call
47-
flush(async=False) before you terminate any process where you have been
48+
flush(async=False) before you terminate any process where you have been
4849
using the AsyncBufferedConsumer.
4950
'''
5051

5152
# constants used in the _should_flush method
5253
ALL = "ALL"
5354
ENDPOINT = "ENDPOINT"
5455

55-
def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
56+
def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
5657
events_url=None, people_url=None, *args, **kwargs):
5758
'''
58-
Create a new instance of a AsyncBufferedConsumer class.
59+
Create a new instance of a AsyncBufferedConsumer class.
5960
6061
:param flush_after (datetime.timedelta): the time period after which
6162
the AsyncBufferedConsumer will flush the events upon receiving a
@@ -68,12 +69,12 @@ def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
6869
:param people_url: the Mixpanel API URL that people events will be sent to
6970
'''
7071
super(AsyncBufferedConsumer, self).__init__(
71-
max_size=max_size,
72-
events_url=events_url,
72+
max_size=max_size,
73+
events_url=events_url,
7374
people_url=people_url
7475
)
7576

76-
# remove the minimum max size that the SynchronousBufferedConsumer
77+
# remove the minimum max size that the SynchronousBufferedConsumer
7778
# class sets
7879
self._max_size = max_size
7980
self.flush_after = flush_after
@@ -149,7 +150,7 @@ def send(self, endpoint, json_message):
149150
:raises: MixpanelException
150151
'''
151152
if endpoint not in self._async_buffers:
152-
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(self._async_buffers.keys()))
153+
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(endpoint, self._async_buffers.keys()))
153154

154155
buf = self._async_buffers[endpoint]
155156
buf.append(json_message)
@@ -175,8 +176,8 @@ def flush(self, endpoint=None, async=True):
175176
thrown will have a message property, containing the text of the message,
176177
and an endpoint property containing the endpoint that failed.
177178
178-
179-
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
179+
180+
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
180181
for sending the data
181182
:param async (bool): Whether to flush the data in a seperate thread or not
182183
'''
@@ -209,7 +210,7 @@ def flush(self, endpoint=None, async=True):
209210
# event is added this second flush will be retriggered and
210211
# will complete.
211212
flushing = False
212-
213+
213214
else:
214215
self.transfer_buffers(endpoint=endpoint)
215216
self._sync_flush(endpoint=endpoint)
@@ -226,17 +227,17 @@ def transfer_buffers(self, endpoint=None):
226227
Transfer events from the `_async_buffers` where they are stored to the
227228
`_buffers` where they will be flushed from by the flushing thread.
228229
229-
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
230+
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
230231
that is about to be flushed
231232
"""
232-
if endpoint:
233+
if endpoint:
233234
keys = [endpoint]
234235
else:
235236
keys = self._async_buffers.keys()
236237

237238
for key in keys:
238239
buf = self._async_buffers[key]
239-
while buf:
240+
while buf:
240241
self._buffers[key].append(buf.pop(0))
241242

242243

tests/async_buffered_consumer_test.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
except ImportError:
1818
raise Exception(
1919
"""
20-
mixpanel-python-async requires the mock package to run the test suite.
21-
Please run:
20+
mixpanel-python-async requires the mock package to run the test suite.
21+
Please run:
2222
2323
$ pip install mock
2424
""")
@@ -32,7 +32,7 @@ class AsyncBufferedConsumerTestCase(unittest.TestCase):
3232

3333
def setUp(self):
3434
self.consumer = AsyncBufferedConsumer(
35-
max_size=self.MAX_SIZE,
35+
max_size=self.MAX_SIZE,
3636
flush_first=False
3737
)
3838

@@ -58,7 +58,7 @@ def test_sync_flush_calls_buffered_consumer_flush_endpoint(self, flush_endpoint)
5858
flush_endpoint.assert_called_with(self.ENDPOINT)
5959

6060

61-
@patch.object(AsyncBufferedConsumer, '_sync_flush')
61+
@patch.object(AsyncBufferedConsumer, '_sync_flush')
6262
def test_flush_gets_called_in_different_thread_if_async(self, sync_flush):
6363
main_thread_id = thread.get_ident()
6464
flush_thread_id = None
@@ -146,6 +146,10 @@ def test_does_not_drop_events(self):
146146
send_patch.assert_called_once_with(self.ENDPOINT, '[{"test": true}]')
147147
self.assertEqual(self.consumer._async_buffers[self.ENDPOINT], [self.JSON])
148148

149+
def test_raises_exception_with_bad_endpoint(self):
150+
with self.assertRaises(mixpanel.MixpanelException):
151+
self.consumer.send('badendpoint', True)
152+
149153
def send_event(self, endpoint=None):
150154
endpoint = endpoint or self.ENDPOINT
151155
self.consumer.send(endpoint, self.JSON)

0 commit comments

Comments
 (0)