From fabc7f617dcefbdcbf6a80a9e6e5741ed0d55d20 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Wed, 10 Jul 2024 16:45:22 -0700 Subject: [PATCH 01/10] [ENH] upserts should also trigger persisting of local HNSW --- chromadb/segment/impl/vector/local_hnsw.py | 2 + .../impl/vector/local_persistent_hnsw.py | 8 ++++ chromadb/test/property/test_persist.py | 41 +++++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/chromadb/segment/impl/vector/local_hnsw.py b/chromadb/segment/impl/vector/local_hnsw.py index 7358270c7d6..232ba33e39c 100644 --- a/chromadb/segment/impl/vector/local_hnsw.py +++ b/chromadb/segment/impl/vector/local_hnsw.py @@ -66,6 +66,7 @@ def __init__(self, system: System, segment: Segment): self._index = None self._dimensionality = None self._total_elements_added = 0 + self._total_elements_updated = 0 self._max_seq_id = self._consumer.min_seqid() self._id_to_seq_id = {} @@ -275,6 +276,7 @@ def _apply_batch(self, batch: Batch) -> None: # If that succeeds, update the total count self._total_elements_added += batch.add_count + self._total_elements_updated += batch.update_count # If that succeeds, finally the seq ID self._max_seq_id = batch.max_seq_id diff --git a/chromadb/segment/impl/vector/local_persistent_hnsw.py b/chromadb/segment/impl/vector/local_persistent_hnsw.py index 9dadc906e98..63ca531ae9e 100644 --- a/chromadb/segment/impl/vector/local_persistent_hnsw.py +++ b/chromadb/segment/impl/vector/local_persistent_hnsw.py @@ -41,6 +41,7 @@ class PersistentData: dimensionality: Optional[int] total_elements_added: int + total_elements_updated: int max_seq_id: SeqId id_to_label: Dict[str, int] @@ -51,6 +52,7 @@ def __init__( self, dimensionality: Optional[int], total_elements_added: int, + total_elements_updated: int, max_seq_id: int, id_to_label: Dict[str, int], label_to_id: Dict[int, str], @@ -58,6 +60,7 @@ def __init__( ): self.dimensionality = dimensionality self.total_elements_added = total_elements_added + self.total_elements_updated = total_elements_updated self.max_seq_id = max_seq_id self.id_to_label = id_to_label self.label_to_id = label_to_id @@ -121,6 +124,7 @@ def __init__(self, system: System, segment: Segment): self._persist_data = PersistentData( self._dimensionality, self._total_elements_added, + self._total_elements_updated, self._max_seq_id, self._id_to_label, self._label_to_id, @@ -195,6 +199,7 @@ def _persist(self) -> None: # Persist the metadata self._persist_data.dimensionality = self._dimensionality self._persist_data.total_elements_added = self._total_elements_added + self._persist_data.total_elements_updated = self._total_elements_updated self._persist_data.max_seq_id = self._max_seq_id # TODO: This should really be stored in sqlite, the index itself, or a better @@ -215,6 +220,9 @@ def _apply_batch(self, batch: Batch) -> None: if ( self._total_elements_added - self._persist_data.total_elements_added >= self._sync_threshold + ) or ( + self._total_elements_updated - self._persist_data.total_elements_updated + >= self._sync_threshold ): self._persist() diff --git a/chromadb/test/property/test_persist.py b/chromadb/test/property/test_persist.py index f91b0ed78fd..6ee1ef84ceb 100644 --- a/chromadb/test/property/test_persist.py +++ b/chromadb/test/property/test_persist.py @@ -9,6 +9,7 @@ import chromadb from chromadb.api import ClientAPI, ServerAPI from chromadb.config import Settings, System +from chromadb.segment import SegmentManager, VectorReader import chromadb.test.property.strategies as strategies import chromadb.test.property.invariants as invariants from chromadb.test.property.test_embeddings import ( @@ -122,6 +123,46 @@ def test_persist( del system_2 +def test_sync_threshold(settings: Settings) -> None: + system = System(settings) + system.start() + client = ClientCreator.from_system(system) + + collection = client.create_collection( + name="test", metadata={"hnsw:batch_size": 3, "hnsw:sync_threshold": 3} + ) + + manager = system.instance(SegmentManager) + segment = manager.get_segment(collection.id, VectorReader) + + def get_index_last_modified_at() -> float: + try: + return os.path.getmtime(segment._get_metadata_file()) # type: ignore[attr-defined] + except FileNotFoundError: + return -1 + + last_modified_at = get_index_last_modified_at() + + collection.add(ids=["1", "2"], embeddings=[[1.0], [2.0]]) + + # Should not have yet persisted + assert get_index_last_modified_at() == last_modified_at + last_modified_at = get_index_last_modified_at() + + # Now there's 3 additions, and the sync threshold is 3... + collection.add(ids=["3"], embeddings=[[3.0]]) + + # ...so it should have persisted + assert get_index_last_modified_at() > last_modified_at + last_modified_at = get_index_last_modified_at() + + # The same thing should happen with upserts + collection.upsert(ids=["1", "2", "3"], embeddings=[[1.0], [2.0], [3.0]]) + + # Should have persisted + assert get_index_last_modified_at() > last_modified_at + + def load_and_check( settings: Settings, collection_name: str, From 36f421877e72470c7476c7726ecb864527a3a48d Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 11 Jul 2024 10:16:56 -0700 Subject: [PATCH 02/10] Take mixed usage into account --- chromadb/segment/impl/vector/local_persistent_hnsw.py | 11 ++++++++--- chromadb/test/property/test_persist.py | 8 ++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/chromadb/segment/impl/vector/local_persistent_hnsw.py b/chromadb/segment/impl/vector/local_persistent_hnsw.py index 63ca531ae9e..6e190b926f1 100644 --- a/chromadb/segment/impl/vector/local_persistent_hnsw.py +++ b/chromadb/segment/impl/vector/local_persistent_hnsw.py @@ -217,11 +217,16 @@ def _persist(self) -> None: @override def _apply_batch(self, batch: Batch) -> None: super()._apply_batch(batch) - if ( + num_elements_added_since_last_persist = ( self._total_elements_added - self._persist_data.total_elements_added - >= self._sync_threshold - ) or ( + ) + num_elements_updated_since_last_persist = ( self._total_elements_updated - self._persist_data.total_elements_updated + ) + + if ( + num_elements_added_since_last_persist + + num_elements_updated_since_last_persist >= self._sync_threshold ): self._persist() diff --git a/chromadb/test/property/test_persist.py b/chromadb/test/property/test_persist.py index 6ee1ef84ceb..6410cf2f0a4 100644 --- a/chromadb/test/property/test_persist.py +++ b/chromadb/test/property/test_persist.py @@ -161,6 +161,14 @@ def get_index_last_modified_at() -> float: # Should have persisted assert get_index_last_modified_at() > last_modified_at + last_modified_at = get_index_last_modified_at() + + # Mixed usage should also trigger persistence + collection.add(ids=["4"], embeddings=[[4.0]]) + collection.upsert(ids=["1", "2"], embeddings=[[1.0], [2.0]]) + + # Should have persisted + assert get_index_last_modified_at() > last_modified_at def load_and_check( From 2304597032fa796d19e69b340feb021d7d3054f9 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 11 Jul 2024 14:59:02 -0700 Subject: [PATCH 03/10] Also persist for invalid operations --- chromadb/segment/impl/vector/local_hnsw.py | 1 + .../segment/impl/vector/local_persistent_hnsw.py | 13 ++++++++++++- chromadb/test/property/test_persist.py | 9 +++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/chromadb/segment/impl/vector/local_hnsw.py b/chromadb/segment/impl/vector/local_hnsw.py index 232ba33e39c..86a19cad05a 100644 --- a/chromadb/segment/impl/vector/local_hnsw.py +++ b/chromadb/segment/impl/vector/local_hnsw.py @@ -43,6 +43,7 @@ class LocalHnswSegment(VectorReader): _index: Optional[hnswlib.Index] _dimensionality: Optional[int] _total_elements_added: int + _total_elements_updated: int _max_seq_id: SeqId _lock: ReadWriteLock diff --git a/chromadb/segment/impl/vector/local_persistent_hnsw.py b/chromadb/segment/impl/vector/local_persistent_hnsw.py index 6e190b926f1..74e9dbc3778 100644 --- a/chromadb/segment/impl/vector/local_persistent_hnsw.py +++ b/chromadb/segment/impl/vector/local_persistent_hnsw.py @@ -89,6 +89,8 @@ class PersistentLocalHnswSegment(LocalHnswSegment): _persist_directory: str _allow_reset: bool + _invalid_operations_since_last_persist: int = 0 + _opentelemtry_client: OpenTelemetryClient def __init__(self, system: System, segment: Segment): @@ -211,6 +213,8 @@ def _persist(self) -> None: with open(self._get_metadata_file(), "wb") as metadata_file: pickle.dump(self._persist_data, metadata_file, pickle.HIGHEST_PROTOCOL) + self._invalid_operations_since_last_persist = 0 + @trace_method( "PersistentLocalHnswSegment._apply_batch", OpenTelemetryGranularity.ALL ) @@ -227,6 +231,7 @@ def _apply_batch(self, batch: Batch) -> None: if ( num_elements_added_since_last_persist + num_elements_updated_since_last_persist + + self._invalid_operations_since_last_persist >= self._sync_threshold ): self._persist() @@ -275,6 +280,7 @@ def _write_records(self, records: Sequence[LogRecord]) -> None: logger.warning( f"Update of nonexisting embedding ID: {record['record']['id']}" ) + self._invalid_operations_since_last_persist += 1 elif op == Operation.ADD: if record["record"]["embedding"] is not None: if not exists_in_index: @@ -282,11 +288,16 @@ def _write_records(self, records: Sequence[LogRecord]) -> None: self._brute_force_index.upsert([record]) else: logger.warning(f"Add of existing embedding ID: {id}") + self._invalid_operations_since_last_persist += 1 elif op == Operation.UPSERT: if record["record"]["embedding"] is not None: self._curr_batch.apply(record, exists_in_index) self._brute_force_index.upsert([record]) - if len(self._curr_batch) >= self._batch_size: + + if ( + len(self._curr_batch) + self._invalid_operations_since_last_persist + >= self._batch_size + ): self._apply_batch(self._curr_batch) self._curr_batch = Batch() self._brute_force_index.clear() diff --git a/chromadb/test/property/test_persist.py b/chromadb/test/property/test_persist.py index 6410cf2f0a4..a23e42a0acd 100644 --- a/chromadb/test/property/test_persist.py +++ b/chromadb/test/property/test_persist.py @@ -169,6 +169,15 @@ def get_index_last_modified_at() -> float: # Should have persisted assert get_index_last_modified_at() > last_modified_at + last_modified_at = get_index_last_modified_at() + + # Invalid updates should also trigger persistence + collection.add(ids=["5"], embeddings=[[5.0]]) + collection.add(ids=["1", "2"], embeddings=[[1.0], [2.0]]) + + # Should have persisted + assert get_index_last_modified_at() > last_modified_at + last_modified_at = get_index_last_modified_at() def load_and_check( From 1547d097554a637252d570210dfedacc1307b206 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 11 Jul 2024 15:45:46 -0700 Subject: [PATCH 04/10] Fix test on Windows --- chromadb/test/property/test_persist.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/chromadb/test/property/test_persist.py b/chromadb/test/property/test_persist.py index a23e42a0acd..68e07283836 100644 --- a/chromadb/test/property/test_persist.py +++ b/chromadb/test/property/test_persist.py @@ -2,6 +2,7 @@ import multiprocessing from multiprocessing.connection import Connection import multiprocessing.context +import time from typing import Generator, Callable from hypothesis import given import hypothesis.strategies as st @@ -136,6 +137,8 @@ def test_sync_threshold(settings: Settings) -> None: segment = manager.get_segment(collection.id, VectorReader) def get_index_last_modified_at() -> float: + # Time resolution on Windows can be up to 10ms + time.sleep(0.1) try: return os.path.getmtime(segment._get_metadata_file()) # type: ignore[attr-defined] except FileNotFoundError: From cbe04fac58395a7e493ce6b48867de63d6bd77dc Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 11 Jul 2024 17:15:35 -0700 Subject: [PATCH 05/10] Update test to trigger pickle load, fix pickle load for previous versions --- .../segment/impl/vector/local_persistent_hnsw.py | 5 +++++ chromadb/test/property/strategies.py | 8 ++++++-- .../test/property/test_cross_version_persist.py | 14 +++++++++++++- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/chromadb/segment/impl/vector/local_persistent_hnsw.py b/chromadb/segment/impl/vector/local_persistent_hnsw.py index 74e9dbc3778..abd63711b77 100644 --- a/chromadb/segment/impl/vector/local_persistent_hnsw.py +++ b/chromadb/segment/impl/vector/local_persistent_hnsw.py @@ -66,6 +66,11 @@ def __init__( self.label_to_id = label_to_id self.id_to_seq_id = id_to_seq_id + def __setstate__(self, state): + self.__dict__.update(state) + # Field was added after the initial implementation + self.total_elements_updated = 0 + @staticmethod def load_from_file(filename: str) -> "PersistentData": """Load persistent data from a file""" diff --git a/chromadb/test/property/strategies.py b/chromadb/test/property/strategies.py index d6ca4e4c9ed..ccb06ac5e7a 100644 --- a/chromadb/test/property/strategies.py +++ b/chromadb/test/property/strategies.py @@ -282,6 +282,8 @@ def collections( has_embeddings: Optional[bool] = None, has_documents: Optional[bool] = None, with_persistent_hnsw_params: bool = False, + max_hnsw_batch_size: int = 2000, + max_hnsw_sync_threshold: int = 2000, ) -> Collection: """Strategy to generate a Collection object. If add_filterable_data is True, then known_metadata_keys and known_document_keywords will be populated with consistent data.""" @@ -302,9 +304,11 @@ def collections( metadata = {} metadata.update(test_hnsw_config) if with_persistent_hnsw_params: - metadata["hnsw:batch_size"] = draw(st.integers(min_value=3, max_value=2000)) + metadata["hnsw:batch_size"] = draw( + st.integers(min_value=3, max_value=max_hnsw_batch_size) + ) metadata["hnsw:sync_threshold"] = draw( - st.integers(min_value=3, max_value=2000) + st.integers(min_value=3, max_value=max_hnsw_sync_threshold) ) # Sometimes, select a space at random if draw(st.booleans()): diff --git a/chromadb/test/property/test_cross_version_persist.py b/chromadb/test/property/test_cross_version_persist.py index 8d3087777aa..b88f31f8484 100644 --- a/chromadb/test/property/test_cross_version_persist.py +++ b/chromadb/test/property/test_cross_version_persist.py @@ -272,7 +272,15 @@ def persist_generated_data_with_old_version( # Since we can't pickle the embedding function, we always generate record sets with embeddings collection_st: st.SearchStrategy[strategies.Collection] = st.shared( - strategies.collections(with_hnsw_params=True, has_embeddings=True), key="coll" + strategies.collections( + with_hnsw_params=True, + has_embeddings=True, + with_persistent_hnsw_params=True, + # By default, these are set to 2000, which makes it unlikely that index mutations will ever be fully flushed + max_hnsw_sync_threshold=10, + max_hnsw_batch_size=10, + ), + key="coll", ) @@ -336,6 +344,10 @@ def test_cycle_versions( name=collection_strategy.name, embedding_function=not_implemented_ef(), # type: ignore ) + + # Should be able to add embeddings + coll.add(**embeddings_strategy) # type: ignore + invariants.count(coll, embeddings_strategy) invariants.metadatas_match(coll, embeddings_strategy) invariants.documents_match(coll, embeddings_strategy) From cd982666294353d0e0e263e2ad6fdfb57f4d6404 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 11 Jul 2024 17:48:14 -0700 Subject: [PATCH 06/10] Fix persistent HNSW migration --- chromadb/db/mixins/sysdb.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/chromadb/db/mixins/sysdb.py b/chromadb/db/mixins/sysdb.py index d4b56df5bbd..42375c109ed 100644 --- a/chromadb/db/mixins/sysdb.py +++ b/chromadb/db/mixins/sysdb.py @@ -19,6 +19,7 @@ UniqueConstraintError, ) from chromadb.db.system import SysDB +from chromadb.segment.impl.vector.hnsw_params import PersistentHnswParams from chromadb.telemetry.opentelemetry import ( add_attributes_to_current_span, OpenTelemetryClient, @@ -776,7 +777,12 @@ def _insert_config_from_legacy_params( collections_t = Table("collections") # Get any existing HNSW params from the metadata - hnsw_metadata_params = HnswParams.extract(metadata or {}) + metadata = metadata or {} + if metadata.get("hnsw:batch_size") or metadata.get("hnsw:sync_threshold"): + hnsw_metadata_params = PersistentHnswParams.extract(metadata) + else: + hnsw_metadata_params = HnswParams.extract(metadata) + hnsw_configuration = HNSWConfigurationInternal.from_legacy_params( hnsw_metadata_params # type: ignore[arg-type] ) From ead99cd6f1d5857a83ee26671f9ab91764b2137d Mon Sep 17 00:00:00 2001 From: Max Isom Date: Fri, 12 Jul 2024 11:57:19 -0700 Subject: [PATCH 07/10] [BUG] fix persistent HNSW parameter migration --- chromadb/db/mixins/sysdb.py | 8 +++++++- chromadb/test/property/strategies.py | 10 ++++++---- chromadb/test/property/test_cross_version_persist.py | 7 ++++++- chromadb/test/property/test_persist.py | 4 +++- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/chromadb/db/mixins/sysdb.py b/chromadb/db/mixins/sysdb.py index d4b56df5bbd..42375c109ed 100644 --- a/chromadb/db/mixins/sysdb.py +++ b/chromadb/db/mixins/sysdb.py @@ -19,6 +19,7 @@ UniqueConstraintError, ) from chromadb.db.system import SysDB +from chromadb.segment.impl.vector.hnsw_params import PersistentHnswParams from chromadb.telemetry.opentelemetry import ( add_attributes_to_current_span, OpenTelemetryClient, @@ -776,7 +777,12 @@ def _insert_config_from_legacy_params( collections_t = Table("collections") # Get any existing HNSW params from the metadata - hnsw_metadata_params = HnswParams.extract(metadata or {}) + metadata = metadata or {} + if metadata.get("hnsw:batch_size") or metadata.get("hnsw:sync_threshold"): + hnsw_metadata_params = PersistentHnswParams.extract(metadata) + else: + hnsw_metadata_params = HnswParams.extract(metadata) + hnsw_configuration = HNSWConfigurationInternal.from_legacy_params( hnsw_metadata_params # type: ignore[arg-type] ) diff --git a/chromadb/test/property/strategies.py b/chromadb/test/property/strategies.py index d6ca4e4c9ed..136811e5e15 100644 --- a/chromadb/test/property/strategies.py +++ b/chromadb/test/property/strategies.py @@ -281,7 +281,7 @@ def collections( with_hnsw_params: bool = False, has_embeddings: Optional[bool] = None, has_documents: Optional[bool] = None, - with_persistent_hnsw_params: bool = False, + with_persistent_hnsw_params: st.SearchStrategy[bool] = st.just(False), ) -> Collection: """Strategy to generate a Collection object. If add_filterable_data is True, then known_metadata_keys and known_document_keywords will be populated with consistent data.""" @@ -292,16 +292,18 @@ def collections( dimension = draw(st.integers(min_value=2, max_value=2048)) dtype = draw(st.sampled_from(float_types)) - if with_persistent_hnsw_params and not with_hnsw_params: + use_persistent_hnsw_params = draw(with_persistent_hnsw_params) + + if use_persistent_hnsw_params and not with_hnsw_params: raise ValueError( - "with_hnsw_params requires with_persistent_hnsw_params to be true" + "with_persistent_hnsw_params requires with_hnsw_params to be true" ) if with_hnsw_params: if metadata is None: metadata = {} metadata.update(test_hnsw_config) - if with_persistent_hnsw_params: + if use_persistent_hnsw_params: metadata["hnsw:batch_size"] = draw(st.integers(min_value=3, max_value=2000)) metadata["hnsw:sync_threshold"] = draw( st.integers(min_value=3, max_value=2000) diff --git a/chromadb/test/property/test_cross_version_persist.py b/chromadb/test/property/test_cross_version_persist.py index 8d3087777aa..cb3cfe88e9c 100644 --- a/chromadb/test/property/test_cross_version_persist.py +++ b/chromadb/test/property/test_cross_version_persist.py @@ -272,7 +272,12 @@ def persist_generated_data_with_old_version( # Since we can't pickle the embedding function, we always generate record sets with embeddings collection_st: st.SearchStrategy[strategies.Collection] = st.shared( - strategies.collections(with_hnsw_params=True, has_embeddings=True), key="coll" + strategies.collections( + with_hnsw_params=True, + has_embeddings=True, + with_persistent_hnsw_params=st.booleans(), + ), + key="coll", ) diff --git a/chromadb/test/property/test_persist.py b/chromadb/test/property/test_persist.py index f91b0ed78fd..a1c6add1619 100644 --- a/chromadb/test/property/test_persist.py +++ b/chromadb/test/property/test_persist.py @@ -58,7 +58,9 @@ def settings(request: pytest.FixtureRequest) -> Generator[Settings, None, None]: collection_st = st.shared( - strategies.collections(with_hnsw_params=True, with_persistent_hnsw_params=True), + strategies.collections( + with_hnsw_params=True, with_persistent_hnsw_params=st.just(True) + ), key="coll", ) From 7515455e8a05a6d21fe384b93e1c2355d6507b54 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Fri, 12 Jul 2024 13:38:52 -0700 Subject: [PATCH 08/10] Persist # of invalid operations --- chromadb/segment/impl/vector/local_hnsw.py | 4 ++++ .../impl/vector/local_persistent_hnsw.py | 22 +++++++++++-------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/chromadb/segment/impl/vector/local_hnsw.py b/chromadb/segment/impl/vector/local_hnsw.py index 86a19cad05a..a439033e446 100644 --- a/chromadb/segment/impl/vector/local_hnsw.py +++ b/chromadb/segment/impl/vector/local_hnsw.py @@ -44,6 +44,7 @@ class LocalHnswSegment(VectorReader): _dimensionality: Optional[int] _total_elements_added: int _total_elements_updated: int + _total_invalid_operations: int _max_seq_id: SeqId _lock: ReadWriteLock @@ -303,6 +304,7 @@ def _write_records(self, records: Sequence[LogRecord]) -> None: batch.apply(record) else: logger.warning(f"Delete of nonexisting embedding ID: {id}") + self._total_invalid_operations += 1 elif op == Operation.UPDATE: if record["record"]["embedding"] is not None: @@ -312,11 +314,13 @@ def _write_records(self, records: Sequence[LogRecord]) -> None: logger.warning( f"Update of nonexisting embedding ID: {record['record']['id']}" ) + self._total_invalid_operations += 1 elif op == Operation.ADD: if not label: batch.apply(record, False) else: logger.warning(f"Add of existing embedding ID: {id}") + self._total_invalid_operations += 1 elif op == Operation.UPSERT: batch.apply(record, label is not None) diff --git a/chromadb/segment/impl/vector/local_persistent_hnsw.py b/chromadb/segment/impl/vector/local_persistent_hnsw.py index abd63711b77..ae4c9fc0bd3 100644 --- a/chromadb/segment/impl/vector/local_persistent_hnsw.py +++ b/chromadb/segment/impl/vector/local_persistent_hnsw.py @@ -42,6 +42,7 @@ class PersistentData: dimensionality: Optional[int] total_elements_added: int total_elements_updated: int + total_invalid_operations: int max_seq_id: SeqId id_to_label: Dict[str, int] @@ -53,6 +54,7 @@ def __init__( dimensionality: Optional[int], total_elements_added: int, total_elements_updated: int, + total_invalid_operations: int, max_seq_id: int, id_to_label: Dict[str, int], label_to_id: Dict[int, str], @@ -61,6 +63,7 @@ def __init__( self.dimensionality = dimensionality self.total_elements_added = total_elements_added self.total_elements_updated = total_elements_updated + self.total_invalid_operations = total_invalid_operations self.max_seq_id = max_seq_id self.id_to_label = id_to_label self.label_to_id = label_to_id @@ -68,8 +71,9 @@ def __init__( def __setstate__(self, state): self.__dict__.update(state) - # Field was added after the initial implementation + # Fields were added after the initial implementation self.total_elements_updated = 0 + self.total_invalid_operations = 0 @staticmethod def load_from_file(filename: str) -> "PersistentData": @@ -94,8 +98,6 @@ class PersistentLocalHnswSegment(LocalHnswSegment): _persist_directory: str _allow_reset: bool - _invalid_operations_since_last_persist: int = 0 - _opentelemtry_client: OpenTelemetryClient def __init__(self, system: System, segment: Segment): @@ -132,6 +134,7 @@ def __init__(self, system: System, segment: Segment): self._dimensionality, self._total_elements_added, self._total_elements_updated, + self._total_invalid_operations, self._max_seq_id, self._id_to_label, self._label_to_id, @@ -218,8 +221,6 @@ def _persist(self) -> None: with open(self._get_metadata_file(), "wb") as metadata_file: pickle.dump(self._persist_data, metadata_file, pickle.HIGHEST_PROTOCOL) - self._invalid_operations_since_last_persist = 0 - @trace_method( "PersistentLocalHnswSegment._apply_batch", OpenTelemetryGranularity.ALL ) @@ -232,11 +233,14 @@ def _apply_batch(self, batch: Batch) -> None: num_elements_updated_since_last_persist = ( self._total_elements_updated - self._persist_data.total_elements_updated ) + num_invalid_operations_since_last_persist = ( + self._total_invalid_operations - self._persist_data.total_invalid_operations + ) if ( num_elements_added_since_last_persist + num_elements_updated_since_last_persist - + self._invalid_operations_since_last_persist + + num_invalid_operations_since_last_persist >= self._sync_threshold ): self._persist() @@ -285,7 +289,7 @@ def _write_records(self, records: Sequence[LogRecord]) -> None: logger.warning( f"Update of nonexisting embedding ID: {record['record']['id']}" ) - self._invalid_operations_since_last_persist += 1 + self._total_invalid_operations += 1 elif op == Operation.ADD: if record["record"]["embedding"] is not None: if not exists_in_index: @@ -293,14 +297,14 @@ def _write_records(self, records: Sequence[LogRecord]) -> None: self._brute_force_index.upsert([record]) else: logger.warning(f"Add of existing embedding ID: {id}") - self._invalid_operations_since_last_persist += 1 + self._total_invalid_operations += 1 elif op == Operation.UPSERT: if record["record"]["embedding"] is not None: self._curr_batch.apply(record, exists_in_index) self._brute_force_index.upsert([record]) if ( - len(self._curr_batch) + self._invalid_operations_since_last_persist + len(self._curr_batch) + self._total_invalid_operations >= self._batch_size ): self._apply_batch(self._curr_batch) From 71c1fceed1bf0db6b844128ffcc74e3fe3585cf2 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Fri, 12 Jul 2024 15:24:05 -0700 Subject: [PATCH 09/10] Fix test --- chromadb/segment/impl/vector/local_hnsw.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chromadb/segment/impl/vector/local_hnsw.py b/chromadb/segment/impl/vector/local_hnsw.py index a439033e446..215f04dcf7d 100644 --- a/chromadb/segment/impl/vector/local_hnsw.py +++ b/chromadb/segment/impl/vector/local_hnsw.py @@ -69,6 +69,7 @@ def __init__(self, system: System, segment: Segment): self._dimensionality = None self._total_elements_added = 0 self._total_elements_updated = 0 + self._total_invalid_operations = 0 self._max_seq_id = self._consumer.min_seqid() self._id_to_seq_id = {} From e01e386f49cdc8f6d81cc635a0ae01ad644330cb Mon Sep 17 00:00:00 2001 From: Max Isom Date: Mon, 15 Jul 2024 09:28:54 -0700 Subject: [PATCH 10/10] Whoops --- chromadb/segment/impl/vector/local_persistent_hnsw.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chromadb/segment/impl/vector/local_persistent_hnsw.py b/chromadb/segment/impl/vector/local_persistent_hnsw.py index ae4c9fc0bd3..afe561d33f7 100644 --- a/chromadb/segment/impl/vector/local_persistent_hnsw.py +++ b/chromadb/segment/impl/vector/local_persistent_hnsw.py @@ -70,10 +70,10 @@ def __init__( self.id_to_seq_id = id_to_seq_id def __setstate__(self, state): - self.__dict__.update(state) # Fields were added after the initial implementation self.total_elements_updated = 0 self.total_invalid_operations = 0 + self.__dict__.update(state) @staticmethod def load_from_file(filename: str) -> "PersistentData":