Skip to content

Commit

Permalink
perf: cache shard and minishard indices (#298)
Browse files Browse the repository at this point in the history
* perf: cache shard and minishard indices

* fix: don't try to pull down 0 bytes from a file

* refactor: replace lru-dict with pylru

This ensures cross-platform compatibility without
requiring a C compiler.
  • Loading branch information
william-silversmith authored Dec 3, 2019
1 parent 149d965 commit 5d3aacf
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 17 deletions.
74 changes: 57 additions & 17 deletions cloudvolume/datasource/precomputed/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from collections import namedtuple, defaultdict
import copy
import json
import struct

import numpy as np
import struct
import pylru
from tqdm import tqdm

from . import mmh3
Expand Down Expand Up @@ -171,19 +172,30 @@ def validate(self):
if self.data_encoding not in ('raw', 'gzip'):
raise SpecViolation("data_encoding only supports values 'raw' or 'gzip'.")

def __str__(self):
    """Return a readable representation of this specification.

    The result is the literal tag ``ShardingSpecification::`` followed by
    the ``str()`` form of whatever ``self.to_dict()`` returns.
    """
    tag = "ShardingSpecification::"
    as_dict = self.to_dict()
    return tag + str(as_dict)

class ShardReader(object):
def __init__(self, meta, cache, spec):
def __init__(
self, meta, cache, spec,
shard_index_cache_size=512,
minishard_index_cache_size=128,
):
self.meta = meta
self.cache = cache
self.spec = spec

def get_index(self, label, path=""):
shard_loc = self.spec.compute_shard_location(label)
self.shard_index_cache = pylru.lrucache(shard_index_cache_size)
self.minishard_index_cache = pylru.lrucache(minishard_index_cache_size)

filename = str(shard_loc.shard_number)
def get_index(self, label, shard_number, path=""):
filename = str(shard_number)
index_path = self.meta.join(path, filename + '.shard')
alias_path = self.meta.join(path, filename + '.index')

if shard_number in self.shard_index_cache:
return self.shard_index_cache[shard_number]

index_length = self.spec.index_length()

binary = self.cache.download_single_as(
Expand All @@ -199,28 +211,30 @@ def get_index(self, label, path=""):
))

index = np.frombuffer(binary, dtype=np.uint64)
return index.reshape( (index.size // 2, 2), order='C' )

def get_data(self, label, path=""):
shard_loc = self.spec.compute_shard_location(label)

if self.cache.enabled:
cached = self.cache.get_single(self.meta.join(path, str(label)), progress=False)
if cached is not None:
return cached
index = index.reshape( (index.size // 2, 2), order='C' )
self.shard_index_cache[shard_number] = index
return index

def get_minishard_index(self, index, shard_no, minishard_no, path=""):
index_offset = self.spec.index_length()
bytes_start, bytes_end = index[minishard_no]

# most typically: [0,0] for an incomplete shard
if bytes_start == bytes_end:
return None

index = self.get_index(label, path)
bytes_start, bytes_end = index[shard_loc.minishard_number]
bytes_start += index_offset
bytes_end += index_offset
bytes_start, bytes_end = int(bytes_start), int(bytes_end)

filename = shard_loc.shard_number + ".shard"
filename = shard_no + ".shard"

full_path = self.meta.join(self.meta.cloudpath, path)

cache_key = (filename, bytes_start, bytes_end)
if cache_key in self.minishard_index_cache:
return self.minishard_index_cache[cache_key]

with SimpleStorage(full_path) as stor:
minishard_index = stor.get_file(filename, start=bytes_start, end=bytes_end)

Expand All @@ -236,15 +250,41 @@ def get_data(self, label, path=""):
minishard_index[i, 0] += minishard_index[i-1, 0]
minishard_index[i, 1] += minishard_index[i-1, 1] + minishard_index[i-1, 2]

self.minishard_index_cache[cache_key] = minishard_index
return minishard_index

def get_data(self, label, path=""):
shard_loc = self.spec.compute_shard_location(label)

if self.cache.enabled:
cached = self.cache.get_single(self.meta.join(path, str(label)), progress=False)
if cached is not None:
return cached

index = self.get_index(label, shard_loc.shard_number, path)

minishard_index = self.get_minishard_index(
index, shard_loc.shard_number,
shard_loc.minishard_number, path
)

if minishard_index is None:
return None

idx = np.where(minishard_index[:,0] == label)[0]
if len(idx) == 0:
return None
else:
idx = idx[0]

_, offset, size = minishard_index[idx,:]

index_offset = self.spec.index_length()
offset = int(offset + index_offset)

full_path = self.meta.join(self.meta.cloudpath, path)
filename = shard_loc.shard_number + ".shard"

with SimpleStorage(full_path) as stor:
binary = stor.get_file(filename, start=offset, end=int(offset + size))

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ numpy>=1.13.3
python-jsonschema-objects>=0.3.3
Pillow>=4.2.1
protobuf>=3.3.0
pylru
requests>=2.22.0
six>=1.10.0
tenacity>=4.10.0
Expand Down

0 comments on commit 5d3aacf

Please sign in to comment.