6
6
import json
7
7
import socket
8
8
import ssl
9
- import requests
10
9
import sys
10
+ import requests
11
11
from devo .common import DateParser , Buffer
12
12
13
13
PY3 = sys .version_info [0 ] > 2
14
14
15
15
16
16
class DevoClientException (Exception ):
17
17
""" Default Devo Client Exception """
18
- pass
19
18
20
19
21
20
if not PY3 :
@@ -25,10 +24,12 @@ def __init__(self, *args, **kwargs): # real signature unknown
25
24
pass
26
25
27
26
28
- class Client ( object ) :
27
+ class Client :
29
28
"""
30
29
The Devo SERach REst Api main class
31
30
"""
31
+ CLIENT_DEFAULT_APP_NAME = 'python-sdk-app'
32
+ CLIENT_DEFAULT_USER = 'python-sdk-user'
32
33
URL_AWS_EU = 'https://api-eu.logtrust.com'
33
34
URL_VDC = 'https://spainapi.logtrust.com'
34
35
URL_AWS_USA = 'https://api-us.logtrust.com'
@@ -44,11 +45,11 @@ def __init__(self, *args, **kwargs):
44
45
:param buffer: Buffer object, if want another diferent queue
45
46
"""
46
47
self .time_start = int (round (time .time () * 1000 ))
47
- if len (args ) is 3 :
48
+ if len (args ) == 3 :
48
49
self .key = args [0 ]
49
50
self .secret = args [1 ]
50
51
url = args [2 ]
51
- elif len ( args ) is 0 :
52
+ elif not args :
52
53
self .key = kwargs .get ("key" ,
53
54
kwargs .get ("api_key" ,
54
55
kwargs .get ("apiKey" , None )))
@@ -65,9 +66,12 @@ def __init__(self, *args, **kwargs):
65
66
"3 arguments: key, secret and url, "
66
67
"in that order. " )
67
68
69
+ self .user = kwargs .get ('user' , self .CLIENT_DEFAULT_USER )
70
+ self .app_name = kwargs .get ('app_name' , self .CLIENT_DEFAULT_APP_NAME )
68
71
self .token = kwargs .get ("token" ,
69
- kwargs .get ("auth_token" ,
70
- kwargs .get ("authToken" , None )))
72
+ kwargs .get (
73
+ "auth_token" ,
74
+ kwargs .get ("authToken" , None )))
71
75
72
76
self .jwt = kwargs .get ("jwt" , None )
73
77
@@ -95,11 +99,25 @@ def __get_url_parts(self, url):
95
99
Split the two parts of the api url
96
100
:param url: Url of the api
97
101
"""
98
- return self .__verify_url_complement (url .split ("//" )[- 1 ]
99
- .split ("/" , maxsplit = 1 )
100
- if PY3
101
- else url .split ("//" )[- 1 ]
102
- .split ("/" , 1 ))
102
+ return self .__verify_url_complement (
103
+ url .split ("//" )[- 1 ].split ("/" , maxsplit = 1 ) if PY3
104
+ else url .split ("//" )[- 1 ].split ("/" , 1 ))
105
+
106
+ def __generate_pragmas (self , comment = None ):
107
+ """
108
+ Generate pragmas to add to query
109
+ :comment: Pragma comment free
110
+ :user: Pragma comment user
111
+ :app_name: Pragma comment id. App name.
112
+ """
113
+ str_pragmas = ' pragma comment.id:"{}" ' \
114
+ 'pragma comment.user:"{}"' \
115
+ .format (self .app_name , self .user )
116
+
117
+ if comment :
118
+ return str_pragmas + ' pragma comment.free:"{}"' .format (comment )
119
+
120
+ return str_pragmas
103
121
104
122
def __verify_url_complement (self , url_list ):
105
123
"""
@@ -118,7 +136,7 @@ def from_config(config):
118
136
return Client (** config )
119
137
120
138
@staticmethod
121
- def generate_dates (dates ):
139
+ def __generate_dates (dates ):
122
140
"""
123
141
Generate and merge dates object
124
142
:param dates: object with optios for query, see doc
@@ -149,36 +167,42 @@ def query(self, **kwargs):
149
167
150
168
query = kwargs .get ('query' , None )
151
169
query_id = kwargs .get ('query_id' , None )
152
- dates = self .generate_dates (kwargs .get ('dates' , None ))
170
+ dates = self .__generate_dates (kwargs .get ('dates' , None ))
153
171
stream = kwargs .get ('stream' , True )
154
172
processor = kwargs .get ('processor' , None )
173
+ if query is not None :
174
+ query += self .__generate_pragmas (comment = kwargs .get ('comment' , None ))
155
175
156
176
opts = {'limit' : kwargs .get ('limit' , None ),
157
177
'response' : kwargs .get ('response' , self .response ),
158
178
'offset' : kwargs .get ('offset' , None ),
159
- 'destination' : kwargs .get ('destination' , None )}
179
+ 'destination' : kwargs .get ('destination' , None )
180
+ }
181
+
182
+ if not self .__stream_available (opts ['response' ]) or not stream :
183
+ if not dates ['to' ]:
184
+ dates ['to' ] = "now()"
160
185
161
- if self .stream_available (opts ['response' ]) or not stream :
162
186
return self ._call (
163
187
self ._get_payload (query , query_id , dates , opts ),
164
188
processor
165
189
)
166
- else :
167
- if self .socket is None :
168
- self .connect ()
169
- elif not self .status ():
170
- self .connect ()
171
-
172
- if self .buffer is None :
173
- self .buffer = Buffer ()
174
- self .buffer .create_thread (
175
- target = self ._call_stream ,
176
- kwargs = ({'payload' : self ._get_payload (query , query_id ,
177
- dates , opts )})
178
- )
179
190
180
- self .buffer .start ()
181
- return self .buffer
191
+ if self .socket is None :
192
+ self .connect ()
193
+ elif not self .status ():
194
+ self .connect ()
195
+
196
+ if self .buffer is None :
197
+ self .buffer = Buffer ()
198
+ self .buffer .create_thread (
199
+ target = self ._call_stream ,
200
+ kwargs = ({'payload' : self ._get_payload (query , query_id ,
201
+ dates , opts )})
202
+ )
203
+
204
+ self .buffer .start ()
205
+ return self .buffer
182
206
183
207
def status (self ):
184
208
"""
@@ -187,19 +211,21 @@ def status(self):
187
211
timeit = int (round (time .time () * 1000 )) - self .time_start
188
212
if self .socket is None :
189
213
return False
190
- elif self .timeout < timeit :
214
+
215
+ if self .timeout < timeit :
191
216
self .close ()
192
217
return False
218
+
193
219
return True
194
220
195
221
@staticmethod
196
- def stream_available (resp ):
222
+ def __stream_available (resp ):
197
223
"""
198
224
Verify if can stream resp from API by type of resp in opts
199
225
:param resp: str
200
226
:return: bool
201
227
"""
202
- return resp == "json" or resp == "json/compact"
228
+ return resp not in [ "json" , "json/compact" ]
203
229
204
230
# API Call
205
231
def _call (self , payload , processor ):
@@ -212,11 +238,11 @@ def _call(self, payload, processor):
212
238
tries = 0
213
239
while tries < self .retries :
214
240
try :
215
- response = requests .post ("https://%s/%s" %
216
- (self .url , self .query_url ),
217
- data = payload ,
218
- headers = self ._get_no_stream_headers (payload ),
219
- verify = True , timeout = self .timeout )
241
+ response = requests .post (
242
+ "https://%s/%s" % (self .url , self .query_url ),
243
+ data = payload ,
244
+ headers = self ._get_no_stream_headers (payload ),
245
+ verify = True , timeout = self .timeout )
220
246
except ConnectionError as error :
221
247
return {"status" : 404 , "error" : error }
222
248
@@ -225,13 +251,12 @@ def _call(self, payload, processor):
225
251
"error" in response .text [0 :15 ].lower ():
226
252
return {"status" : response .status_code ,
227
253
"error" : response .text }
228
- else :
229
- if processor is not None :
230
- return processor (response .text )
231
- return response .text
232
- else :
233
- tries += 1
234
- time .sleep (self .sleep )
254
+
255
+ if processor is not None :
256
+ return processor (response .text )
257
+ return response .text
258
+ tries += 1
259
+ time .sleep (self .sleep )
235
260
return {}
236
261
237
262
def _call_stream (self , payload = None ):
@@ -241,8 +266,10 @@ def _call_stream(self, payload=None):
241
266
"""
242
267
if self .socket is not None :
243
268
self .socket .send (self ._get_stream_headers (payload ))
244
- if not self .buffer .close and not self .buffer .error and self .socket is not None :
245
- result , data = self .buffer .proccess_first_line (self .socket .recv (5000 ))
269
+ if not self .buffer .close and not self .buffer .error \
270
+ and self .socket is not None :
271
+ result , data = self .buffer .proccess_first_line (
272
+ self .socket .recv (5000 ))
246
273
if result :
247
274
try :
248
275
while self .buffer .proccess_recv (self .socket .recv (5000 )):
@@ -267,9 +294,8 @@ def _get_payload(query, query_id, dates, opts):
267
294
:return: Return the formed payload
268
295
"""
269
296
payload = {"from" : int (DateParser .default_from (dates ['from' ]) / 1000 ),
270
- "to" : int (DateParser .default_to (dates ['to' ]) / 1000 )
271
- if dates ['to' ] is not None else None ,
272
- "query" : query , "queryId" : query_id ,
297
+ "to" : int (DateParser .default_to (dates ['to' ]) / 1000 ) if
298
+ dates ['to' ] is not None else None ,
273
299
"mode" : {"type" : opts ['response' ]}}
274
300
275
301
if query :
@@ -305,21 +331,23 @@ def _get_no_stream_headers(self, data):
305
331
'x-logtrust-timestamp' : tstamp ,
306
332
'x-logtrust-sign' : sign
307
333
}
308
- elif self .token :
334
+
335
+ if self .token :
309
336
return {
310
337
'Content-Type' : 'application/json' ,
311
338
'x-logtrust-timestamp' : tstamp ,
312
339
'Authorization' : "Bearer %s" % self .token
313
340
}
314
- elif self .jwt :
341
+
342
+ if self .jwt :
315
343
return {
316
344
'Content-Type' : 'application/json' ,
317
345
'x-logtrust-timestamp' : tstamp ,
318
346
'Authorization' : "jwt %s" % self .jwt
319
347
}
320
348
321
- raise DevoClientException ("Devo-Client|Client dont have key&secret or auth token/jwt" )
322
-
349
+ raise DevoClientException ("Devo-Client|Client dont have key&secret"
350
+ " or auth token/jwt" )
323
351
324
352
def _get_stream_headers (self , payload ):
325
353
"""
@@ -330,33 +358,34 @@ def _get_stream_headers(self, payload):
330
358
tstamp = str (int (time .time ()) * 1000 )
331
359
332
360
headers = ("POST /%s HTTP/1.1\r \n "
333
- "Host: %s\r \n "
334
- "Content-Type: application/json\r \n "
335
- "Content-Length: %s \r \n "
336
- "Cache-Control: no-cache\r \n "
337
- "x-logtrust-timestamp: %s\r \n "
361
+ "Host: %s\r \n "
362
+ "Content-Type: application/json\r \n "
363
+ "Content-Length: %s \r \n "
364
+ "Cache-Control: no-cache\r \n "
365
+ "x-logtrust-timestamp: %s\r \n "
338
366
% (self .query_url , self .url , str (len (payload )), tstamp ))
339
367
340
368
if self .key and self .secret :
341
369
return ("%s"
342
370
"x-logtrust-apikey: %s\r \n "
343
371
"x-logtrust-sign: %s\r \n "
344
372
"\r \n %s\r \n "
345
- % (headers , self .key , self ._get_sign (payload , tstamp ),
373
+ % (headers , self .key , self ._get_sign (payload , tstamp ),
346
374
payload )).encode ("utf-8" )
347
- elif self .token :
375
+ if self .token :
348
376
return ("%s"
349
377
"Authorization: Bearer %s\r \n "
350
378
"\r \n %s\r \n "
351
379
% (headers , self .token , payload )).encode ("utf-8" )
352
- elif self .jwt :
380
+ if self .jwt :
353
381
return ("%s"
354
382
"Authorization: jwt %s\r \n "
355
383
"\r \n %s\r \n "
356
384
% (headers , self .jwt , payload )).encode ("utf-8" )
357
385
358
386
self .buffer .error = "Client dont have key&secret or auth token/jwt"
359
- raise DevoClientException ("Devo-Client|Client dont have key&secret or auth token/jwt" )
387
+ raise DevoClientException ("Devo-Client|Client dont have key&secret"
388
+ " or auth token/jwt" )
360
389
361
390
def _get_sign (self , data , tstamp ):
362
391
"""
@@ -402,4 +431,3 @@ def close(self):
402
431
if self .socket is not None :
403
432
self .socket .close ()
404
433
self .socket = None
405
-
0 commit comments