Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for criteria based DWPT selection inside DocumentWriter #13387

Open
RS146BIJAY opened this issue May 20, 2024 · 20 comments
Open

Support for criteria based DWPT selection inside DocumentWriter #13387

RS146BIJAY opened this issue May 20, 2024 · 20 comments

Comments

@RS146BIJAY
Copy link

RS146BIJAY commented May 20, 2024

Description

Issue

Today, Lucene internally creates multiple DocumentWriterPerThread (DWPT) instances per index to facilitate concurrent indexing across different ingestion threads. When documents are indexed by the same DWPT, they are grouped into the same segment post flush. As DWPT assignment to documents is only concurrency based, it’s not possible to predict or control the distribution of documents within the segments. For instance, during the indexing of time series logs, its possible for a single DWPT to index logs with both 5xx and 2xx status codes, leading to segments that contains a heterogeneous mix of documents.

Typically, in scenarios like log analytics, users are more interested in a certain subset of data (errors (4XX) and/or fault requests (5XX) requests logs). Randomly assigning DWPT to index document can disperse these relevant documents across multiple segments. Furthermore, if these documents are sparse, they will be thinly spread out even within the segments, necessitating the iteration over many less relevant documents for search queries. While the optimisation to use BKD tree to skip non competitive documents by the collectors significantly improves query performance, actual number of documents iterated still depends on arrangement of data in the segment and how underlying BKD gets constructed.

Storing relevant log documents separately from relatively less relevant ones, such as 2xx logs, can prevent their scattering across multiple segments. This model can markedly enhance query performance by streamlining searches to involve fewer segments and omitting documents that are less relevant. Moreover, clustering related data allows for the pre-computation of aggregations for frequently executed queries (e.g., count, minimum, maximum) and store them as separate metadata. Corresponding queries can be served from the metadata itself, thus optimizing both on the latency and compute.

Proposal

In this proposal, we suggest adding support for DWPT selection mechanism based on a specific criteria within the DocumentWriter. Users can define this criteria through a grouping function as a new IndexWriterConfig configuration. This grouping criteria can be based on the anticipated query pattern in the workload to store frequently queried data together. During indexing, this function would be evaluated for each document, ensuring that documents with differing criteria are indexed using separate DWPTs. For instance, in the context of http request logs, the grouping function could be tailored to assign DWPTs according to the status code in the log entry.

Associated OpenSearch RFC

opensearch-project/OpenSearch#13183

Improvements with new DWPT distribution strategy

We worked on a POC in Lucene and tried integrating it with OpenSearch. We validated DWPT distribution based on different criterias such as status code, timestamp etc against different types of workload. We observed a 50% - 60% improvements in performance of range, aggregation and sort queries with proposed DWPT selection approach.

Implementation Details

User defined grouping criteria function will be passed to DocumentWriter as a new IndexWriterConfig configuration. During indexing of a document, the DocumentWriter will evaluate this grouping function and pass this outcome to the DocumentWriterFlushControl and DocumentWriterThreadPool when requesting a DWPT for indexing the document. The DocumentWriterThreadPool will now maintain a distinct pool of DWPTs for each possible outcome. The specific pool selected for indexing a document will depend on the outcome of the document for the grouping function. Should the relevant pool be empty, a new DWPT will be created and added to this pool. Connecting with above example for http request logs, having a distinct pools for 2xx and 5xx status code logs would ensure that 2xx logs are indexed using a separate set of DWPTs from the 5xx status codes logs. Once a DWPT is designated for flushing, it is checked out of the thread pool and won't be reused for indexing.

Further, in order to ensure that grouping criteria invariant is maintained even during segment merges, we propose a new merge policy that acts as a decorator over the existing Tiered Merge policy. During a segment merge, this policy would categorize segments according to their grouping function outcomes before merging segments within the same category, thus maintaining the grouping criteria’s integrity throughout the merge process.

Guardrails

To mange the system’s resources effectively, guardrails will be implemented to limit the numbers of groups that can be generated from grouping function. User will need to provide a predefined list of acceptable outcomes for the grouping function, along with the function itself. Documents whose grouping function outcome is not within this list will be indexed using a default pool of DWPTs. This limits the number of DWPTs created during indexing, preventing the formation of numerous small segments that could lead to frequent segment merges. Additionally, a cap on DWPT count keeps the JVM utilization and garbage collection in check.

@mikemccand
Copy link
Member

I like this idea! I hope we can find a simple enough API exposed through IWC to enable the optional grouping.

