[RFC] Context Aware Segments #13183

Open
RS146BIJAY opened this issue Apr 14, 2024 · 18 comments
Labels: enhancement, feature, RFC, Roadmap:Cost/Performance/Scale, Search:Performance

RS146BIJAY commented Apr 14, 2024

Abstract

This RFC proposes a new context aware/based segment creation and merging strategy for OpenSearch to improve query performance by co-locating related data within the same physical segments, particularly benefiting log analytics and metrics use cases.

Motivation

In OpenSearch, a typical workload involves log analytics and metrics data, where for the majority of search queries only a subset of the data is relevant. For instance, when analyzing application logs, users are often more interested in error (4xx) and/or fault (5xx) requests, which generally constitute only a minor portion of the logs. Current segment creation (via flush) and merge policies (both Tiered and LogByteSize) do not incorporate the anticipated query context while grouping data into segments.

  • Tiered merge policy: Tries to minimize write amplification by always favoring merges with lower skew (size of the largest segment divided by that of the smallest), smaller size, and more reclaimable deletes. However, this often disperses related data (data that will be queried together) across multiple segments.
  • LogByteSizeMergePolicy: Preserves the initial document insertion order. However, the insertion sequence may differ from the order in which the data will be queried. While this is helpful for time series data where documents are sorted by timestamp, it does not work well for out-of-order data or for queries on fields other than the timestamp.

This leads to segments containing a mix of relevant and less relevant documents. We can improve query performance by grouping relevant data together and removing this dispersion, because:

  • Grouping relevant documents together in the same segment reduces the need to search through multiple segments, potentially boosting performance since fewer segments are involved in serving a query. Continuing the application logs example: since users are often more interested in error (4xx) and/or fault (5xx) requests, segregating these logs from the 2xx logs prevents their dispersion across multiple segments, leading to improved query efficiency because fewer segments need to be examined.
  • Without grouping relevant data together, each segment contains only a fraction of the data relevant to a query, necessitating iteration over many less relevant documents as well. While the collectors' optimisation of using the BKD tree to skip non-competitive documents significantly improves query performance, the actual number of documents iterated still depends on the arrangement of data in the segment and how the underlying BKD tree gets constructed. Grouping relevant data together mitigates the sparsity of relevant documents in a segment and reduces the need to sift through a large amount of less relevant data to isolate the relevant documents.
  • With the introduction of segment groups and the identification of less relevant data during ingestion, less relevant data can be stored directly in cheaper remote storage (e.g. AWS S3, Google Cloud Storage, MinIO), while more relevant data is kept in hot storage (on the node's disk). This strategy conserves memory and disk space on local storage without affecting latency.
  • Storing related data together enables pre-computing aggregations for frequently executed queries (e.g. count, minimum, maximum, average, percentiles) and storing them as separate metadata. Corresponding queries can then be served from the metadata itself, optimizing both latency and compute.

Proposal

The solution introduces context aware/based segments that group related data in the same physical segments. This grouping is achieved through a user-defined predicate which can be specified as a configuration. Predicate evaluation happens during both the flush and segment merge flows, ensuring that related data is consistently co-located in the same segments.

  • Flush: The DocumentsWriter will assign the same DocumentsWriterPerThread (DWPT) object to documents with the same predicate evaluation. This ensures related data lands in the same segment during flush.
  • Merge: A new context aware/based merge policy will ensure that the predicate invariant is maintained during segment merges.

Example Use case

For application request logs, if the anticipated queries will be on status codes, a user can define a predicate (as a configuration) to group data based on status codes, e.g. group all successful (2xx), error (4xx) and fault (5xx) status codes separately. This ensures that during indexing the same DWPT gets assigned to log entries in the same status code group. The ContextAware merge policy will ensure segments with the same status codes get merged together. Consequently, search queries like “number of faults in the last hour” or “number of errors in the last three hours” become more efficient, as they only need to process segments with 4xx or 5xx status codes, which is a much smaller dataset, improving query performance.
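
To illustrate, here is a minimal sketch under assumptions (the exact predicate structure is still to be published in a follow-up issue, so the interface shape and field names below are hypothetical): the grouping criteria can be thought of as a function mapping each document to a group key, and everything that evaluates to the same key stays together.

```java
import java.util.Map;
import java.util.function.Function;

public final class StatusCodeGroupingCriteria {

    // Hypothetical grouping predicate: maps a parsed log document
    // (field name -> value) to a segment group key by HTTP status code class.
    static final Function<Map<String, Object>, String> GROUPING_PREDICATE = doc -> {
        int status = ((Number) doc.getOrDefault("status", 200)).intValue();
        if (status >= 500) return "5xx";  // faults
        if (status >= 400) return "4xx";  // errors
        return "2xx";                     // successful requests
    };

    public static void main(String[] args) {
        // Documents that evaluate to the same key would be routed to the same DWPT
        // during flush and kept together by the context aware merge policy.
        Map<String, Object> faultLog = Map.of("status", 503);
        Map<String, Object> okLog = Map.of("status", 200);
        System.out.println(GROUPING_PREDICATE.apply(faultLog)); // -> 5xx
        System.out.println(GROUPING_PREDICATE.apply(okLog));    // -> 2xx
    }
}
```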

Merging segment groups

The context aware/based merge policy employs a hierarchical merging strategy to merge segments that evaluate to the same group (based on the configured predicate). This strategy orchestrates the merging process across multiple hierarchical levels, in a way that reflects the natural hierarchy of data attributes. In this approach:

  • During indexing, data is initially flushed at the lower hierarchical level. For example, for time series data with day-based grouping, data could initially be flushed at the hourly level.
  • Merges into a broader hierarchical level are triggered when the total size of segments within a particular group exceeds a predefined threshold. In the above example, the context aware merge policy could start merging hourly segments into daily segments once the total size of segments for a day surpasses, say, 1 GB.
  • When segments are elevated to a higher hierarchical level, they are merged in ascending order of their subordinate hierarchical attribute. Connecting with the above example, hourly segments would be merged into daily segments in ascending order of the hour.

This approach keeps data within segments nearly ordered, improving query performance since skipping non-competitive documents via the BKD tree works best when data is sorted. Moreover, this strategy reduces the frequency of merges, as merges to a higher level are only executed upon reaching the threshold, thereby improving indexing performance. The trade-off, however, is an increased number of initial segments.
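
As a rough sketch of the merge trigger described above (placeholder types, not the actual merge policy API; the 300 MB value mirrors the POC threshold), hourly segments of a day are considered for a daily merge only once their combined size crosses the threshold, and they are merged in ascending order of hour:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HierarchicalMergePlanner {

    // Example threshold: promote hourly segments to a daily segment
    // once the day's total size exceeds ~300 MB.
    static final long DAY_MERGE_THRESHOLD_BYTES = 300L * 1024 * 1024;

    record SegmentInfo(String day, int hour, long sizeBytes) {}

    // Returns, per day, the hourly segments that should be merged into a daily
    // segment, ordered by ascending hour; days below the threshold are left alone.
    static Map<String, List<SegmentInfo>> planDailyMerges(List<SegmentInfo> segments) {
        Map<String, List<SegmentInfo>> byDay = new TreeMap<>();
        for (SegmentInfo segment : segments) {
            byDay.computeIfAbsent(segment.day(), d -> new ArrayList<>()).add(segment);
        }

        Map<String, List<SegmentInfo>> merges = new TreeMap<>();
        for (Map.Entry<String, List<SegmentInfo>> entry : byDay.entrySet()) {
            long totalSize = entry.getValue().stream().mapToLong(SegmentInfo::sizeBytes).sum();
            if (totalSize > DAY_MERGE_THRESHOLD_BYTES) {
                List<SegmentInfo> ordered = new ArrayList<>(entry.getValue());
                ordered.sort(Comparator.comparingInt(SegmentInfo::hour)); // ascending hour order
                merges.put(entry.getKey(), ordered);
            }
        }
        return merges;
    }
}
```

Only days above the threshold trigger a merge, which is what keeps merge frequency low at the cost of more initial segments.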

Considerations

  • In the initial implementation, the grouping criteria will be a user-defined predicate. Future exploration could involve automatic criteria selection based on the workload.
  • Selecting the appropriate grouping criteria is crucial. Groups that are too small can increase the number of DWPTs required in the DocumentsWriter, regressing indexing performance, and can also lead to many small segments. Conversely, groups that are too large can regress query performance. Implementing a guardrail around the grouping criteria can prevent excessively small or large groupings.
  • OpenSearch queries with multiple clauses can be rewritten to evaluate the clause on the grouping field first, yielding a more efficient query execution order (see the sketch below).
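
As a hedged illustration of the clause re-ordering consideration above (hypothetical types, not the actual OpenSearch query rewrite API), clauses on the grouping field are moved to the front so that non-matching segment groups can be pruned before the remaining clauses are evaluated:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class GroupingAwareClauseReorder {

    record Clause(String field, Object value) {}

    // Puts clauses on the grouping field (e.g. "status") first so that segment
    // groups which cannot match are skipped before evaluating other clauses.
    static List<Clause> reorder(List<Clause> clauses, String groupingField) {
        List<Clause> reordered = new ArrayList<>(clauses);
        reordered.sort(Comparator.comparingInt((Clause c) -> groupingField.equals(c.field()) ? 0 : 1));
        return reordered;
    }

    public static void main(String[] args) {
        List<Clause> query = List.of(
            new Clause("@timestamp", "last 1h"),
            new Clause("status", 500));
        // With "status" as the grouping field, the status clause is evaluated first.
        System.out.println(reorder(query, "status"));
    }
}
```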

POC Links

Grouping criteria: day-based grouping with threshold = 300 MB
OpenSearch changes: main...RS146BIJAY:OpenSearch:data-aware-merge-policy
Lucene changes: apache/lucene@main...RS146BIJAY:lucene:grouping-segments

Next Steps

Would appreciate any feedback on the overall idea and proposal. We are in the process of assessing benchmarks for memory usage, disk space usage, throughput and latency with this optimization. We will compile the results and publish them soon.

RS146BIJAY added the enhancement and untriaged labels on Apr 14, 2024

josefschiefer27 commented Apr 14, 2024

This is a great strategy for reducing data volume with sparse data while also enhancing query performance. In the case of highly sparse data, such as the http-error-codes example, very sparse fields could even be treated as constants within the metadata, eliminating the need to store any field values (e.g. store only the '200' metadata for segments that contain only the very common 200 status code).

RS146BIJAY added the RFC label on Apr 15, 2024

mgodwan commented Apr 16, 2024

@RS146BIJAY Could you share benchmarks with the POCs you have linked?


reta commented Apr 16, 2024

I think I have too many questions, but the one that probably stands out right now is regarding the grouping predicate. This is essentially an arbitrary piece of code, right? What means would the user have to supply this grouping predicate (arbitrary code) for a particular index / indices + merge policy to OpenSearch?


RS146BIJAY commented Apr 17, 2024

This is essentially an arbitrary piece of code, right?

@reta The grouping predicate will be configurable for an index and won't be standalone code (the above POC uses day-based grouping with a threshold of 300 MB as the grouping criteria). The grouping criteria passed by the user will determine which set of data is grouped together. The details around the exact structure of the predicate will be published in a separate GitHub issue.

@peternied

[Triage - attendees 1 2 3 4 5 6 7]
@RS146BIJAY Thanks for creating this RFC

@Bukhtawar

Let's link this with #12683

shwetathareja added the feature label on Apr 18, 2024

RS146BIJAY commented Apr 19, 2024

For HTTP logs workload and daily grouping

Bulk Indexing Client = 8

Grouping criteria

In this scenario we use day-based grouping for http_logs with hour as the sub-grouping criteria. The context aware merge policy will start merging hourly segments into daily segments once the total size of segments for a day surpasses a threshold of 300 MB.

Performance Benchmark Metrics

Indexing

Size of index

Compression works more effectively for context aware segments, resulting in an improvement of approximately 19% in index size over the Tiered and LogByteSize merge policies. This happens because data is nearly ordered in context aware segments.

Latency

There is a minor improvement (3% - 6%) in indexing latency when we group hourly logs together during flush and merge them into daily segments in increasing order of hour inside the DataAware merge policy. This is because hourly segments are merged into daily segments only when the total size of segments within that particular day exceeds the 300 MB limit.

Segment Count

With the DataAware merge policy (day-based grouping) and hourly flushing, segment count tends to increase (about 4 to 5 times) for the http_logs workload. This increase is attributed to the non-uniform distribution of data across days in the http_logs workload. However, this scenario is unlikely to occur in reality, where logs would be more uniformly distributed across days.

Search

For context aware segments, we see a significant improvement in performance for both range and ascending sort queries. This is because data is sorted in near-ascending order within context aware segments. On the flip side, the efficiency of descending sort queries regresses with this method. To fix this regression in desc sort queries, a potential solution is to traverse segment data in reverse order (we will create a separate Lucene issue for this and link it here).

| Query type | % improvement over Tiered merge policy | % improvement over LogByteSize merge policy |
| --- | --- | --- |
| range | 71.3 | 68.16 |
| asc_sort_timestamp | 36.8 | 61.12 |
| asc_sort_with_after_timestamp | 53.8 | 35.89 |
| desc_sort_timestamp | -1997.03 | -848.79 |
| desc_sort_with_after_timestamp | -1116.9 | -329.86 |
| hourly aggregations | -15.3 | -18.3 |

Segment merges

The intermediary number of segments is higher with the DataAware merge policy compared to the Tiered and LogByteSize merge policies. The merge-size metrics below (order is Tiered, LogByteSize and DataAware from top to bottom) show that while the Tiered merge policy does allow merging larger segments during indexing, the DataAware merge policy initially merges smaller hourly segments (keeping segment count high). Only once the segment size for a day exceeds 300 MB does the DataAware merge policy shift to merging larger segments.

[merge-size charts: Tiered, LogByteSize, DataAware (top to bottom)]

CPU, JVM and GC count

CPU and JVM usage remain the same for all the merge policies during the entire indexing operation. The GC count for the DataAware merge policy is slightly higher due to the fact that we are allocating different DWPTs for logs with different timestamps (hours) in the DocumentsWriter.

(order is Tiered, LogByteSize and DataAware from top to bottom)

[CPU, JVM and GC charts: Tiered, LogByteSize, DataAware (top to bottom)]


RS146BIJAY commented Apr 21, 2024

For HTTP logs workload and grouping by status code

Bulk Indexing Client = 8

Grouping criteria

In this setup, we implemented grouping based on status code for the http_logs workload. Here we separated logs of successful requests from those with error (4xx) and fault (5xx) status codes. Segments are initially flushed at a per-status-code level (sub-grouping criteria). The context aware merge policy will start merging error and fault status segments together when their aggregated size reaches a 2 GB threshold.

Performance Benchmark Metrics

Indexing

Size of index

Since with just status-based grouping the data is not as ordered as in the previous case of day-based grouping with hourly sub-grouping, we observe only around a 3% improvement in index size with the DataAware merge policy.

Latency

Indexing latency remains almost the same as with the Tiered and LogByteSize merge policies.

Segment Count

Since there are only two groups, successful (2xx) and failed (4xx and 5xx) status segments, the number of segments is almost the same as with the Tiered and LogByteSize merge policies.

Search

With the status code based grouping strategy, we see a considerable improvement in the performance of range, aggregation and sort queries (ordered by timestamp) for fault/error logs within a specific time range. This efficiency is attributed to the smaller number of iterations needed to find fault/error logs, as they are spread across fewer segments compared to the Tiered and LogByteSize merge policies.

| Query type | % improvement over Tiered merge policy | % improvement over LogByteSize merge policy |
| --- | --- | --- |
| range (400 status code logs within a given time range) | 60 | 73.3 |
| asc_sort_timestamp (400 status code logs within a given time range sorted by timestamp) | 65 | 64.8 |
| asc_sort_with_after_timestamp (500 status code logs after a timestamp and sorted by timestamp) | 47.5 | 47.2 |
| desc_sort_timestamp (400 status code logs within a given time range sorted in desc order by timestamp) | 43.9 | 44.1 |
| desc_sort_with_after_timestamp (500 status code logs after a timestamp and sorted in desc order by timestamp) | 44.7 | 47.5 |
| hourly aggregations (day histogram of logs with status codes 400, 403 and 404 with min and max sizes of request) | 45 | 46.6 |

With DataAware we iterate fewer times (documents + segments) across segments to locate error/fault logs:

| Query type | Iteration count in Tiered merge policy | Iteration count in LogByteSize merge policy |
| --- | --- | --- |
| range (400 status code logs within a given time range) | 3.2x more iterations than DataAware | 3.8x more iterations than DataAware |
| asc_sort_timestamp (400 status code logs within a given time range sorted by timestamp) | 15.3x more iterations than DataAware | 15.6x more iterations than DataAware |
| asc_sort_with_after_timestamp (500 status code logs after a timestamp and sorted by timestamp) | 15.3x more iterations than DataAware | 15.6x more iterations than DataAware |
| desc_sort_timestamp (400 status code logs within a given time range sorted in desc order by timestamp) | 15.3x more iterations than DataAware | 15.6x more iterations than DataAware |
| desc_sort_with_after_timestamp (500 status code logs after a timestamp and sorted in desc order by timestamp) | 15.3x more iterations than DataAware | 15.6x more iterations than DataAware |
| hourly aggregations (day histogram of logs with status codes 400, 403 and 404 with min and max sizes of request) | 40x more iterations than DataAware | 40x more iterations than DataAware |


RS146BIJAY commented Apr 24, 2024

Would appreciate any initial feedback from @shwetathareja, @nknize, @Bukhtawar, @sachinpkale, @muralikpbhat, @reta, @msfroh and @backslasht as well, so tagging more folks for visibility. Thanks!


RS146BIJAY commented Apr 26, 2024

For HTTP logs workload and daily grouping

Bulk Indexing Client = 1
Grouping criteria = day-based grouping with a threshold of 300 MB (the size at which hour-level segments are elevated to day-level segments in the DataAware merge policy)

Performance Benchmark Metrics

Indexing

Size of index

Since the data is completely sorted by timestamp, there is no considerable improvement in index size with the DataAware merge policy.

Latency

Indexing latency remained almost the same with the DataAware merge policy as with the Tiered and LogByteSize merge policies.

Segment Count

Similar to the case when bulk_indexing_client = 8, with the DataAware merge policy (day-based grouping) and hourly flushing, segment count tends to increase (about 4 to 5 times) for the http_logs workload.

Search

Since logs are ordered in increasing order of timestamp, there is no significant difference in the performance of the LogByteSize and DataAware merge policies for this scenario. The reason is that when data is organised by timestamp, the skipping mechanism with the BKD tree ensures equally optimal performance for both time range and sort queries. Consequently, the DataAware merge policy's approach of grouping more relevant data together does not influence performance, as the BKD tree would skip the same number of documents regardless. (The LogByteSize merge policy performs slightly better than the DataAware merge policy, but with some tuning we can match this performance.)

For the asc sort and desc sort queries with search-after timestamp, DataAware performs better than the LogByteSize merge policy. This is because of a bug in the skipping logic while scoring documents inside Lucene, and the improvement is not specifically related to DataAware segments.

| Query type | % improvement over Tiered merge policy | % improvement over LogByteSize merge policy |
| --- | --- | --- |
| range (logs within a given time range with 1000 response size) | 4.2 | -3.3 |
| asc_sort_timestamp (logs within a given time range sorted in asc order with 1000 response size) | 53.5 | -19.9 |
| asc_sort_with_after_timestamp (logs after a timestamp, sorted in asc order with 1000 response size) | 99.4 | 97.9 |
| desc_sort_timestamp (logs within a given time range sorted in desc order with 1000 response size) | -6.1 | -7.1 |
| desc_sort_with_after_timestamp (logs after a timestamp, sorted in desc order with 1000 response size) | -7.8 | 78.8 |
| hourly aggregations | -3.9 | -2.7 |


reta commented Apr 28, 2024

Would appreciate any initial feedback from @shwetathareja, @nknize, @Bukhtawar, @sachinpkale, @muralikpbhat, @reta, @msfroh and @backslasht as well, so tagging more folks for visibility. Thanks!

Thanks @RS146BIJAY, the numbers look promising, but don't we violate the DWPT design by having a man-in-the-middle (grouping) here? To elaborate on that, my understanding is that the DW uses DWPTs to associate the ingest threads with writer threads, with the intention of eliminating synchronization (this is also highlighted in the DW javadocs). With grouping, this won't be the case anymore - the grouping layer adds an indirection where multiple threads would be routed to the same DWPT. Is that right, or is my understanding incorrect? Thank you.

@Bukhtawar

Would appreciate any initial feedback from @shwetathareja, @nknize, @Bukhtawar, @sachinpkale, @muralikpbhat, @reta, @msfroh and @backslasht as well, so tagging more folks for visibility. Thanks!

Thanks @RS146BIJAY, the numbers look promising, but don't we violate the DWPT design by having a man-in-the-middle (grouping) here? To elaborate on that, my understanding is that the DW uses DWPTs to associate the ingest threads with writer threads, with the intention of eliminating synchronization (this is also highlighted in the DW javadocs). With grouping, this won't be the case anymore - the grouping layer adds an indirection where multiple threads would be routed to the same DWPT. Is that right, or is my understanding incorrect? Thank you.

I had a similar concern @reta, but the lock-free model of DWPT can still be matched by creating just enough DWPTs that can write concurrently to minimise contention, or by creating more instances on demand if the lock is already acquired. So yes, there needs to be some coordination, but it shouldn't directly require synchronisation.


RS146BIJAY commented Apr 29, 2024

With grouping, this won't be the case anymore - the grouping layer adds an indirection where multiple threads would be routed to the same DWPT. Is that right, or is my understanding incorrect?

Not exactly. To add to what @Bukhtawar mentioned, before this change the DWPT pool maintains a list of free DWPTs on which no lock is held and no write is happening. In case all DWPTs are locked, a new DWPT instance is created and the write happens on it.

With our change, this free list is maintained at the individual group level inside the DWPT pool. If there is no free DWPT for a group, a new DWPT instance for that group is created and the write happens on it. So the active number of DWPTs in our case will be higher than before, but each thread is still routed to a different DWPT.
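
For clarity, here is a rough sketch of the per-group free list idea, using a simplified stand-in for Lucene's DocumentsWriterPerThread; the real DocumentsWriterPerThreadPool is more involved, so the names and structure below are illustrative assumptions only:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class GroupedDwptPool {

    // Simplified stand-in for DocumentsWriterPerThread.
    static final class Dwpt {
        final String group;
        Dwpt(String group) { this.group = group; }
    }

    // One free list per group key instead of a single shared free list.
    private final Map<String, Deque<Dwpt>> freeByGroup = new ConcurrentHashMap<>();

    // Checks out a free DWPT for the group, or creates a new one if all DWPTs of
    // that group are in use, so each indexing thread still writes to its own DWPT.
    Dwpt checkout(String group) {
        Deque<Dwpt> free = freeByGroup.computeIfAbsent(group, g -> new ArrayDeque<>());
        synchronized (free) {
            Dwpt dwpt = free.pollFirst();
            return dwpt != null ? dwpt : new Dwpt(group);
        }
    }

    // Returns a DWPT to its group's free list once the thread finishes writing.
    void release(Dwpt dwpt) {
        Deque<Dwpt> free = freeByGroup.computeIfAbsent(dwpt.group, g -> new ArrayDeque<>());
        synchronized (free) {
            free.addLast(dwpt);
        }
    }
}
```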


RS146BIJAY commented Apr 29, 2024

Adding a few more scenarios.

For HTTP logs workload and status grouping (bulk indexing client = 1)

Bulk Indexing Client = 1

Grouping criteria

In this setup, we implemented grouping based on status code for the http_logs workload with bulk_indexing_client = 1. Here we separated logs of successful requests from those with error (4xx) and fault (5xx) status codes. Segments are initially flushed at a per-status-code level (sub-grouping criteria). The context aware merge policy will start merging error and fault status segments together when their aggregated size reaches a 2 GB threshold.

Performance Benchmark Metrics

Indexing

Size of index

Since the data is completely sorted by timestamp, there is no considerable improvement in index size with the DataAware merge policy.

Latency

Indexing latency remained almost the same with the DataAware merge policy as with the Tiered and LogByteSize merge policies.

Segment Count

Since there are only two groups, successful (2xx) and failed (4xx and 5xx) status segments, the number of segments is almost the same as with the Tiered and LogByteSize merge policies.

Search

Similar to the scenario when bulk_indexing_client > 1, with a single bulk indexing client and the status code based grouping strategy we see a considerable improvement in the performance of range, aggregation and sort queries (ordered by timestamp) for fault/error logs within a specific time range. This efficiency is again attributed to the smaller number of iterations needed to find fault/error logs, as they are spread across fewer segments compared to the Tiered and LogByteSize merge policies.

| Query type | % improvement over Tiered merge policy | % improvement over LogByteSize merge policy |
| --- | --- | --- |
| range (logs within a given time range with status code 500 and 1000 response size) | 73.7 | 50 |
| asc_sort_timestamp (logs within a given time range with status code 500, sorted in asc order with 1000 response size) | 73.9 | 50 |
| asc_sort_with_after_timestamp (logs after a timestamp with status code 500, sorted in asc order with 1000 response size) | 87.5 | 82.3 |
| desc_sort_timestamp (logs within a given time range with status code 500, sorted in desc order with 1000 response size) | 85 | 77.8 |
| desc_sort_with_after_timestamp (logs after a timestamp with status code 500, sorted in desc order with 1000 response size) | 86.8 | 77.2 |
| hourly aggregations (daily agg of 400, 403 and 404 status code logs on min and max size) | 89.7 | 64.8 |

@shwetathareja

The details around exact structure of predicate will be published separately as a separate github issue.

@RS146BIJAY : We should also explore whether the predicate can be deduced from the user's frequent queries.

@RS146BIJAY

We should also explore if the predicate can be deduced from user's frequent queries.

As part of the first phase, we will ask for this as an input from the user. We will eventually explore how we can automatically determine the grouping criteria based on the customer's workload.


reta commented Apr 30, 2024

So active number of DWPT in our case will be higher than what it was earlier, but each thread even now will be routed to different DWPT.

@RS146BIJAY so it becomes a function of the group? And in that case, if the cardinality of the group is high, we could easily OOM the JVM, right? We need the guardrails in place.


RS146BIJAY commented Apr 30, 2024

Selecting the appropriate grouping criteria is crucial. Groups that are too small can increase the number of DWPTs required in the DocumentsWriter, regressing indexing performance, and can also lead to many small segments. Conversely, groups that are too large can regress query performance. Implementing a guardrail around the grouping criteria can prevent excessively small or large groupings.

@reta Yes. We will be implementing proper guardrails on the grouping criteria to disallow groups that are too small or too large.
