6
6
import json
7
7
import requests
8
8
from devo .common import default_from , default_to
9
- from .processors import processors , proc_json , proc_default , \
9
+ from .processors import processors , proc_json , \
10
10
json_compact_simple_names , proc_json_compact_simple_to_jobj
11
11
12
12
CLIENT_DEFAULT_APP_NAME = 'python-sdk-app'
@@ -41,7 +41,10 @@ def raise_exception(error):
41
41
if isinstance (error , str ):
42
42
raise DevoClientException (proc_json ()(error ))
43
43
elif isinstance (error , DevoClientException ):
44
- raise DevoClientException (error .args [0 ])
44
+ if isinstance (error .args [0 ], str ):
45
+ raise DevoClientException (proc_json ()(error .args [0 ]))
46
+ else :
47
+ raise DevoClientException (error .args [0 ])
45
48
elif isinstance (error , dict ):
46
49
raise DevoClientException (error )
47
50
else :
@@ -56,84 +59,131 @@ def _format_error(error):
56
59
'"object": "%s"}' % str (error ).replace ("\" " , "\\ \" " )
57
60
58
61
59
- class Options :
62
+ class ClientConfig :
63
+ """
64
+ Main class for configuration of Client class. With diferent configurations
65
+ """
60
66
def __init__ (self , processor = DEFAULT , response = "json/simple/compact" ,
61
- destination = None , stream = True ):
62
-
67
+ destination = None , stream = True , pragmas = None ):
68
+ """
69
+ Initialize the API with this params, all optionals
70
+ :param processor: processor for response, default is None
71
+ :param response: format of response
72
+ :param destination: Destination options, see Documentation for more info
73
+ :param stream: Stream queries or not
74
+ :param pragmas: pragmas por query: user, app_name and comment
75
+ """
63
76
self .stream = stream
64
77
self .response = response
65
78
self .destination = destination
66
79
self .proc = processor
67
80
self .processor = processors ()[self .proc ]()
81
+ if pragmas :
82
+ self .pragmas = pragmas
83
+ if "user" not in self .pragmas .keys ():
84
+ self .pragmas ['user' ] = CLIENT_DEFAULT_USER
85
+ if "app_name" not in self .pragmas .keys ():
86
+ self .pragmas ['app_name' ] = CLIENT_DEFAULT_APP_NAME
87
+ else :
88
+ self .pragmas = {"user" : CLIENT_DEFAULT_USER ,
89
+ "app_name" : CLIENT_DEFAULT_APP_NAME }
68
90
69
91
def set_processor (self , processor = None ):
92
+ """
93
+ Set processor of response
94
+ :param processor: lambda function
95
+ :return:
96
+ """
70
97
if processor :
71
98
self .proc = processor
72
99
self .processor = processors ()[self .proc ]()
73
100
return True
74
101
102
+ def set_user (self , user = CLIENT_DEFAULT_USER ):
103
+ """
104
+ Set user to put in pragma when make the query
105
+ :param user: username string
106
+ :return:
107
+ """
108
+ self .pragmas ['user' ] = user
109
+ return True
110
+
111
+ def set_app_name (self , app_name = CLIENT_DEFAULT_APP_NAME ):
112
+
113
+ """
114
+ Set app_name to put in pragma when make the query
115
+ :param app_name: app_name string
116
+ :return:
117
+ """
118
+ self .pragmas ['app_name' ] = app_name
119
+ return True
120
+
75
121
76
122
class Client :
77
123
"""
78
- The Devo SERach REst Api main class
124
+ The Devo seach rest api main class
79
125
"""
80
- def __init__ (self , address = None , auth = None , options = None ,
81
- retries = 3 , timeout = 30 ):
126
+ def __init__ (self , address = None , auth = None , config = None ,
127
+ retries = None , timeout = None ):
82
128
"""
83
129
Initialize the API with this params, all optionals
84
130
:param address: endpoint
85
131
:param auth: object with auth params (key, secret, token, jwt)
132
+ :param options: options class for Client and queries
86
133
:param retries: number of retries for a query
87
134
:param timeout: timeout of socket
88
135
"""
89
- self .auth = auth
136
+ if config is None :
137
+ self .config = ClientConfig ()
138
+ elif isinstance (config , ClientConfig ):
139
+ self .config = config
140
+ else :
141
+ address = address if address else config .get ("address" , None )
142
+ auth = auth if auth else config .get ("auth" ,
143
+ {"key" : config .get ("key" , None ),
144
+ "secret" : config .get ("secret" ,
145
+ None ),
146
+ "jwt" : config .get ("jwt" , None ),
147
+ "token" : config .get ("token" ,
148
+ None )})
149
+
150
+ retries = retries if retries else config .get ("retries" , 3 )
151
+ timeout = timeout if retries else config .get ("retries" , 30 )
152
+ self .config = self ._from_dict (config )
153
+
154
+ retries = retries if retries else 3
155
+ timeout = timeout if timeout else 30
90
156
157
+ self .auth = auth
91
158
if not address :
92
159
raise raise_exception (
93
160
_format_error (ERROR_MSGS ['no_endpoint' ])
94
161
)
95
162
96
163
self .address = self .__get_address_parts (address )
97
164
98
- self .pragmas = {"user" : CLIENT_DEFAULT_USER ,
99
- "app_name" : CLIENT_DEFAULT_APP_NAME }
100
-
101
- self .opts = Options () if not options else options
102
-
103
165
self .retries = retries
104
166
self .timeout = timeout
105
167
self .verify = True
106
168
107
- def set_user (self , user = CLIENT_DEFAULT_USER ):
108
- self .pragmas ['user' ] = user
109
- return True
110
-
111
- def set_app_name (self , app_name = CLIENT_DEFAULT_APP_NAME ):
112
- self .pragmas ['app_name' ] = app_name
113
- return True
114
-
115
169
@staticmethod
116
- def from_dict (config ):
170
+ def _from_dict (config ):
117
171
"""
118
172
Create Client object from config file values
119
173
:param config: lt-common config standar
120
174
"""
121
- options = Options (processor = config .get ("processor" , DEFAULT ),
122
- response = config .get ("response" , "json/simple/compact" )
123
- ,destination = config .get ("destination" , None ),
124
- stream = config .get ("stream" , True ))
175
+ return ClientConfig (processor = config .get ("processor" , DEFAULT ),
176
+ response = config .get ("response" ,
177
+ "json/simple/compact" ),
178
+ destination = config .get ("destination" , None ),
179
+ stream = config .get ("stream" , True ))
125
180
126
- if "auth" in config .keys ():
127
- return Client (address = config .get ("address" ),
128
- auth = config .get ("auth" ),
129
- options = options )
130
- else :
131
- return Client (address = config .get ("address" ),
132
- auth = {"key" : config .get ("key" , None ),
133
- "secret" : config .get ("secret" , None ),
134
- "jwt" : config .get ("jwt" , None ),
135
- "token" : config .get ("token" , None )},
136
- options = options )
181
+ def verify_certificates (self , option = True ):
182
+ """
183
+ Set if verify or not the TSL certificates in https calls
184
+ :param option: (bool) True or False
185
+ """
186
+ self .verify = option
137
187
138
188
def __get_address_parts (self , address ):
139
189
"""
@@ -187,18 +237,18 @@ def _is_correct_response(line):
187
237
except ValueError :
188
238
return False
189
239
190
- def options (self , processor = None , response = None , stream = None ,
191
- destination = None ):
240
+ def configurate (self , processor = None , response = None , stream = None ,
241
+ destination = None ):
192
242
193
- self .opts .set_processor (processor )
243
+ self .config .set_processor (processor )
194
244
if response :
195
- self .opts .response = response
245
+ self .config .response = response
196
246
197
247
if stream :
198
- self .opts .stream = stream
248
+ self .config .stream = stream
199
249
200
250
if destination :
201
- self .opts .destination = destination
251
+ self .config .destination = destination
202
252
203
253
def query (self , query = None , query_id = None , dates = None ,
204
254
limit = None , offset = None , comment = None ):
@@ -210,24 +260,24 @@ def query(self, query=None, query_id=None, dates=None,
210
260
:param limit: Max number of rows
211
261
:param offset: start of needle for query
212
262
:param comment: comment for query
213
- :return: Result of the query (dict) or Buffer object
263
+ :return: Result of the query (dict) or Iterator object
214
264
"""
215
265
dates = self ._generate_dates (dates )
216
266
217
267
if query is not None :
218
268
query += self ._generate_pragmas (comment = comment )
219
269
220
270
query_opts = {'limit' : limit ,
221
- 'response' : self .opts .response ,
271
+ 'response' : self .config .response ,
222
272
'offset' : offset ,
223
- 'destination' : self .opts .destination
273
+ 'destination' : self .config .destination
224
274
}
225
275
226
- if not self .stream_available (self .opts .response ) \
227
- or not self .opts .stream :
276
+ if not self .stream_available (self .config .response ) \
277
+ or not self .config .stream :
228
278
if not dates ['to' ]:
229
279
dates ['to' ] = "now()"
230
- self .opts .stream = False
280
+ self .config .stream = False
231
281
232
282
return self ._call (
233
283
self ._get_payload (query , query_id , dates , query_opts )
@@ -240,13 +290,13 @@ def _call(self, payload):
240
290
:param stream: boolean, indicate if one call is a streaming call
241
291
:return: Response from API
242
292
"""
243
- if self .opts .stream :
293
+ if self .config .stream :
244
294
return self ._return_stream (payload )
245
295
response = self ._make_request (payload )
246
296
247
297
if isinstance (response , str ):
248
298
return proc_json ()(response )
249
- return self .opts .processor (response .text )
299
+ return self .config .processor (response .text )
250
300
251
301
def _return_stream (self , payload ):
252
302
"""If its a stream call, return yield lines
@@ -262,15 +312,15 @@ def _return_stream(self, payload):
262
312
raise_exception (response )
263
313
264
314
if self ._is_correct_response (first ):
265
- if self .opts .proc == SIMPLECOMPACT_TO_OBJ :
315
+ if self .config .proc == SIMPLECOMPACT_TO_OBJ :
266
316
aux = json_compact_simple_names (proc_json ()(first )['m' ])
267
- self .opts .processor = proc_json_compact_simple_to_jobj (aux )
268
- elif self .opts .proc == SIMPLECOMPACT_TO_ARRAY :
317
+ self .config .processor = proc_json_compact_simple_to_jobj (aux )
318
+ elif self .config .proc == SIMPLECOMPACT_TO_ARRAY :
269
319
pass
270
320
else :
271
- yield self .opts .processor (first )
321
+ yield self .config .processor (first )
272
322
for line in response :
273
- yield self .opts .processor (line )
323
+ yield self .config .processor (line )
274
324
else :
275
325
yield proc_json ()(first )
276
326
@@ -289,11 +339,11 @@ def _make_request(self, payload):
289
339
headers = self ._get_headers (payload ),
290
340
verify = self .verify ,
291
341
timeout = self .timeout ,
292
- stream = self .opts .stream )
342
+ stream = self .config .stream )
293
343
if response .status_code != 200 :
294
344
raise DevoClientException (response )
295
345
296
- if self .opts .stream :
346
+ if self .config .stream :
297
347
return response .iter_lines ()
298
348
return response
299
349
except requests .exceptions .ConnectionError as error :
@@ -402,7 +452,8 @@ def _generate_pragmas(self, comment=None):
402
452
"""
403
453
str_pragmas = ' pragma comment.id:"{}" ' \
404
454
'pragma comment.user:"{}"' \
405
- .format (self .pragmas ['app_name' ], self .pragmas ['user' ])
455
+ .format (self .config .pragmas ['app_name' ],
456
+ self .config .pragmas ['user' ])
406
457
407
458
if comment :
408
459
return str_pragmas + ' pragma comment.free:"{}"' .format (comment )
@@ -460,7 +511,7 @@ def _call_jobs(self, address):
460
511
response = None
461
512
try :
462
513
response = requests .get ("https://{}" .format (address ),
463
- headers = self ._get_jobs_headers ( ),
514
+ headers = self ._get_headers ( "" ),
464
515
verify = self .verify ,
465
516
timeout = self .timeout )
466
517
except ConnectionError as error :
@@ -478,10 +529,3 @@ def _call_jobs(self, address):
478
529
tries += 1
479
530
time .sleep (self .timeout )
480
531
return {}
481
-
482
- def _get_jobs_headers (self ):
483
- tstamp = str (int (time .time ()) * 1000 )
484
- return {'x-logtrust-timestamp' : tstamp ,
485
- 'x-logtrust-apikey' : self .auth .get ("key" ),
486
- 'x-logtrust-sign' : self ._get_sign ("" , tstamp )
487
- }
0 commit comments