This also has nice mechanical sympathy / symmetry with the distributed search engine analog. A distributed search engine like OpenSearch indexes and searches into N shards across multiple servers, and this is nearly precisely the same logical problem that Lucene tackles on a single multi-core server when indexing and searching into N segments, especially as Lucene's intra-query concurrency becomes the norm/default and improves (e.g. allowing intra-segment per query concurrency as well). We should cross-fertilize more based on this analogy: the two problems are nearly the same. A shard, a segment, same thing heh (nearly).

So this proposal is bringing custom document routing feature from OpenSearch, down into Lucene's segments.

@jpountz
Copy link
Contributor

jpountz commented May 22, 2024

This is an interesting idea!

You do not mention it explicitly in the issue description, but presumably this only makes sense if an index sort is configured, otherwise merges may break the clustering that you are trying to create in the first place?

The DocumentWriterThreadPool will now maintain a distinct pool of DWPTs for each possible outcome.

I'm a bit uncomfortable with this approach. It is so heavy that it wouldn't perform much better than maintaining a separate IndexWriter per group? I wonder if we could do something within a single DWPT pool, e.g. could we use rendez-vous hashing to optimistically try to reuse the same DWPT for the same group as often as possible, but only on a best-effort basis, not trading concurrency or creating more DWPTs than indexing concurrency requires?

@RS146BIJAY
Copy link
Author

RS146BIJAY commented May 23, 2024

Thanks Mike and Adrian for the feedback.

You do not mention it explicitly in the issue description, but presumably this only makes sense if an index sort is configured, otherwise merges may break the clustering that you are trying to create in the first place?

Not exactly. As mentioned, in order to ensure that grouping criteria invariant is maintained even during segment merges, we are introducing a new merge policy that acts as a decorator over the existing Tiered Merge policy. During a segment merge, this policy would categorize segments according to their grouping function outcomes before merging segments within the same category, thus maintaining the grouping criteria’s integrity throughout the merge process.

I wonder if we could do something within a single DWPT pool, e.g. could we use rendez-vous hashing to optimistically try to reuse the same DWPT for the same group as often as possible, but only on a best-effort basis, not trading concurrency or creating more DWPTs than indexing concurrency requires?

I believe even if we use a single DWPT pool with rendezvous hashing to distribute DWPTs we would end up creating same number of DWPTs as having different DWPT pools for different group. Consider an example where we are grouping logs based on status code for an index and 8 concurrent indexing thread is indexing 2xx status code logs. This will create 8 DWPTs. Now 4 threads starts indexing 4xx status code logs concurrently, this will require 4 extra DWPTs for indexing logs if we want to maintain status code based grouping. Instead of creating new DWPTs, we can try reusing existing 4 DWPTs created for 2xx status code logs on best effort basis. But this will again mix 4xx status code logs with 2xx status code logs defeating the purpose of status code based grouping of logs. Also to ensure that number of DWPTs created are in check, we will be creating guardrails on number of groups that can be generated from grouping function. Let me know if my understanding is correct.

@jpountz
Copy link
Contributor

jpountz commented May 24, 2024

Thanks for explaining.

The concern I have given how we're planning on never flushing/merging segments from the same group is that this would essentially perform the same as maintaining one IndexWriter per group, which is quite heavy, and can already be done easily on top of Lucene?

To get similar benefits from clustering but without incurring the overhead of segments, I feel like we should rather improve our support for clustering at the doc ID level, ie. index sorting. And maybe ideas like this criteria-based selection of DWPTs could help speed up the creation of sorted indexes?

@RS146BIJAY
Copy link
Author

RS146BIJAY commented May 24, 2024

