 pQuery = ql2_pb2.Query.QueryType
 
 
-@asyncio.coroutine
-def _read_until(streamreader, delimiter):
+async def _read_until(streamreader, delimiter):
     """Naive implementation of reading until a delimiter"""
     buffer = bytearray()
 
     while True:
-        c = yield from streamreader.read(1)
+        c = await streamreader.read(1)
         if c == b"":
             break  # EOF
         buffer.append(c[0])
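This hunk is representative of the whole commit: generator-based coroutines (`@asyncio.coroutine` + `yield from`, removed in Python 3.11) become native `async def` / `await` functions. A self-contained sketch of the converted `_read_until` pattern; the delimiter check and return value fall outside the hunk, so those lines are an assumption here:

```python
import asyncio


async def read_until(reader, delimiter):
    """Naive read-until mirroring _read_until above (tail of the loop assumed)."""
    buffer = bytearray()
    while True:
        c = await reader.read(1)
        if c == b"":
            break  # EOF
        buffer.append(c[0])
        if c == delimiter:
            break
    return bytes(buffer)


async def main():
    # Feed a StreamReader by hand instead of opening a real socket.
    reader = asyncio.StreamReader()
    reader.feed_data(b"handshake-json\0trailing bytes")
    reader.feed_eof()
    print(await read_until(reader, b"\0"))  # b'handshake-json\x00'


asyncio.run(main())
```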
@@ -69,12 +68,12 @@ def reusable_waiter(loop, timeout):
     else:
         deadline = None
 
-    def wait(future):
+    async def wait(future):
         if deadline is not None:
             new_timeout = max(deadline - loop.time(), 0)
         else:
             new_timeout = None
-        return (yield from asyncio.wait_for(future, new_timeout, loop=loop))
+        return (await asyncio.wait_for(future, new_timeout))
 
     return wait
 
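`reusable_waiter` is the helper that lets several sequential awaits share one overall timeout; only the inner `wait` changes here, and the deprecated `loop=` argument to `asyncio.wait_for` (removed in Python 3.10) is dropped. A trimmed, standalone copy to show the behaviour (docstring and driver context omitted):

```python
import asyncio


def reusable_waiter(loop, timeout):
    # Fix the deadline once; every wait() call is charged against it.
    deadline = loop.time() + timeout if timeout is not None else None

    async def wait(future):
        if deadline is not None:
            new_timeout = max(deadline - loop.time(), 0)
        else:
            new_timeout = None
        # loop= is gone: asyncio.wait_for no longer accepts it.
        return await asyncio.wait_for(future, new_timeout)

    return wait


async def main():
    wait = reusable_waiter(asyncio.get_running_loop(), timeout=1.0)
    # Both awaits share the same one-second budget.
    print(await wait(asyncio.sleep(0.3, result="first")))
    print(await wait(asyncio.sleep(0.3, result="second")))


asyncio.run(main())
```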
@@ -100,20 +99,18 @@ def __init__(self, *args, **kwargs):
     def __aiter__(self):
         return self
 
-    @asyncio.coroutine
-    def __anext__(self):
+    async def __anext__(self):
         try:
-            return (yield from self._get_next(None))
+            return (await self._get_next(None))
         except ReqlCursorEmpty:
             raise StopAsyncIteration
 
-    @asyncio.coroutine
-    def close(self):
+    async def close(self):
         if self.error is None:
             self.error = self._empty_error()
             if self.conn.is_open():
                 self.outstanding_requests += 1
-                yield from self.conn._parent._stop(self)
+                await self.conn._parent._stop(self)
 
     def _extend(self, res_buf):
         Cursor._extend(self, res_buf)
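With `__aiter__` plus a native `__anext__` that converts `ReqlCursorEmpty` into `StopAsyncIteration`, the cursor plugs straight into `async for`. A hypothetical usage sketch (host and table name are placeholders; assumes a reachable RethinkDB server):

```python
import asyncio

from rethinkdb import r


async def main():
    r.set_loop_type("asyncio")
    conn = await r.connect(host="localhost", port=28015)
    try:
        cursor = await r.table("users").run(conn)
        async for row in cursor:  # __anext__ drives the iteration
            print(row)
    finally:
        await conn.close()


asyncio.run(main())
```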
@@ -122,16 +119,15 @@ def _extend(self, res_buf):
 
     # Convenience function so users know when they've hit the end of the cursor
     # without having to catch an exception
-    @asyncio.coroutine
-    def fetch_next(self, wait=True):
+    async def fetch_next(self, wait=True):
         timeout = Cursor._wait_to_timeout(wait)
         waiter = reusable_waiter(self.conn._io_loop, timeout)
         while len(self.items) == 0 and self.error is None:
             self._maybe_fetch_batch()
             if self.error is not None:
                 raise self.error
             with translate_timeout_errors():
-                yield from waiter(asyncio.shield(self.new_response))
+                await waiter(asyncio.shield(self.new_response))
         # If there is a (non-empty) error to be received, we return True, so the
         # user will receive it on the next `next` call.
         return len(self.items) != 0 or not isinstance(self.error, RqlCursorEmpty)
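`fetch_next()` is the exception-free alternative: it resolves to `False` once the cursor is exhausted instead of raising. A small sketch of the polling pattern it enables (the `drain` helper is illustrative, not part of the driver):

```python
async def drain(cursor):
    # Collect every row without ever catching ReqlCursorEmpty.
    results = []
    while await cursor.fetch_next():
        results.append(await cursor.next())
    return results
```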
@@ -141,15 +137,14 @@ def _empty_error(self):
         # with mechanisms to return from a coroutine.
         return RqlCursorEmpty()
 
-    @asyncio.coroutine
-    def _get_next(self, timeout):
+    async def _get_next(self, timeout):
         waiter = reusable_waiter(self.conn._io_loop, timeout)
         while len(self.items) == 0:
             self._maybe_fetch_batch()
             if self.error is not None:
                 raise self.error
             with translate_timeout_errors():
-                yield from waiter(asyncio.shield(self.new_response))
+                await waiter(asyncio.shield(self.new_response))
         return self.items.popleft()
 
     def _maybe_fetch_batch(self):
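Both `_get_next` and `fetch_next` await `asyncio.shield(self.new_response)`: when `wait_for` times out it cancels what it was given, and the shield absorbs that cancellation so the shared `new_response` future stays usable for the next call. A minimal stdlib-only demonstration of that property:

```python
import asyncio


async def main():
    shared = asyncio.get_running_loop().create_future()
    try:
        # Time out while waiting on a shielded view of the shared future.
        await asyncio.wait_for(asyncio.shield(shared), 0.1)
    except asyncio.TimeoutError:
        pass
    print(shared.cancelled())  # False: the underlying future survived
    shared.set_result("late response")
    print(await shared)  # still deliverable after the timeout


asyncio.run(main())
```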
@@ -185,8 +180,7 @@ def client_address(self):
         if self.is_open():
             return self._streamwriter.get_extra_info("sockname")[0]
 
-    @asyncio.coroutine
-    def connect(self, timeout):
+    async def connect(self, timeout):
         try:
             ssl_context = None
             if len(self._parent.ssl) > 0:
@@ -198,10 +192,9 @@ def connect(self, timeout):
                 ssl_context.check_hostname = True  # redundant with match_hostname
                 ssl_context.load_verify_locations(self._parent.ssl["ca_certs"])
 
-            self._streamreader, self._streamwriter = yield from asyncio.open_connection(
+            self._streamreader, self._streamwriter = await asyncio.open_connection(
                 self._parent.host,
                 self._parent.port,
-                loop=self._io_loop,
                 ssl=ssl_context,
             )
             self._streamwriter.get_extra_info("socket").setsockopt(
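The other change in this hunk is dropping `loop=self._io_loop`: `asyncio.open_connection` stopped accepting a `loop` argument in Python 3.10 and now always uses the running loop. A compressed sketch of the same connect shape (the `open_stream` helper name is made up, and the SSL setup is simplified to `ssl.create_default_context`, which is not what the driver does verbatim):

```python
import asyncio
import socket
import ssl


async def open_stream(host, port, ca_certs=None):
    # Optional SSL context, then a plain open_connection without loop=.
    ssl_context = None
    if ca_certs:
        ssl_context = ssl.create_default_context(cafile=ca_certs)
    reader, writer = await asyncio.open_connection(host, port, ssl=ssl_context)
    # Disable Nagle, as the driver does right after connecting.
    sock = writer.get_extra_info("socket")
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    return reader, writer
```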
@@ -226,26 +219,25 @@ def connect(self, timeout):
                         break
                     # This may happen in the `V1_0` protocol where we send two requests as
                     # an optimization, then need to read each separately
-                    if request is not "":
+                    if request != "":
                         self._streamwriter.write(request)
 
-                    response = yield from asyncio.wait_for(
+                    response = await asyncio.wait_for(
                         _read_until(self._streamreader, b"\0"),
                         timeout,
-                        loop=self._io_loop,
                     )
                     response = response[:-1]
         except ReqlAuthError:
-            yield from self.close()
+            await self.close()
             raise
         except ReqlTimeoutError as err:
-            yield from self.close()
+            await self.close()
             raise ReqlDriverError(
                 "Connection interrupted during handshake with %s:%s. Error: %s"
                 % (self._parent.host, self._parent.port, str(err))
             )
         except Exception as err:
-            yield from self.close()
+            await self.close()
             raise ReqlDriverError(
                 "Could not connect to %s:%s. Error: %s"
                 % (self._parent.host, self._parent.port, str(err))
@@ -259,8 +251,7 @@ def connect(self, timeout):
     def is_open(self):
         return not (self._closing or self._streamreader.at_eof())
 
-    @asyncio.coroutine
-    def close(self, noreply_wait=False, token=None, exception=None):
+    async def close(self, noreply_wait=False, token=None, exception=None):
         self._closing = True
         if exception is not None:
             err_message = "Connection is closed (%s)." % str(exception)
@@ -280,39 +271,37 @@ def close(self, noreply_wait=False, token=None, exception=None):
 
         if noreply_wait:
             noreply = Query(pQuery.NOREPLY_WAIT, token, None, None)
-            yield from self.run_query(noreply, False)
+            await self.run_query(noreply, False)
 
         self._streamwriter.close()
         await self._streamwriter.wait_closed()
         # We must not wait for the _reader_task if we got an exception, because that
         # means that we were called from it. Waiting would lead to a deadlock.
         if self._reader_task and exception is None:
-            yield from self._reader_task
+            await self._reader_task
 
         return None
 
-    @asyncio.coroutine
-    def run_query(self, query, noreply):
+    async def run_query(self, query, noreply):
         self._streamwriter.write(query.serialize(self._parent._get_json_encoder(query)))
         if noreply:
             return None
 
         response_future = asyncio.Future()
         self._user_queries[query.token] = (query, response_future)
-        return (yield from response_future)
+        return (await response_future)
 
     # The _reader coroutine runs in parallel, reading responses
     # off of the socket and forwarding them to the appropriate Future or Cursor.
     # This is shut down as a consequence of closing the stream, or an error in the
     # socket/protocol from the server. Unexpected errors in this coroutine will
     # close the ConnectionInstance and be passed to any open Futures or Cursors.
-    @asyncio.coroutine
-    def _reader(self):
+    async def _reader(self):
         try:
             while True:
-                buf = yield from self._streamreader.readexactly(12)
+                buf = await self._streamreader.readexactly(12)
                 (token, length,) = struct.unpack("<qL", buf)
-                buf = yield from self._streamreader.readexactly(length)
+                buf = await self._streamreader.readexactly(length)
 
                 cursor = self._cursor_cache.get(token)
                 if cursor is not None:
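The read loop defines the framing implicitly: each response is a 12-byte header, an 8-byte little-endian signed query token plus a 4-byte unsigned payload length (`struct.unpack("<qL", ...)`), followed by `length` bytes of JSON. A round-trip sketch of that layout (the pack side is inferred from the read path here, not driver code):

```python
import json
import struct


def pack_frame(token, payload):
    # 8-byte signed token + 4-byte unsigned length, little-endian, then JSON.
    data = json.dumps(payload).encode("utf-8")
    return struct.pack("<qL", token, len(data)) + data


def unpack_frame(buf):
    token, length = struct.unpack("<qL", buf[:12])
    return token, json.loads(buf[12 : 12 + length])


frame = pack_frame(1, {"t": 1, "r": []})
print(unpack_frame(frame))  # (1, {'t': 1, 'r': []})
```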
@@ -341,7 +330,7 @@ def _reader(self):
                     raise ReqlDriverError("Unexpected response received.")
         except Exception as ex:
             if not self._closing:
-                yield from self.close(exception=ex)
+                await self.close(exception=ex)
 
 
 class Connection(ConnectionBase):
@@ -354,30 +343,25 @@ def __init__(self, *args, **kwargs):
                 "Could not convert port %s to an integer." % self.port
             )
 
-    @asyncio.coroutine
-    def __aenter__(self):
+    async def __aenter__(self):
         return self
 
-    @asyncio.coroutine
-    def __aexit__(self, exception_type, exception_val, traceback):
-        yield from self.close(False)
+    async def __aexit__(self, exception_type, exception_val, traceback):
+        await self.close(False)
 
-    @asyncio.coroutine
-    def _stop(self, cursor):
+    async def _stop(self, cursor):
         self.check_open()
         q = Query(pQuery.STOP, cursor.query.token, None, None)
-        return (yield from self._instance.run_query(q, True))
+        return (await self._instance.run_query(q, True))
 
-    @asyncio.coroutine
-    def reconnect(self, noreply_wait=True, timeout=None):
+    async def reconnect(self, noreply_wait=True, timeout=None):
         # We close before reconnect so reconnect doesn't try to close us
         # and then fail to return the Future (this is a little awkward).
-        yield from self.close(noreply_wait)
+        await self.close(noreply_wait)
         self._instance = self._conn_type(self, **self._child_kwargs)
-        return (yield from self._instance.connect(timeout))
+        return (await self._instance.connect(timeout))
 
-    @asyncio.coroutine
-    def close(self, noreply_wait=True):
+    async def close(self, noreply_wait=True):
         if self._instance is None:
             return None
-        return (yield from ConnectionBase.close(self, noreply_wait=noreply_wait))
+        return (await ConnectionBase.close(self, noreply_wait=noreply_wait))
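With `__aenter__`/`__aexit__` now native coroutines, the connection can be used directly as an async context manager, and `close()` is awaited on exit. A hypothetical end-to-end usage (host and table are placeholders; assumes a local RethinkDB server):

```python
import asyncio

from rethinkdb import r


async def main():
    r.set_loop_type("asyncio")
    conn = await r.connect(host="localhost", port=28015)
    async with conn:  # __aexit__ awaits close(False) when the block ends
        print(await r.table("users").count().run(conn))


asyncio.run(main())
```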