diff --git a/src/swsssdk/__init__.py b/src/swsssdk/__init__.py index d7cf391f550c..982286020342 100644 --- a/src/swsssdk/__init__.py +++ b/src/swsssdk/__init__.py @@ -9,7 +9,7 @@ try: from .dbconnector import SonicDBConfig, SonicV2Connector - from .configdb import ConfigDBConnector + from .configdb import ConfigDBConnector, ConfigDBPipeConnector from .sonic_db_dump_load import sonic_db_dump_load except (KeyError, ValueError): msg = "Failed to database connector objects -- incorrect database config schema." diff --git a/src/swsssdk/configdb.py b/src/swsssdk/configdb.py index 4ccbb0dca32e..ad99564bd925 100644 --- a/src/swsssdk/configdb.py +++ b/src/swsssdk/configdb.py @@ -108,12 +108,12 @@ def listen(self): (table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) if self.handlers.has_key(table): client = self.get_redis_client(self.db_name) - data = self.__raw_to_typed(client.hgetall(key)) + data = self.raw_to_typed(client.hgetall(key)) self.__fire(table, row, data) except ValueError: pass #Ignore non table-formated redis entries - def __raw_to_typed(self, raw_data): + def raw_to_typed(self, raw_data): if raw_data == None: return None typed_data = {} @@ -141,7 +141,7 @@ def __raw_to_typed(self, raw_data): typed_data[key] = raw_data[raw_key] return typed_data - def __typed_to_raw(self, typed_data): + def typed_to_raw(self, typed_data): if typed_data == None: return None elif typed_data == {}: @@ -187,7 +187,7 @@ def set_entry(self, table, key, data): client.delete(_hash) else: original = self.get_entry(table, key) - client.hmset(_hash, self.__typed_to_raw(data)) + client.hmset(_hash, self.typed_to_raw(data)) for k in [ k for k in original.keys() if k not in data.keys() ]: if type(original[k]) == list: k = k + '@' @@ -208,7 +208,7 @@ def mod_entry(self, table, key, data): if data == None: client.delete(_hash) else: - client.hmset(_hash, self.__typed_to_raw(data)) + client.hmset(_hash, self.typed_to_raw(data)) def get_entry(self, table, key): """Read a table entry from config db. @@ -222,7 +222,7 @@ def get_entry(self, table, key): key = self.serialize_key(key) client = self.get_redis_client(self.db_name) _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) - return self.__raw_to_typed(client.hgetall(_hash)) + return self.raw_to_typed(client.hgetall(_hash)) def get_keys(self, table, split=True): """Read all keys of a table from config db. @@ -266,7 +266,7 @@ def get_table(self, table): data = {} for key in keys: try: - entry = self.__raw_to_typed(client.hgetall(key)) + entry = self.raw_to_typed(client.hgetall(key)) if entry != None: if PY3K: key = key.decode('utf-8') @@ -328,10 +328,135 @@ def get_config(self): key = key.decode('utf-8') try: (table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) - entry = self.__raw_to_typed(client.hgetall(key)) + entry = self.raw_to_typed(client.hgetall(key)) if entry != None: data.setdefault(table_name, {})[self.deserialize_key(row)] = entry except ValueError: pass #Ignore non table-formated redis entries return data + +class ConfigDBPipeConnector(ConfigDBConnector): + REDIS_SCAN_BATCH_SIZE = 30 + + def __init__(self, **kwargs): + super(ConfigDBPipeConnector, self).__init__(**kwargs) + + def __delete_entries(self, client, pipe, pattern, cursor): + """Helper method to delete table entries from config db using Redis pipeline + with batch size of REDIS_SCAN_BATCH_SIZE. + The caller should call pipeline execute once ready + Args: + client: Redis client + pipe: Redis DB pipe + pattern: key pattern + cursor: position to start scanning from + + Returns: + cur: poition of next item to scan + """ + cur, keys = client.scan(cursor=cursor, match=pattern, count=self.REDIS_SCAN_BATCH_SIZE) + for key in keys: + pipe.delete(key) + + return cur + + def __delete_table(self, client, pipe, table): + """Helper method to delete table entries from config db using Redis pipeline. + The caller should call pipeline execute once ready + Args: + client: Redis client + pipe: Redis DB pipe + table: Table name. + """ + pattern = '{}{}*'.format(table.upper(), self.TABLE_NAME_SEPARATOR) + cur = self.__delete_entries(client, pipe, pattern, 0) + while cur != 0: + cur = self.__delete_entries(client, pipe, pattern, cur) + + def __mod_entry(self, pipe, table, key, data): + """Modify a table entry to config db. + Args: + table: Table name. + pipe: Redis DB pipe + table: Table name. + key: Key of table entry, or a tuple of keys if it is a multi-key table. + data: Table row data in a form of dictionary {'column_key': 'value', ...}. + Pass {} as data will create an entry with no column if not already existed. + Pass None as data will delete the entry. + """ + key = self.serialize_key(key) + _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) + if data == None: + pipe.delete(_hash) + else: + pipe.hmset(_hash, self.typed_to_raw(data)) + + def mod_config(self, data): + """Write multiple tables into config db. + Extra entries/fields in the db which are not in the data are kept. + Args: + data: config data in a dictionary form + { + 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...}, + 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...}, + ... + } + """ + client = self.get_redis_client(self.db_name) + pipe = client.pipeline() + for table_name in data: + table_data = data[table_name] + if table_data == None: + self.__delete_table(client, pipe, table_name) + continue + for key in table_data: + self.__mod_entry(pipe, table_name, key, table_data[key]) + pipe.execute() + client.bgsave() + + def _get_config(self, client, pipe, data, cursor): + """Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines + Args: + client: Redis client + pipe: Redis DB pipe + data: config dictionary + cursor: position to start scanning from + + Returns: + cur: poition of next item to scan + """ + cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE) + keys = [key.decode('utf-8') for key in keys if key != self.INIT_INDICATOR] + for key in keys: + pipe.hgetall(key) + records = pipe.execute() + + for index, key in enumerate(keys): + (table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) + entry = self.raw_to_typed(records[index]) + if entry is not None: + data.setdefault(table_name, {})[self.deserialize_key(row)] = entry + + return cur + + def get_config(self): + """Read all config data. + Returns: + Config data in a dictionary form of + { + 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...}, + 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...}, + ... + } + """ + client = self.get_redis_client(self.db_name) + pipe = client.pipeline() + data = {} + + cur = self._get_config(client, pipe, data, 0) + while cur != 0: + cur = self._get_config(client, pipe, data, cur) + + return data +