Thanks for the suggestion. Above suggestion for clustering within the segment does improves skipping of documents (especially when combined with BKD optimisation to skip non competitive documents). But it still limits us from building multiple optimisations which could be done by having separate DWPT pools for different groups:

  • Having separate pool of DWPTs (thus creating separate segments) for different groups, will reduce the cardinality of values within a segment for a specific field. Optimisation like precomputing aggregations with StartTree index tends to perform better when cardinality of the field is not too high.
  • With the above approach, size of the segments can be still high. If we store more relevant logs (like 5xx and 4xx) in a different segments than less relevant ones (like 2xx), size of segments containing error and fault logs will be smaller (since error logs are generally less). This will help us to do storage optimisations like storing more relevant logs (like 5xx logs) on hot storage (like on the node's disk) whereas less relevant logs can be directly stored in cheaper remote storage (e.g.: AWS S3, Google Cloud Storage, MinIO, etc.).

Actually, we won't be able to build multiple optimizations on top of the segment topology if we store them together. Let me know if this makes sense.

@jpountz
Copy link
Contributor

jpountz commented May 27, 2024

I agree that better organizing data across segments yields significant benefits, I'm only advocating for doing this by maintaining a separate IndexWriter for each group instead of doing it inside of DocumentsWriter.

@RS146BIJAY
Copy link
Author

RS146BIJAY commented May 27, 2024

I agree that better organizing data across segments yields significant benefits, I'm only advocating for doing this by maintaining a separate IndexWriter for each group instead of doing it inside of DocumentsWriter.

Sorry missed answering this part in my earlier response. We did explore this approach of creating an IndexWriter/Lucene Index (or OpenSearch shard) for each group. However, implementing this approach would lead to significant overhead on the client side (such as OpenSearch) both in the terms of code changes and operational overhead like metadata management. On the other hand, maintaining separate DWPT pools for different groups would require minimal changes inside Lucene. The overhead will be lesser here as Lucene shard will still be maintained as a single physical unit. Let me know if this makes sense.

@RS146BIJAY
Copy link
Author

Attaching a preliminary PR for the POC related to above issue to share my understanding. Please note that this is not the final PR.

@jpountz
Copy link
Contributor

jpountz commented Jun 3, 2024

However, implementing this approach would lead to significant overhead on the client side (such as OpenSearch) both in the terms of code changes and operational overhead like metadata management.

Can you give more details? The main difference that comes to mind is that using multiple IndexWriters requires multiple Directorys as well and OpenSearch may have a strong assumption that there is a 1:1 mapping between shards and folders on disk. But this could be worked around with a filter Directory that flags each index file with a prefix that identifies the group that each index file belongs to?

@mikemccand
Copy link
Member

I like @jpountz's idea of just using separate IndexWriters for this use-case, instead of adding custom routing logic to the separate DWPTs inside a single IndexWriter and then also needing a custom MergePolicy that ensures that only the like-segments are merged. A separate IndexWriter would cleanly achieve both of these?

The idea of using a single underlying multi-tenant Directory with multiple FilterDirectory wrappers (one per IndexWriter) is interesting -- do we have such a class already (that would distinguish the tenants via filename prefix or so)? That's a nice idea all by itself (separate from this use case) -- maybe open a spinoff to explore that?

You would also need a clean-ish way to manage a single total allowed RAM bytes across the N IndexWriters? I think IndexWriter's flushing policy or RAM accounting was already generalized to allow for this use case, but I don't remember the details.

Searching across the N separate shards as if they were a single index is also possible via MultiReader, though, I'm not sure how well intra-query concurrency works -- maybe it works just fine because the search-time leaves/slices are all union'd across the N shards?

@jpountz
Copy link
Contributor

jpountz commented Jun 4, 2024

do we have such a class already (that would distinguish the tenants via filename prefix or so)? That's a nice idea all by itself (separate from this use case) -- maybe open a spinoff to explore that?

I don't think we do. +1 to exploring this separately. I like that we then wouldn't need to tune the merge policy because it would naturally only see segments that belong to its group.

You would also need a clean-ish way to manage a single total allowed RAM bytes across the N IndexWriters? I think IndexWriter's flushing policy or RAM accounting was already generalized to allow for this use case, but I don't remember the details.

Right, IndexWriter#flushNextBuffer() and IndexWriter#ramBytesUsed() allow building this sort of thing on top of Lucene. It would be nice if Lucene provided more ready-to-use utilities around this.

Searching across the N separate shards as if they were a single index is also possible via MultiReader, though, I'm not sure how well intra-query concurrency works -- maybe it works just fine because the search-time leaves/slices are all union'd across the N shards?

Indeed, I'd expect it to work just fine.

@RS146BIJAY
Copy link
Author

RS146BIJAY commented Jun 27, 2024

Thanks a lot for suggestions @jpountz and @mikemccand.

As suggested above, we worked on a POC to explore using separate IndexWriter for different groups. Each IndexWriter is associated with a distinct logical filter directories, which attaches a filename prefix according to the group. These directories are backed by a single multi tenant directory.

However this approach presents several challenges on the Client (OpenSearch) side. Each IndexWriter now generates its own sequence number. In a service like OpenSearch where Translog operates based on sequence numbers at the Lucene Index level. When the same sequence number is generated across different IndexWriter for a same Lucene Index, conflicts can occur during operation like Translog replay. Additionally, local and global checkpoints maintained during recovery operation in service like OpenSearch require sequence number to be a continuous increasing number which won't be valid with multiple IndexWriter.

We did not face these issue when different groups were represented by different DWPT pools. This is because there was only a single IndexWriter writing to a Lucene Index, generating a continuous increasing sequence number. The complexity of handling different segments for different groups is managed internally at Lucene level, rather than propagating it to the client side. Feel free to share any further suggestions you may have on this.

@mikemccand
Copy link
Member

Each IndexWriter now generates its own sequence number.

This would indeed get somewhat tricky. But is OpenSearch really using Lucene's returned sequence numbers? I had thought Elasticsearch's sequence number implementation predated the Lucene change adding sequence numbers to every low-level Lucene operation that mutates the index.

Under the hood, IndexWriter just uses AtomicLong.incrementAndGet -- maybe we could allow the user to pass in their own AtomicLong and then you could share the same one across N IndexWriters?

@vigyasharma
Copy link
Contributor

I wonder if we can leverage IndexWriter's addIndexes(Directory... dirs) API for this. We could create separate indexes for every category (log groups 2xx, 4xx, 5xx in the example here), and combine them into one using this API. Internally, this version of the API simply copies over all segment files in the directory, so it should be pretty fast.

This could mean that each shard for an OpenSearch/Elasticsearch index would maintain internal indexes for each desired category, and use the API to combine them into a common "shard" index at every flush? We'd still need a way to maintain category labels for a segment during merging, but that's a common problem for any approach we take.

@RS146BIJAY
Copy link
Author

RS146BIJAY commented Sep 19, 2024

Thanks mikemccand and vigyasharma for suggestions. Evaluated different approaches to use different IndexWriter for different groups:

Approach 1: Using filter directory for each group

approach1

In this approach, each group (for above example grouping criteria is status code) has its own IndexWriter, associated with distinct logical filter directories that attach a filename prefix to the segments according to their respective group (200_, 400_ etc.). These directories are backed by a single physical directory. Since different IndexWriter manages segments belonging to different groups, segments belonging to the same group are always merged together. A CompositeIndexWriter wraps the group-level IndexWriters for client (OpenSearch) interaction. While adding or updating a document, this CompositeIndexWriter delegates the operation to corresponding criteria specific IndexWriter. CompositeIndexWriter is associated with the top level physical directory.

To address the sequence number conflict between different IndexWriters, a common sequence number generator was used for all IndexWriters within a shard. This ensures that sequence number are always continuous increasing number for the IndexWriters in the same shard.

Pros

  1. Using separate IndexWriters for different groups ensures that documents from groups are categorised into distinct segments. This approach also eliminates the need to modify merge policy.
  2. Using a common sequence number generator prevent sequence number conflict among IndexWriters belonging to same group. However, since sequence number generation is delegated to the Client (OpenSearch), they must ensure that the sequence numbers are monotonically increasing.

Cons

  1. Lucene internally search for files starting with segments_ or pending_segments_ for operations like getting last commit generation of index or write.lock for checking if lock is applied on directory, etc. Attaching a prefix name to these files may break Lucene’s internal operations.

@RS146BIJAY
Copy link
Author

RS146BIJAY commented Sep 19, 2024

Approach 2: Using a physical directory for each group

approach2

To segregate segments belonging to different groups and avoid attaching a prefix to segment names, we associated group-level IndexWriters with a physical directory instead of a filter directory. CompositeIndexWriter are linked to the top-level multi-tenant directory while group-level IndexWriters are connected to individual directories specific to each group within the parent directory. Since segments belonging to each groups are now in separate directory, there is no need to prefix segment names, thus solving the prefix name issue with above approach. Separate IndexWriter ensures only segments belonging to same group are merged together.

Pros

  1. Having a different directories for each group’s IndexWriter reduces the chances of failing any Lucene’s internal calls.

Cons

  1. Multiple IndexWriters still do not function as a single entity when interacting with the client (OpenSearch). Each IndexWriter has its own associated SegmentInfos, Index commit, SegmentInfos generation and version. This breaks multiple features like segment replication and it’s derivative remote store. For example, in a remote store enabled cluster, we maintain a replica of the shard (single Lucene index) on separate remote storage (such as S3). To achieve this, during each checkpoint, we take a snapshot of the current generation of SegmentInfos associated with the Lucene Index and upload the associated files along with a metadata file (associated with a generation of SegmentInfo) to a remote store. Now with multiple IndexWriter for the same shard, a list of SegmentInfos (one for each group) will be associated. We can handle this by creating a list of snapshots and their separate metadata files, but this essentially translates to maintaining separate Lucene indexes for each shard, essentially making each segment group becoming a shard on the client (OpenSearch) end.
  2. In order to address the above issue, we can try creating a common wrapper for the list of SegmentInfos, similar to what we did for IndexWriters with CompositeIndexWriter. However, this approach also has issues, as the common wrapper would need a common generation and version. Additionally, it should be possible to associate the common wrapper with a specific index commit to allow opening a CompositeIndexWriter at a specific Index commit point. Furthermore, when a CompositeIndexWriter is opened using a commit point, it should be possible to open all the group level sub IndexWriters at that Index commit point. While this is doable, it is extremely complex to implement it.

@RS146BIJAY
Copy link
Author

RS146BIJAY commented Sep 19, 2024

Approach 3: Combining group level IndexWriter with addIndexes

approach3

In this approach, in order to make multiple group-level IndexWriters function as a unified entity, we use the Lucene’s addIndxes api to combine them. This ensures that the top-level IndexWriter shares a common segment_N, SegmentCommitInfos, generation and version. During indexing or update request, the client (such as OpenSearch) will continue to route requests to the appropriate IndexWriter based on the documents’s criteria evaluation. During flush, in addition to flushing the segments of the group-level IndexWriters, we will merge/move them into a single parent IndexWriter using the addIndexes API call. For read (or replication) operations, the client (like OpenSearch) will now open a Reader on the parent IndexWriter.

Pros

  1. Having a common IndexWriter with a single SegmentCommitInfos, generation etc, ensures that client (OpenSearch) is still interacting with Lucene using a single entity.

Cons

  1. When segments of different groups are combined into a single index, we must ensure that only segment within a group are merged together. This will require a new merge policy for top level IndexWriter.
  2. The Lucene addIndexes API seems to acquire a write lock on each directory associated with group level IndexWriters, preventing active writes during the Index merging process. This can cause a downtime on the client (OpenSearch) side during this period. However, this issue could be mitigated if Lucene provided an API to combine these group level indexes as a soft reference, without copying the segment files or locking the group level IndexWriters.
  3. Additionally, index merging involves copying files from group level IndexWriters’ directory to parent IndexWriter directory. This is a resource intensive operation, consuming disk IO and CPU cycles. Moreover, since we open a Reader on the parent IndexWriter (combined IndexWriter from group level IndexWriters), slow index merging may impact reader refresh times delaying visibility of changes for search.

@RS146BIJAY
Copy link
Author

RS146BIJAY commented Sep 19, 2024

Summary

In summary the problem can be broken down into three sub problems.

  1. Having abstraction to write the data into different groups (Multiple Writers)
  2. Having a single interface/entity for multiple groups for client (OpenSearch) interaction (for Sequence id generation, segment replication, etc) with Lucene.
  3. Merging the segments belonging to the same group.

With the different approaches we investigated, none of them satisfies/solves the above 3 sub problems cleanly with decent complexity. That leaves us with the originally suggested approach of using different DWPTs to represent different groups. The original approach:

  1. Uses single IndexWriter and different DWPT which provides clear abstraction for different groups.
  2. With single IndexWriter performs updates, at any given time, only a single SegmentInfos, generation and version were associated with a Lucene index.
  3. Does require a new merge policy to merge the segments belonging to the same group.

Exploring

In parallel, we are still exploring if we can introduce an API for addIndexes in Lucene to combine group level indexes as a soft reference, without copying the segment files or locking the group level IndexWriters. This may solve issues with approach 3.

Open for thoughts and suggestions.

@vigyasharma
Copy link
Contributor

3. Does require a new merge policy to merge the segments belonging to the same group.

How do background index merges work with the original, separate DWPT based approach? Don't you need to ensure that you only merge segments that belong to a single group?

@RS146BIJAY
Copy link
Author

RS146BIJAY commented Sep 20, 2024

Further, in order to ensure that grouping criteria invariant is maintained even during segment merges, we propose a new merge policy that acts as a decorator over the existing Tiered Merge policy. During a segment merge, this policy would categorize segments according to their grouping function outcomes before merging segments within the same category, thus maintaining the grouping criteria’s integrity throughout the merge process.

We will be introducing a new merge policy in this case as well to ensure grouping criteria invariant is maintained even during segment merges. Original changes proposed was DWPT side of changes with a new merge policy which ensure same group segments are merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants