-
Notifications
You must be signed in to change notification settings - Fork 511
/
askar.py
411 lines (345 loc) · 12.6 KB
/
askar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
"""Aries-Askar implementation of BaseStorage interface."""
from typing import Mapping, Optional, Sequence
from aries_askar import AskarError, AskarErrorCode, Session
from ..askar.profile import AskarProfile, AskarProfileSession
from .base import (
DEFAULT_PAGE_SIZE,
BaseStorage,
BaseStorageSearch,
BaseStorageSearchSession,
validate_record,
)
from .error import (
StorageDuplicateError,
StorageError,
StorageNotFoundError,
StorageSearchError,
)
from .record import StorageRecord
class AskarStorage(BaseStorage):
    """Aries-Askar Non-Secrets interface."""

    def __init__(self, session: AskarProfileSession):
        """Initialize an `AskarStorage` instance.

        Args:
            session: The Askar profile session to use

        """
        self._session = session

    @property
    def session(self) -> Session:
        """Accessor for Askar profile session instance."""
        return self._session

    @staticmethod
    def _entry_to_record(entry) -> StorageRecord:
        """Convert a fetched Askar entry into a `StorageRecord`.

        Centralizes value decoding and tag normalization so that every
        query method returns records of the same shape: `tags` is always
        a dict (never None), matching the previous `get_record` behavior.
        """
        return StorageRecord(
            type=entry.category,
            id=entry.name,
            value=None if entry.value is None else entry.value.decode("utf-8"),
            tags=entry.tags or {},
        )

    async def add_record(self, record: StorageRecord):
        """Add a new record to the store.

        Args:
            record: `StorageRecord` to be stored

        Raises:
            StorageDuplicateError: If a record with the same type and ID exists
            StorageError: If an Askar error occurs while inserting

        """
        validate_record(record)
        try:
            await self._session.handle.insert(
                record.type, record.id, record.value, record.tags
            )
        except AskarError as err:
            if err.code == AskarErrorCode.DUPLICATE:
                # Suppress the Askar cause: duplicates are an expected,
                # caller-handled condition, not an internal failure.
                raise StorageDuplicateError(
                    f"Duplicate record: {record.type}/{record.id}"
                ) from None
            raise StorageError("Error when adding storage record") from err

    async def get_record(
        self, record_type: str, record_id: str, options: Optional[Mapping] = None
    ) -> StorageRecord:
        """Fetch a record from the store by type and ID.

        Args:
            record_type: The record type
            record_id: The record id
            options: A dictionary of backend-specific options; a truthy
                `forUpdate` entry locks the row within the current transaction

        Returns:
            A `StorageRecord` instance

        Raises:
            StorageError: If the record type or record ID is not provided,
                or an Askar error occurs while fetching
            StorageNotFoundError: If the record is not found

        """
        if not record_type:
            raise StorageError("Record type not provided")
        if not record_id:
            raise StorageError("Record ID not provided")
        for_update = bool(options and options.get("forUpdate"))
        try:
            item = await self._session.handle.fetch(
                record_type, record_id, for_update=for_update
            )
        except AskarError as err:
            raise StorageError("Error when fetching storage record") from err
        if not item:
            raise StorageNotFoundError(f"Record not found: {record_type}/{record_id}")
        return self._entry_to_record(item)

    async def update_record(self, record: StorageRecord, value: str, tags: Mapping):
        """Update an existing stored record's value and tags.

        Args:
            record: `StorageRecord` to update
            value: The new value
            tags: The new tags

        Raises:
            StorageNotFoundError: If record not found
            StorageError: If an Askar error occurs while updating

        """
        validate_record(record)
        try:
            await self._session.handle.replace(record.type, record.id, value, tags)
        except AskarError as err:
            if err.code == AskarErrorCode.NOT_FOUND:
                raise StorageNotFoundError("Record not found") from None
            raise StorageError("Error when updating storage record value") from err

    async def delete_record(self, record: StorageRecord):
        """Delete a record.

        Args:
            record: `StorageRecord` to delete

        Raises:
            StorageNotFoundError: If record not found
            StorageError: If an Askar error occurs while removing

        """
        validate_record(record, delete=True)
        try:
            await self._session.handle.remove(record.type, record.id)
        except AskarError as err:
            if err.code == AskarErrorCode.NOT_FOUND:
                raise StorageNotFoundError(
                    f"Record not found: {record.type}/{record.id}"
                ) from None
            raise StorageError("Error when removing storage record") from err

    async def find_record(
        self, type_filter: str, tag_query: Mapping, options: Optional[Mapping] = None
    ) -> StorageRecord:
        """Find a record using a unique tag filter.

        Args:
            type_filter: Filter string
            tag_query: Tags to query
            options: Dictionary of backend-specific options; a truthy
                `forUpdate` entry locks the row within the current transaction

        Returns:
            The single matching `StorageRecord`

        Raises:
            StorageDuplicateError: If more than one record matches
            StorageNotFoundError: If no record matches
            StorageError: If an Askar error occurs while searching

        """
        for_update = bool(options and options.get("forUpdate"))
        try:
            # limit=2 is enough to detect duplicates without scanning everything
            results = await self._session.handle.fetch_all(
                type_filter, tag_query, limit=2, for_update=for_update
            )
        except AskarError as err:
            raise StorageError("Error when finding storage record") from err
        if len(results) > 1:
            raise StorageDuplicateError("Duplicate records found")
        if not results:
            raise StorageNotFoundError("Record not found")
        return self._entry_to_record(results[0])

    async def find_paginated_records(
        self,
        type_filter: str,
        tag_query: Optional[Mapping] = None,
        limit: int = DEFAULT_PAGE_SIZE,
        offset: int = 0,
    ) -> Sequence[StorageRecord]:
        """Retrieve a page of records matching a particular type filter and tag query.

        Args:
            type_filter: The type of records to filter by
            tag_query: An optional dictionary of tag filter clauses
            limit: The maximum number of records to retrieve
            offset: The offset to start retrieving records from

        Returns:
            A list of matching `StorageRecord` instances

        """
        results = []
        async for row in self._session.store.scan(
            category=type_filter,
            tag_filter=tag_query,
            limit=limit,
            offset=offset,
            profile=self._session.profile.settings.get("wallet.askar_profile"),
        ):
            results.append(self._entry_to_record(row))
        return results

    async def find_all_records(
        self,
        type_filter: str,
        tag_query: Optional[Mapping] = None,
        options: Optional[Mapping] = None,
    ):
        """Retrieve all records matching a particular type filter and tag query.

        Args:
            type_filter: Filter string
            tag_query: An optional dictionary of tag filter clauses
            options: Dictionary of backend-specific options; a truthy
                `forUpdate` entry locks the rows within the current transaction

        Returns:
            A list of matching `StorageRecord` instances

        """
        for_update = bool(options and options.get("forUpdate"))
        results = []
        for row in await self._session.handle.fetch_all(
            type_filter, tag_query, for_update=for_update
        ):
            results.append(self._entry_to_record(row))
        return results

    async def delete_all_records(
        self,
        type_filter: str,
        tag_query: Optional[Mapping] = None,
    ):
        """Remove all records matching a particular type filter and tag query.

        Args:
            type_filter: Filter string
            tag_query: An optional dictionary of tag filter clauses

        Raises:
            StorageError: If an Askar error occurs while removing

        """
        # Wrap backend errors consistently with the other mutators so callers
        # only need to catch StorageError.
        try:
            await self._session.handle.remove_all(type_filter, tag_query)
        except AskarError as err:
            raise StorageError("Error when removing storage records") from err
class AskarStorageSearch(BaseStorageSearch):
    """Active instance of an Askar storage search query."""

    def __init__(self, profile: AskarProfile):
        """Initialize an `AskarStorageSearch` instance.

        Args:
            profile: The Askar profile instance to use

        """
        self._profile = profile

    def search_records(
        self,
        type_filter: str,
        tag_query: Optional[Mapping] = None,
        page_size: Optional[int] = None,
        options: Optional[Mapping] = None,
    ) -> "AskarStorageSearchSession":
        """Create a search session over the stored records.

        Args:
            type_filter: Filter string
            tag_query: Tags to query
            page_size: Page size
            options: Dictionary of backend-specific options

        Returns:
            An instance of `AskarStorageSearchSession`

        """
        session = AskarStorageSearchSession(
            self._profile, type_filter, tag_query, page_size, options
        )
        return session
class AskarStorageSearchSession(BaseStorageSearchSession):
    """Represent an active stored records search."""

    def __init__(
        self,
        profile: AskarProfile,
        type_filter: str,
        tag_query: Mapping,
        page_size: Optional[int] = None,
        options: Optional[Mapping] = None,
    ):
        """Initialize a `AskarStorageSearchSession` instance.

        Args:
            profile: Askar profile instance to search
            type_filter: Filter string
            tag_query: Tags to search
            page_size: Size of page to return
            options: Dictionary of backend-specific options

        """
        self.tag_query = tag_query
        self.type_filter = type_filter
        self.page_size = page_size or DEFAULT_PAGE_SIZE
        self._done = False
        self._profile = profile
        self._scan = None

    @property
    def opened(self) -> bool:
        """Accessor for open state.

        Returns:
            True if opened, else False

        """
        return self._scan is not None

    @property
    def handle(self):
        """Accessor for search handle.

        Returns:
            The handle

        """
        return self._scan

    def __aiter__(self):
        """Async iterator magic method."""
        return self

    async def __anext__(self):
        """Return the next matching record, ending iteration when exhausted."""
        if self._done:
            raise StorageSearchError("Search query is complete")
        await self._open()
        try:
            entry = await self._scan.__anext__()
        except StopAsyncIteration:
            # Underlying scan is exhausted: mark complete and propagate
            self._done = True
            self._scan = None
            raise
        except AskarError as err:
            raise StorageSearchError("Error when fetching search results") from err
        return StorageRecord(
            type=entry.category,
            id=entry.name,
            value=None if entry.value is None else entry.value.decode("utf-8"),
            tags=entry.tags,
        )

    async def fetch(
        self, max_count: Optional[int] = None, offset: Optional[int] = None
    ) -> Sequence[StorageRecord]:
        """Fetch the next list of results from the store.

        Args:
            max_count: Max number of records to return
            offset: The offset to start retrieving records from

        Returns:
            A list of `StorageRecord` instances

        Raises:
            StorageSearchError: If the search query has not been opened

        """
        if self._done:
            raise StorageSearchError("Search query is complete")
        page_limit = max_count or self.page_size
        # offset only takes effect when this call opens the scan
        await self._open(limit=page_limit, offset=offset)
        page = []
        for _ in range(page_limit):
            try:
                entry = await self._scan.__anext__()
            except StopAsyncIteration:
                break
            except AskarError as err:
                raise StorageSearchError("Error when fetching search results") from err
            page.append(
                StorageRecord(
                    type=entry.category,
                    id=entry.name,
                    value=None if entry.value is None else entry.value.decode("utf-8"),
                    tags=entry.tags,
                )
            )
        if not page:
            # Empty page signals exhaustion of the search
            self._done = True
            self._scan = None
        return page

    async def _open(self, offset: Optional[int] = None, limit: Optional[int] = None):
        """Start the search query if it is not already running."""
        if self._scan:
            return
        try:
            self._scan = self._profile.store.scan(
                category=self.type_filter,
                tag_filter=self.tag_query,
                offset=offset,
                limit=limit,
                profile=self._profile.settings.get("wallet.askar_profile"),
            )
        except AskarError as err:
            raise StorageSearchError("Error opening search query") from err

    async def close(self):
        """Dispose of the search query."""
        self._done = True
        self._scan = None