Merge branch 'main' into polling-2
mch2 committed Sep 4, 2024
2 parents 173fcd9 + 7b0846e commit aa4c11c
Showing 29 changed files with 761 additions and 99 deletions.
----------------------------------------
@@ -8,14 +8,17 @@

package org.opensearch.indices.settings;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -110,7 +113,6 @@ public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException
// add back a node
internalCluster().startDataOnlyNode();
ensureGreen(TEST_INDEX);

}

public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
@@ -175,6 +177,39 @@ public void testSearchReplicaScaling() {
assertActiveSearchShards(0);
}

public void testSearchReplicaRoutingPreference() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
internalCluster().startClusterManagerOnlyNode();
String primaryNodeName = internalCluster().startDataOnlyNode();
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.build()
);
ensureYellow(TEST_INDEX);
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
// add 2 nodes for the replicas
internalCluster().startDataOnlyNodes(2);
ensureGreen(TEST_INDEX);

assertActiveShardCounts(numSearchReplicas, numWriterReplicas);

// set preference to search replica here - we default to this when there are
// search replicas but tests will randomize this value if unset
SearchResponse response = client().prepareSearch(TEST_INDEX)
.setPreference(Preference.SEARCH_REPLICA.type())
.setQuery(QueryBuilders.matchAllQuery())
.get();

String nodeId = response.getHits().getAt(0).getShard().getNodeId();
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
}

/**
* Helper to assert counts of active shards for each type.
*/
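The assertion at the end of testSearchReplicaRoutingPreference compares the responding node against the routing table entry returned by a getIndexShardRoutingTable() helper that is not shown in this hunk. A minimal sketch of what such a helper could look like (an assumption for illustration, not the committed implementation) reads the current cluster state and returns shard 0's routing table for the test index:

    private IndexShardRoutingTable getIndexShardRoutingTable() {
        // Fetch the latest cluster state and look up shard 0 of the test index.
        ClusterState state = client().admin().cluster().prepareState().get().getState();
        return state.routingTable().index(TEST_INDEX).shard(0);
    }

Both ClusterState and IndexShardRoutingTable are already imported at the top of the file, so a helper along these lines would need no additional imports.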
----------------------------------------
@@ -32,7 +32,7 @@
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase {
public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase {

private static final String REMOTE_REPO_NAME = "remote-store-repo-name";

@@ -276,9 +276,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total segments remote store directory file count for deleted index and shard 0
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

@@ -312,6 +314,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
----------------------------------------
@@ -647,15 +647,11 @@ public ShardIterator replicaActiveInitializingShardIt() {
return new PlainShardIterator(shardId, Collections.emptyList());
}

LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
return filterAndOrderShards(replica -> true);
}

public ShardIterator searchReplicaActiveInitializingShardIt() {
return filterAndOrderShards(ShardRouting::isSearchOnly);
}

/**
@@ -686,6 +682,20 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() {
return new PlainShardIterator(shardId, ordered);
}

private ShardIterator filterAndOrderShards(Predicate<ShardRouting> filter) {
LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (filter.test(replica)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator on active and initializing shards residing on the provided nodeId.
*/
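The refactor above collapses the old replica iterator and the new search-replica iterator into a single predicate-driven helper: replicaActiveInitializingShardIt() becomes filterAndOrderShards(replica -> true), while searchReplicaActiveInitializingShardIt() passes ShardRouting::isSearchOnly. A standalone sketch of the ordering rule it implements, using toy types rather than OpenSearch classes, purely to show the behavior:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

public class FilterAndOrderSketch {
    // Simplified stand-in for ShardRouting; the fields are assumptions for illustration only.
    record Copy(String node, boolean active, boolean initializing, boolean searchOnly) {}

    // Mirrors filterAndOrderShards(): matching active copies first, matching initializing copies last.
    static List<Copy> filterAndOrder(List<Copy> shuffledReplicas, Predicate<Copy> filter) {
        Deque<Copy> ordered = new ArrayDeque<>();
        for (Copy replica : shuffledReplicas) {
            if (!filter.test(replica)) {
                continue; // e.g. write replicas are skipped when the filter is Copy::searchOnly
            }
            if (replica.active()) {
                ordered.addFirst(replica);
            } else if (replica.initializing()) {
                ordered.addLast(replica);
            }
        }
        return List.copyOf(ordered);
    }

    public static void main(String[] args) {
        List<Copy> replicas = List.of(
            new Copy("n1", false, true, true),  // initializing search replica
            new Copy("n2", true, false, true),  // active search replica
            new Copy("n3", true, false, false)  // active write replica
        );
        // Only search-only copies survive the filter, and the active one (n2) is tried before n1.
        System.out.println(filterAndOrder(replicas, Copy::searchOnly));
    }
}

Running main prints only the two search-only copies, with the active one ahead of the initializing one, which is the active-first, initializing-as-fallback order the helper builds.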
----------------------------------------
@@ -121,6 +121,7 @@ public class OperationRouting {
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;
private final boolean isReaderWriterSplitEnabled;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
@@ -141,6 +142,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
@@ -254,6 +256,14 @@ public GroupShardsIterator<ShardIterator> searchShards(
preference = Preference.PRIMARY_FIRST.type();
}

if (isReaderWriterSplitEnabled) {
if (preference == null || preference.isEmpty()) {
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
preference = Preference.SEARCH_REPLICA.type();
}
}
}

ShardIterator iterator = preferenceActiveShardIterator(
shard,
clusterState.nodes().getLocalNodeId(),
@@ -366,6 +376,8 @@ private ShardIterator preferenceActiveShardIterator(
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case SEARCH_REPLICA:
return indexShard.searchReplicaActiveInitializingShardIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODES:
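Read together with the constructor change, the block added to searchShards() only rewrites the preference when three conditions hold at once. An equivalent single-guard sketch of the same logic, using the names from the hunk, with behavior unchanged and shown only for readability:

if (isReaderWriterSplitEnabled
    && (preference == null || preference.isEmpty())
    && indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
    // Default to search replicas only when the feature flag is on, the caller supplied no
    // preference, and the index actually has search-only replicas; an explicit preference
    // in the request still wins.
    preference = Preference.SEARCH_REPLICA.type();
}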
----------------------------------------
@@ -73,6 +73,11 @@ public enum Preference {
*/
REPLICA_FIRST("_replica_first"),

/**
* Route to search replica shards
*/
SEARCH_REPLICA("_search_replica"),

/**
* Route to the local shard only
*/
@@ -127,6 +132,8 @@ public static Preference parse(String preference) {
return ONLY_LOCAL;
case "_only_nodes":
return ONLY_NODES;
case "_search_replica":
return SEARCH_REPLICA;
default:
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
}
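A quick illustration of the new constant, based only on what these hunks show: parse() maps the "_search_replica" string to the constant, and type() returns the string that the integration test and OperationRouting pass around.

import org.opensearch.cluster.routing.Preference;

public class SearchReplicaPreferenceExample {
    public static void main(String[] args) {
        // Round-trip between the string form and the enum constant added above.
        Preference parsed = Preference.parse("_search_replica");
        System.out.println(parsed == Preference.SEARCH_REPLICA);  // true
        System.out.println(Preference.SEARCH_REPLICA.type());     // _search_replica
    }
}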
----------------------------------------
@@ -164,7 +164,7 @@ public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState r

// adding metric fields
for (Metric metric : starTreeMetadata.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
fields.add(
fullyQualifiedFieldNameForStarTreeMetricsDocValues(
compositeFieldName,
----------------------------------------
@@ -13,6 +13,7 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@@ -23,10 +24,18 @@
public class Metric implements ToXContent {
private final String field;
private final List<MetricStat> metrics;
private final List<MetricStat> baseMetrics;

public Metric(String field, List<MetricStat> metrics) {
this.field = field;
this.metrics = metrics;
this.baseMetrics = new ArrayList<>();
for (MetricStat metricStat : metrics) {
if (metricStat.isDerivedMetric()) {
continue;
}
baseMetrics.add(metricStat);
}
}

public String getField() {
@@ -37,6 +46,13 @@ public List<MetricStat> getMetrics() {
return metrics;
}

/**
* Returns only the base metrics
*/
public List<MetricStat> getBaseMetrics() {
return baseMetrics;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
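A short usage fragment for the new accessor. It assumes MetricStat.AVG is flagged as a derived metric computed from SUM and VALUE_COUNT; under that assumption the constructor keeps AVG out of the base list, which is what the readers in this commit rely on when they create doc-values fields only for base metrics:

// jshell-style fragment, not a complete class; AVG-as-derived is an assumption.
Metric metric = new Metric("status", List.of(MetricStat.SUM, MetricStat.VALUE_COUNT, MetricStat.AVG));
metric.getMetrics();      // [SUM, VALUE_COUNT, AVG]  everything requested in the mapping
metric.getBaseMetrics();  // [SUM, VALUE_COUNT]       derived stats are filtered out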
----------------------------------------
@@ -150,10 +150,7 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma
metricAggregatorInfos.add(metricAggregatorInfo);
continue;
}
for (MetricStat metricStat : metric.getMetrics()) {
if (metricStat.isDerivedMetric()) {
continue;
}
for (MetricStat metricStat : metric.getBaseMetrics()) {
FieldValueConverter fieldValueConverter;
Mapper fieldMapper = mapperService.documentMapper().mappers().getMapper(metric.getField());
if (fieldMapper instanceof FieldMapper && ((FieldMapper) fieldMapper).fieldType() instanceof FieldValueConverter) {
@@ -185,7 +182,7 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricStat.equals(MetricStat.DOC_COUNT)) {
----------------------------------------
@@ -158,7 +158,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTreeValues.getStarTreeField().getName(),
metric.getField(),
----------------------------------------
@@ -144,7 +144,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTreeValues.getStarTreeField().getName(),
metric.getField(),
----------------------------------------
@@ -220,16 +220,37 @@ private int readMetricsCount() throws IOException {
private List<Metric> readMetricEntries() throws IOException {
int metricCount = readMetricsCount();

Map<String, Metric> starTreeMetricMap = new LinkedHashMap<>();
Map<String, List<MetricStat>> starTreeMetricStatMap = new LinkedHashMap<>();
for (int i = 0; i < metricCount; i++) {
String metricName = meta.readString();
int metricStatOrdinal = meta.readVInt();
MetricStat metricStat = MetricStat.fromMetricOrdinal(metricStatOrdinal);
Metric metric = starTreeMetricMap.computeIfAbsent(metricName, field -> new Metric(field, new ArrayList<>()));
metric.getMetrics().add(metricStat);
List<MetricStat> metricStats = starTreeMetricStatMap.computeIfAbsent(metricName, field -> new ArrayList<>());
metricStats.add(metricStat);
}
List<Metric> starTreeMetricMap = new ArrayList<>();
for (Map.Entry<String, List<MetricStat>> metricStatsEntry : starTreeMetricStatMap.entrySet()) {
addEligibleDerivedMetrics(metricStatsEntry.getValue());
starTreeMetricMap.add(new Metric(metricStatsEntry.getKey(), metricStatsEntry.getValue()));

return new ArrayList<>(starTreeMetricMap.values());
}
return starTreeMetricMap;
}

/**
* Add derived metrics if all associated base metrics are present
*/
private void addEligibleDerivedMetrics(List<MetricStat> metricStatsList) {
Set<MetricStat> metricStatsSet = new HashSet<>(metricStatsList);
for (MetricStat metric : MetricStat.values()) {
if (metric.isDerivedMetric() && !metricStatsSet.contains(metric)) {
List<MetricStat> sourceMetrics = metric.getBaseMetrics();
if (metricStatsSet.containsAll(sourceMetrics)) {
metricStatsList.add(metric);
metricStatsSet.add(metric);
}
}
}
}

private int readSegmentAggregatedDocCount() throws IOException {
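The helper above restores derived metrics that are never written to the star-tree metadata, but only when every base metric they depend on was read back. A self-contained sketch of the same eligibility rule, using a toy enum in place of MetricStat (treating AVG as derived from SUM and VALUE_COUNT is an assumption for illustration):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DerivedMetricSketch {
    // Toy stand-in for MetricStat: AVG is the only derived stat and depends on SUM and VALUE_COUNT.
    enum Stat {
        SUM, VALUE_COUNT, AVG;

        boolean isDerived() {
            return this == AVG;
        }

        List<Stat> baseMetrics() {
            return this == AVG ? List.of(SUM, VALUE_COUNT) : List.of();
        }
    }

    // Mirrors addEligibleDerivedMetrics(): a derived stat is added back only when every one of
    // its base stats is already present in the list read from the metadata.
    static void addEligibleDerived(List<Stat> stats) {
        Set<Stat> present = new HashSet<>(stats);
        for (Stat s : Stat.values()) {
            if (s.isDerived() && !present.contains(s) && present.containsAll(s.baseMetrics())) {
                stats.add(s);
                present.add(s);
            }
        }
    }

    public static void main(String[] args) {
        List<Stat> fromMeta = new ArrayList<>(List.of(Stat.SUM, Stat.VALUE_COUNT));
        addEligibleDerived(fromMeta);
        System.out.println(fromMeta); // [SUM, VALUE_COUNT, AVG]
    }
}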
----------------------------------------
@@ -171,7 +171,7 @@ public StarTreeValues(

// get doc id set iterators for metrics
for (Metric metric : starTreeMetadata.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTreeField.getName(),
metric.getField(),
----------------------------------------
@@ -653,9 +653,7 @@ public byte[] encodePoint(Number value) {

@Override
public double toDoubleValue(long value) {
byte[] bytes = new byte[8];
NumericUtils.longToSortableBytes(value, bytes, 0);
return NumericUtils.sortableLongToDouble(NumericUtils.sortableBytesToLong(bytes, 0));
return objectToDouble(value);
}

@Override
----------------------------------------
@@ -287,11 +287,7 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
}
int numBaseMetrics = 0;
for (Metric metric : metrics) {
for (MetricStat metricStat : metric.getMetrics()) {
if (metricStat.isDerivedMetric() == false) {
numBaseMetrics++;
}
}
numBaseMetrics += metric.getBaseMetrics().size();
}
if (numBaseMetrics > context.getSettings()
.getAsInt(
----------------------------------------
@@ -412,7 +412,7 @@ static long getGeneration(String[] filenameTokens) {

public static long getTimestamp(String filename) {
String[] filenameTokens = filename.split(SEPARATOR);
return RemoteStoreUtils.invertLong(filenameTokens[6]);
return RemoteStoreUtils.invertLong(filenameTokens[filenameTokens.length - 2]);
}

public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename) {
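The one-line change above reads the inverted timestamp relative to the end of the filename instead of at a fixed offset, so the lookup keeps working when filename variants carry a different number of leading tokens. A toy fragment to make the difference concrete (the filename below is invented and does not reflect the real remote-store metadata format):

// Fragment for illustration only; the token layout here is made up.
String separator = "__";
String filename = String.join(separator, "metadata", "1", "42", "nodeA", "9223372036854774807", "2");
String[] tokens = filename.split(separator);
String invertedTimestamp = tokens[tokens.length - 2];  // second-to-last token, layout-independent
// tokens[6] would throw ArrayIndexOutOfBoundsException here; more generally, a fixed index
// breaks whenever tokens are added or removed earlier in the name.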
----------------------------------------
@@ -116,4 +116,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
}
}

public Supplier<RepositoriesService> getRepositoriesService() {
return this.repositoriesService;
}

}