From 540ff72e1455fbc41519428655dafa28ab2dabad Mon Sep 17 00:00:00 2001
From: Rishabh Maurya
Date: Tue, 7 May 2024 12:12:44 -0700
Subject: [PATCH] Dynamic FieldType inference based on random sampling of documents

Signed-off-by: Rishabh Maurya

---
 .../index/mapper/FieldTypeInference.java      | 174 ++++++++++++++++++
 .../index/mapper/FieldTypeInferenceTests.java | 141 ++++++++++++++
 2 files changed, 315 insertions(+)
 create mode 100644 server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java
 create mode 100644 server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java

diff --git a/server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java b/server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java
new file mode 100644
index 0000000000000..f649e3c1ca251
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java
@@ -0,0 +1,174 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.mapper;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.search.lookup.SourceLookup;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * This class performs type inference by analyzing the _source documents. It uses a random sample of documents to infer the field type,
+ * similar to the dynamic mapping type-guessing logic.
+ * Unlike guessing based on the first document, where the field could be missing, this method generates a random sample of documents to
+ * make a more accurate inference.
+ * This approach is especially useful for handling missing fields, which is common for nested fields within derived fields of object type.
+ * <p>
+ * The sample size should be chosen carefully to ensure a high probability of selecting at least one document where the field is present.
+ * However, it's essential to strike a balance, because a large sample size can cause performance issues: each sampled document's
+ * _source field is loaded and examined until the field is found.
+ * <p>
+ * Determining the sample size (S) is akin to deciding how many balls to draw from a bin, ensuring a high probability (>= P) of drawing
+ * at least one green ball (a document with the field) from a mixture of R red balls (documents without the field) and G green balls:
+ * <pre>
+ * {@code
+ * P >= 1 - C(R, S) / C(R + G, S)
+ * }
+ * </pre>
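+ * <p>
+ * A quick sizing approximation (a sketch added for clarity, assuming the corpus is much larger than S and the field is present in a
+ * fraction p of documents): {@code C(R, S) / C(R + G, S)} is then approximately {@code (1 - p)^S}, giving
+ * <pre>
+ * {@code
+ * S >= ln(1 - P) / ln(1 - p)   // e.g. ln(0.05) / ln(0.98) ~= 148.3, hence ~149 samples for P = 0.95, p = 0.02
+ * }
+ * </pre>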
+ * Here, C() represents the binomial coefficient.
+ * For a high confidence level, we aim for P >= 0.95. For example, with 10^7 documents where the field is present in 2% of them,
+ * the sample size S should be around 149 to achieve a probability of 0.95.
+ */
+public class FieldTypeInference {
+    private final IndexReader indexReader;
+    private final String indexName;
+    private final MapperService mapperService;
+    // TODO: expose via an index setting
+    private int sampleSize;
+    private static final int DEFAULT_SAMPLE_SIZE = 150;
+    private static final int MAX_SAMPLE_SIZE_ALLOWED = 1000;
+
+    public FieldTypeInference(String indexName, MapperService mapperService, IndexReader indexReader) {
+        this.indexName = indexName;
+        this.mapperService = mapperService;
+        this.indexReader = indexReader;
+        this.sampleSize = DEFAULT_SAMPLE_SIZE;
+    }
+
+    public void setSampleSize(int sampleSize) {
+        this.sampleSize = Math.min(sampleSize, MAX_SAMPLE_SIZE_ALLOWED);
+    }
+
+    public int getSampleSize() {
+        return sampleSize;
+    }
+
+    public Mapper infer(ValueFetcher valueFetcher) throws IOException {
+        RandomSourceValuesGenerator valuesGenerator = new RandomSourceValuesGenerator(sampleSize, indexReader, valueFetcher);
+        Mapper inferredMapper = null;
+        while (inferredMapper == null && valuesGenerator.hasNext()) {
+            List<Object> values = valuesGenerator.next();
+            if (values == null || values.isEmpty()) {
+                continue;
+            }
+            // for a multi-valued field, always use the first value to infer the type
+            inferredMapper = inferTypeFromObject(values.get(0));
+        }
+        return inferredMapper;
+    }
+
+    private Mapper inferTypeFromObject(Object o) throws IOException {
+        if (o == null) {
+            return null;
+        }
+        DocumentMapper mapper = mapperService.documentMapper();
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("field", o).endObject();
+        BytesReference bytesReference = BytesReference.bytes(builder);
+        SourceToParse sourceToParse = new SourceToParse(indexName, "_id", bytesReference, JsonXContent.jsonXContent.mediaType());
+        ParsedDocument parsedDocument = mapper.parse(sourceToParse);
+        Mapping mapping = parsedDocument.dynamicMappingsUpdate();
+        return mapping.root.getMapper("field");
+    }
+
+    private static class RandomSourceValuesGenerator implements Iterator<List<Object>> {
+        private final ValueFetcher valueFetcher;
+        private final IndexReader indexReader;
+        private final SourceLookup sourceLookup;
+        private final int numLeaves;
+        private final int[] docs;
+        private int iter;
+        private int offset;
+        private LeafReaderContext leafReaderContext;
+        private int leaf;
+        private static final int MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES = 10000;
+
+        public RandomSourceValuesGenerator(int sampleSize, IndexReader indexReader, ValueFetcher valueFetcher) {
+            this.valueFetcher = valueFetcher;
+            this.indexReader = indexReader;
+            sampleSize = Math.min(sampleSize, indexReader.numDocs());
+            this.docs = getSortedRandomNum(
+                sampleSize,
+                indexReader.numDocs(),
+                Math.max(sampleSize, MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES)
+            );
+            this.iter = 0;
+            this.offset = 0;
+            this.leaf = 0;
+            this.numLeaves = indexReader.leaves().size();
+            this.sourceLookup = new SourceLookup();
+            this.leafReaderContext = indexReader.leaves().get(leaf);
+            valueFetcher.setNextReader(leafReaderContext);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter < docs.length && leaf < numLeaves;
+        }
+
+        /**
+         * Callers must ensure hasNext() returns true before calling next().
+         */
+        @Override
+        public List<Object> next() {
+            int docID = docs[iter] - offset;
+            if (docID >= leafReaderContext.reader().numDocs()) {
+                setNextLeaf();
+                return next();
+            }
+            // deleted docs are also sampled for inference, which should be acceptable
+            sourceLookup.setSegmentAndDocument(leafReaderContext, docID);
+            try {
+                iter++;
+                return valueFetcher.fetchValues(sourceLookup);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void setNextLeaf() {
+            offset += leafReaderContext.reader().numDocs();
+            leaf++;
+            if (leaf < numLeaves) {
+                leafReaderContext = indexReader.leaves().get(leaf);
+                valueFetcher.setNextReader(leafReaderContext);
+            }
+        }
+
+        private static int[] getSortedRandomNum(int sampleSize, int upperBound, int attempts) {
+            Set<Integer> generatedNumbers = new HashSet<>();
+            Random random = new Random();
+            int itr = 0;
+            while (generatedNumbers.size() < sampleSize && itr++ < attempts) {
+                int randomNumber = random.nextInt(upperBound);
+                generatedNumbers.add(randomNumber);
+            }
+            int[] result = new int[generatedNumbers.size()];
+            int i = 0;
+            for (int number : generatedNumbers) {
+                result[i++] = number;
+            }
+            Arrays.sort(result);
+            return result;
+        }
+    }
+}
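For context, a minimal caller sketch (not part of this patch; the index name, field path, and the use of SourceLookup.extractRawValues as the ValueFetcher are illustrative assumptions):

    // Infer the mapper type of a dotted source path by sampling _source documents.
    FieldTypeInference inference = new FieldTypeInference("my_index", mapperService, indexReader);
    inference.setSampleSize(200); // silently capped at MAX_SAMPLE_SIZE_ALLOWED (1000)
    Mapper inferred = inference.infer(lookup -> lookup.extractRawValues("path.to.field"));
    // null means the field was absent (or null) in every sampled document
    String typeName = inferred == null ? null : inferred.typeName();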
diff --git a/server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java b/server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java
new file mode 100644
index 0000000000000..3762663c1fb01
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java
@@ -0,0 +1,141 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.mapper;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.store.Directory;
+import org.opensearch.common.lucene.Lucene;
+import org.opensearch.core.index.Index;
+import org.opensearch.index.query.QueryShardContext;
+import org.opensearch.search.lookup.SourceLookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+public class FieldTypeInferenceTests extends MapperServiceTestCase {
+
+    private static final Map<String, List<Object>> documentMap;
+    static {
+        List<Object> listWithNull = new ArrayList<>();
+        listWithNull.add(null);
+        documentMap = new HashMap<>();
+        documentMap.put("text_field", List.of("The quick brown fox jumps over the lazy dog."));
+        documentMap.put("int_field", List.of(789));
+        documentMap.put("float_field", List.of(123.45));
+        documentMap.put("date_field_1", List.of("2024-05-12T15:45:00Z"));
+        documentMap.put("date_field_2", List.of("2024-05-12"));
+        documentMap.put("boolean_field", List.of(true));
+        documentMap.put("null_field", listWithNull);
+        documentMap.put("array_field_int", List.of(100, 200, 300, 400, 500));
+        documentMap.put("array_field_text", List.of("100", "200"));
+        documentMap.put("object_type", List.of(Map.of("foo", Map.of("bar", 10))));
+    }
+
+    public void testJsonSupportedTypes() throws IOException {
+        MapperService mapperService = createMapperService(topMapping(b -> {}));
+        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
+        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
+        int totalDocs = 10000;
+        int docsPerLeafCount = 1000;
+        int leaves = 0;
+        try (Directory dir = newDirectory()) {
+            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+            Document d = new Document();
+            for (int i = 0; i < totalDocs; i++) {
+                iw.addDocument(d);
+                if ((i + 1) % docsPerLeafCount == 0) {
+                    iw.commit();
+                    leaves++;
+                }
+            }
+            try (IndexReader reader = DirectoryReader.open(iw)) {
+                iw.close();
+                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
+                String[] fieldName = { "text_field" };
+                Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("text", mapper.typeName());
+
+                fieldName[0] = "int_field";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("long", mapper.typeName());
+
+                fieldName[0] = "float_field";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("float", mapper.typeName());
+
+                fieldName[0] = "date_field_1";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("date", mapper.typeName());
+
+                fieldName[0] = "date_field_2";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("date", mapper.typeName());
+
+                fieldName[0] = "boolean_field";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("boolean", mapper.typeName());
+
+                fieldName[0] = "array_field_int";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("long", mapper.typeName());
+
+                fieldName[0] = "array_field_text";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("text", mapper.typeName());
+
+                fieldName[0] = "object_type";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertEquals("object", mapper.typeName());
+
+                fieldName[0] = "null_field";
+                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertNull(mapper);
+
+                // If the field is missing, ensure the sample docIDs generated for inference are ordered and in bounds per leaf
+                fieldName[0] = "missing_field";
+                List<List<Integer>> docsEvaluated = new ArrayList<>();
+                int[] totalDocsEvaluated = { 0 };
+                typeInference.setSampleSize(50);
+                mapper = typeInference.infer(new ValueFetcher() {
+                    @Override
+                    public List<Object> fetchValues(SourceLookup lookup) throws IOException {
+                        docsEvaluated.get(docsEvaluated.size() - 1).add(lookup.docId());
+                        totalDocsEvaluated[0]++;
+                        return documentMap.get(fieldName[0]);
+                    }
+
+                    @Override
+                    public void setNextReader(LeafReaderContext leafReaderContext) {
+                        docsEvaluated.add(new ArrayList<>());
+                    }
+                });
+                assertNull(mapper);
+                assertEquals(typeInference.getSampleSize(), totalDocsEvaluated[0]);
+                for (List<Integer> docsPerLeaf : docsEvaluated) {
+                    for (int j = 0; j < docsPerLeaf.size() - 1; j++) {
+                        assertTrue(docsPerLeaf.get(j) < docsPerLeaf.get(j + 1));
+                    }
+                    if (!docsPerLeaf.isEmpty()) {
+                        assertTrue(docsPerLeaf.get(0) >= 0 && docsPerLeaf.get(docsPerLeaf.size() - 1) < docsPerLeafCount);
+                    }
+                }
+            }
+        }
+    }
+}
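As a sanity check on the sizing example in the class javadoc, the exact probability P = 1 - C(R, S) / C(R + G, S) can be computed directly. A standalone helper (hypothetical, not part of this patch) that evaluates it in log space to avoid overflow:

    // C(R, S) / C(R + G, S) = product over i in [0, S) of (R - i) / (R + G - i)
    static double probabilityOfAtLeastOneHit(long r, long g, int s) {
        double logMiss = 0; // log of the probability that no sampled doc has the field
        for (int i = 0; i < s; i++) {
            logMiss += Math.log(r - (double) i) - Math.log(r + g - (double) i);
        }
        return 1 - Math.exp(logMiss);
    }
    // probabilityOfAtLeastOneHit(9_800_000L, 200_000L, 149) ~= 0.95, matching the ~149-sample figure in the javadoc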