Skip to content

Commit b74f5ec

Browse files
authored
Merge pull request #31 from DevoInc/release-next
Refactoring code, and fixes in API
2 parents fa3e434 + 656180c commit b74f5ec

File tree

10 files changed

+129
-96
lines changed

10 files changed

+129
-96
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
55
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
66

7+
## [1.6.0] - 2019-01-09
8+
#### Changed
9+
* Mild refactoring of Sender class
10+
* Refactoring API Response processing
11+
* Typos in docs
12+
* API Socket recv size
13+
14+
#### Fixed
15+
* API responses blank lines and splitted lines
16+
* Problems with API CLI and automatic shutdowns
17+
718

819
## [1.5.1] - 2018-12-28
920
#### Fixed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
[![master Build Status](https://travis-ci.com/DevoInc/python-sdk.svg?branch=master)](https://travis-ci.com/DevoInc/python-sdk) [![LICENSE](https://img.shields.io/dub/l/vibe-d.svg)](https://github.com/DevoInc/python-sdk/blob/master/LICENSE)
33

4-
[![wheel](https://img.shields.io/badge/wheel-yes-brightgreen.svg)](https://pypi.org/project/devo-sdk/) [![version](https://img.shields.io/badge/version-1.5.1-blue.svg)](https://pypi.org/project/devo-sdk/) [![python](https://img.shields.io/badge/python-2.7%20%7C%203.3%20%7C%203.4%20%7C%203.5%20%7C%203.6%20%7C%203.7-blue.svg)](https://pypi.org/project/devo-sdk/)
4+
[![wheel](https://img.shields.io/badge/wheel-yes-brightgreen.svg)](https://pypi.org/project/devo-sdk/) [![version](https://img.shields.io/badge/version-1.6.0-blue.svg)](https://pypi.org/project/devo-sdk/) [![python](https://img.shields.io/badge/python-2.7%20%7C%203.3%20%7C%203.4%20%7C%203.5%20%7C%203.6%20%7C%203.7-blue.svg)](https://pypi.org/project/devo-sdk/)
55

66

77
# Devo Python SDK

devo/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
__description__ = 'Devo Python Library.'
22
__url__ = 'http://www.devo.com'
3-
__version__ = "1.5.1"
3+
__version__ = "1.6.0"
44
__author__ = 'Devo'
55
__author_email__ = 'support@devo.com'
66
__license__ = 'MIT'

devo/api/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,10 @@ def query(self, **kwargs):
157157
:param kwargs: query -> Query to perform
158158
:param kwargs: query_id -> Query ID to perform the query
159159
:param kwargs: dates -> Dict with "from" and "to" keys
160-
:param kwargs: proccessor -> proccessor for each row of the response,
160+
:param kwargs: processor -> processor for each row of the response,
161161
if stream
162162
:param kwargs: stream -> if stream or full response: Object with
163-
options of query: proccessor, if stream
163+
options of query: processor, if stream
164164
:param kwargs: response -> response format
165165
:return: Result of the query (dict) or Buffer object
166166
"""
@@ -268,11 +268,11 @@ def _call_stream(self, payload=None):
268268
self.socket.send(self._get_stream_headers(payload))
269269
if not self.buffer.close and not self.buffer.error\
270270
and self.socket is not None:
271-
result, data = self.buffer.proccess_first_line(
272-
self.socket.recv(5000))
271+
result, data = self.buffer.process_first_line(
272+
self.socket.recv(4096))
273273
if result:
274274
try:
275-
while self.buffer.proccess_recv(self.socket.recv(5000)):
275+
while self.buffer.decode(self.socket.recv(4096)):
276276
pass
277277
except socket.timeout:
278278
while not self.buffer.is_empty() or self.buffer.close:
@@ -357,7 +357,7 @@ def _get_stream_headers(self, payload):
357357
"""
358358
tstamp = str(int(time.time()) * 1000)
359359

360-
headers = ("POST /%s HTTP/1.1\r\n"
360+
headers = ("POST /%s HTTP/2.0\r\n"
361361
"Host: %s\r\n"
362362
"Content-Type: application/json\r\n"
363363
"Content-Length: %s \r\n"

devo/api/scripts/client_cli.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import sys
44
import os
55
import click
6+
import time
67
from devo.common import Configuration
78
from devo.api import Client, DevoClientException
89
from devo.common.data.buffer import Buffer
@@ -42,11 +43,11 @@ def cli():
4243
help='Flag for make streaming query or full query with '
4344
'start and end. Default is true', default=True)
4445
@click.option('--proc', help='if flag exists, dont return raw query reply. In '
45-
'compact replies you receive proccessed lines.',
46+
'compact replies you receive processed lines.',
4647
is_flag=True)
4748
@click.option('--output', help='File path to store query response if not want '
4849
'stdout')
49-
@click.option('--format', '-f', default="json/simple/compact",
50+
@click.option('--response', '-r', default="json/simple/compact",
5051
help='The output format. Default is json/simple/compact')
5152
@click.option('--from', default=None,
5253
help='From date, and time for the query (YYYY-MM-DD hh:mm:ss). '
@@ -65,7 +66,7 @@ def query(**kwargs):
6566
dates={"from": config['from'],
6667
"to": config['to'] if "to" in config.keys()
6768
else None},
68-
format=config['format'],
69+
response=config['response'],
6970
stream=config['stream'])
7071

7172
process_response(buffer, config)
@@ -79,19 +80,19 @@ def identify_response(response, config):
7980
Identify what type of response are we received from Client API
8081
:param response: data received from Devo Client API
8182
:param config: array with launch options
82-
:return: proccessed line (List or string normally)
83+
:return: processed line (List or string normally)
8384
"""
8485
return {
8586
'json': lambda x, y: proc_default(x) if y else x,
8687
'json/compact': lambda x, y: x,
8788
'json/simple': lambda x, y: list(x),
8889
'json/simple/compact': lambda x, y: list(x)
89-
}[config['format']](response, config['proc'])
90+
}[config['response']](response, config['proc'])
9091

9192

9293
def process_response(response, config):
9394
"""
94-
Proccess responses from Client API
95+
process responses from Client API
9596
:param response: data received from Devo API
9697
:param config: array with launch options
9798
:return: None

devo/common/data/buffer.py

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ class Buffer(object):
1818
def __init__(self, buffer_max_size=1000, api_response="json/compact"):
1919
self.queue = Queue.Queue(maxsize=buffer_max_size)
2020
self.thread = None
21-
self.temp = None
21+
self.temp = ""
22+
self.temp_event = ""
23+
self.octet = 0
2224
self.error = None
2325
self.close = False
24-
self.timeout = 0
25-
self.response_split = "\n" if api_response is "csv" else "\r\n "
26+
self.timeout = None
27+
self.response_split = "\n" if api_response is "csv" else "\r\n"
2628

2729
def is_alive(self):
2830
return self.thread.isAlive()
@@ -42,58 +44,74 @@ def start(self):
4244
""" Function for call the threat start """
4345
self.thread.start()
4446

45-
def get(self, proccessor=None, block=True, timeout=None):
46-
""" Get one proccessed item from the queue """
47+
def get(self, processor=None, block=True, timeout=None):
48+
""" Get one processed item from the queue """
4749
if timeout is None:
4850
timeout = self.timeout
4951

5052
if not self.error:
5153
try:
52-
return proccessor(self.queue.get(block=block, timeout=timeout)) \
53-
if proccessor is not None \
54+
return processor(self.queue.get(block=block, timeout=timeout)) \
55+
if processor is not None \
5456
else self.queue.get(block=block, timeout=timeout)
5557
except Queue.Empty:
5658
self.close = True
5759
else:
5860
raise DevoBufferException("Devo-Buffer|%s" % str(self.error))
5961

60-
def proccess_first_line(self, data):
61-
""" Proccess first line of the Query call (For delete headers) """
62+
def process_first_line(self, data):
63+
""" process first line of the Query call (For delete headers) """
6264
if not isinstance(data, str):
6365
data = data.decode('utf8')
6466

6567
if "200 OK" in data.split("\r\n\r\n")[0]:
66-
self.proccess_recv(data[data.find("\r\n\r\n")+4:])
67-
return True, None
68+
return self.decode(data[data.find("\r\n\r\n")+4:]), None
6869

6970
self.error = data
7071
return False, data
7172

72-
def size(self):
73-
""" Verify queue size """
74-
return self.queue.qsize()
75-
76-
def proccess_recv(self, data):
77-
""" Proccess received data """
73+
def decode(self, data):
7874
if not isinstance(data, str):
7975
data = data.decode('utf8')
76+
return self.buffering(data)
8077

81-
data = data[data.find("\r\n") + 2:].split(self.response_split)
78+
def buffering(self, data):
79+
if not self.octet:
80+
pointer = data.find("\r\n") + 2
81+
size = int(data[:pointer], 16)
82+
data = self.temp_event + data[pointer:]
83+
self.octet = len(self.temp_event) + size
84+
self.temp_event = ""
8285

83-
data_len = len(data)
84-
if self.temp is not None:
85-
data[0] = self.temp + data[0]
86+
if len(self.temp + data) < self.octet or \
87+
(not self.octet and not data.find("\r\n")):
88+
self.temp += data
89+
return not self.close
8690

87-
if len(data) > 1:
88-
for aux in range(0, data_len - 1):
89-
self.queue.put(data[aux].strip(), block=True)
91+
data = self.temp + data
92+
self.temp = data[self.octet + 2:]
93+
return self.process_recv(data[:self.octet])
9094

91-
if data[data_len-1][-4:] == "\r\n\r\n":
92-
self.queue.put(data[data_len-1][:-4].strip(), block=True)
93-
self.temp = None
94-
else:
95-
self.temp = data[data_len-1][:-2].strip()
95+
def size(self):
96+
""" Verify queue size """
97+
return self.queue.qsize()
9698

99+
def process_recv(self, data):
100+
""" process received data """
101+
data_list = data.strip().split(self.response_split)
102+
if data[-(len(self.response_split)):] != self.response_split:
103+
self.temp_event = data_list.pop()
104+
105+
for aux in range(0, len(data_list)):
106+
self.queue.put(data_list[aux].strip(), block=True)
107+
108+
self.octet = 0
109+
if len(self.temp.strip()):
110+
data = self.temp
111+
self.temp = ""
112+
if data.find("\r\n") == 0:
113+
data = data[2:]
114+
return self.buffering(data)
97115
return not self.close
98116

99117
def close(self):

0 commit comments

Comments
 (0)