diff --git a/CHANGELOG.md b/CHANGELOG.md
index fedbc45aa..61fde03a3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,4 +32,5 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Infrastructure
 ### Documentation
 ### Maintenance
+- Add reindex integration tests for ingest processors ([#1075](https://github.com/opensearch-project/neural-search/pull/1075))
 ### Refactoring
diff --git a/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java b/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java
index 83b680d19..b5e14a11f 100644
--- a/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java
+++ b/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java
@@ -4,20 +4,11 @@
  */
 package org.opensearch.neuralsearch.processor;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.Map;
 
-import org.apache.hc.core5.http.HttpHeaders;
-import org.apache.hc.core5.http.io.entity.EntityUtils;
-import org.apache.hc.core5.http.message.BasicHeader;
 import org.junit.Before;
-import org.opensearch.client.Response;
-import org.opensearch.common.xcontent.XContentHelper;
-import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.neuralsearch.BaseNeuralSearchIT;
 
-import com.google.common.collect.ImmutableList;
 import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;
 
 public class SparseEncodingProcessIT extends BaseNeuralSearchIT {
@@ -26,6 +17,20 @@ public class SparseEncodingProcessIT extends BaseNeuralSearchIT {
 
     private static final String PIPELINE_NAME = "pipeline-sparse-encoding";
 
+    private static final String INGEST_DOCUMENT = "{\n"
+        + "  \"title\": \"This is a good day\",\n"
+        + "  \"description\": \"daily logging\",\n"
+        + "  \"favor_list\": [\n"
+        + "    \"test\",\n"
+        + "    \"hello\",\n"
+        + "    \"mock\"\n"
+        + "  ],\n"
+        + "  \"favorites\": {\n"
+        + "    \"game\": \"overwatch\",\n"
+        + "    \"movie\": null\n"
+        + "  }\n"
+        + "}\n";
+
     @Before
     public void setUp() throws Exception {
         super.setUp();
@@ -37,8 +42,8 @@ public void testSparseEncodingProcessor() throws Exception {
         try {
             modelId = prepareSparseEncodingModel();
             createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
-            createSparseEncodingIndex();
-            ingestDocument();
+            createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
+            ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
             assertEquals(1, getDocCount(INDEX_NAME));
 
             NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
@@ -58,8 +63,8 @@ public void testSparseEncodingProcessorWithPrune() throws Exception {
         try {
             modelId = prepareSparseEncodingModel();
             createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING_PRUNE);
-            createSparseEncodingIndex();
-            ingestDocument();
+            createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
+            ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
             assertEquals(1, getDocCount(INDEX_NAME));
 
             NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
@@ -74,42 +79,22 @@ public void testSparseEncodingProcessorWithPrune() throws Exception {
         }
     }
 
-    private void createSparseEncodingIndex() throws Exception {
-        createIndexWithConfiguration(
-            INDEX_NAME,
-            Files.readString(Path.of(classLoader.getResource("processor/SparseEncodingIndexMappings.json").toURI())),
-            PIPELINE_NAME
-        );
-    }
-
-    private void ingestDocument() throws Exception {
-        String ingestDocument = "{\n"
-            + "  \"title\": \"This is a good day\",\n"
\"title\": \"This is a good day\",\n" - + " \"description\": \"daily logging\",\n" - + " \"favor_list\": [\n" - + " \"test\",\n" - + " \"hello\",\n" - + " \"mock\"\n" - + " ],\n" - + " \"favorites\": {\n" - + " \"game\": \"overwatch\",\n" - + " \"movie\": null\n" - + " }\n" - + "}\n"; - Response response = makeRequest( - client(), - "POST", - INDEX_NAME + "/_doc?refresh", - null, - toHttpEntity(ingestDocument), - ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) - ); - Map map = XContentHelper.convertToMap( - XContentType.JSON.xContent(), - EntityUtils.toString(response.getEntity()), - false - ); - assertEquals("created", map.get("result")); + public void testSparseEncodingProcessorWithReindex() throws Exception { + // create a simple index and indexing data into this index. + String fromIndexName = "test-reindex-from"; + createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null); + ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }"); + // create text embedding index for reindex + String modelId = null; + try { + modelId = prepareSparseEncodingModel(); + String toIndexName = "test-reindex-to"; + createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING); + createIndexWithPipeline(toIndexName, "SparseEncodingIndexMappings.json", PIPELINE_NAME); + reindex(fromIndexName, toIndexName); + assertEquals(1, getDocCount(toIndexName)); + } finally { + wipeOfTestResources(fromIndexName, PIPELINE_NAME, modelId, null); + } } - } diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java index de5ca820e..e3b79b94e 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java @@ -4,11 +4,7 @@ */ package org.opensearch.neuralsearch.processor; -import com.google.common.collect.ImmutableList; import lombok.SneakyThrows; -import org.apache.hc.core5.http.HttpHeaders; -import org.apache.hc.core5.http.io.entity.EntityUtils; -import org.apache.hc.core5.http.message.BasicHeader; import org.junit.Before; import java.net.URL; @@ -19,9 +15,6 @@ import java.util.Map; import java.util.Objects; -import org.opensearch.client.Response; -import org.opensearch.common.xcontent.XContentHelper; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.neuralsearch.BaseNeuralSearchIT; @@ -73,7 +66,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardToken try { createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME); createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME); - ingestDocument(TEST_DOCUMENT); + + String document = getDocumentFromFilePath(TEST_DOCUMENT); + ingestDocument(INDEX_NAME, document); List expectedPassages = new ArrayList<>(); expectedPassages.add("This is an example document to be chunked. 
The document "); @@ -90,7 +85,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmLetterTokeniz try { createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_LETTER_TOKENIZER_NAME); createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_LETTER_TOKENIZER_NAME); - ingestDocument(TEST_DOCUMENT); + + String document = getDocumentFromFilePath(TEST_DOCUMENT); + ingestDocument(INDEX_NAME, document); List expectedPassages = new ArrayList<>(); expectedPassages.add("This is an example document to be chunked. The document "); @@ -107,7 +104,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmLowercaseToke try { createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_LOWERCASE_TOKENIZER_NAME); createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_LOWERCASE_TOKENIZER_NAME); - ingestDocument(TEST_DOCUMENT); + + String document = getDocumentFromFilePath(TEST_DOCUMENT); + ingestDocument(INDEX_NAME, document); List expectedPassages = new ArrayList<>(); expectedPassages.add("This is an example document to be chunked. The document "); @@ -124,7 +123,10 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardToken try { createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME); createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME); - Exception exception = assertThrows(Exception.class, () -> ingestDocument(TEST_LONG_DOCUMENT)); + Exception exception = assertThrows(Exception.class, () -> { + String document = getDocumentFromFilePath(TEST_LONG_DOCUMENT); + ingestDocument(INDEX_NAME, document); + }); // max_token_count is 100 by index settings assert (exception.getMessage() .contains("The number of tokens produced by calling _analyze has exceeded the allowed maximum of [100].")); @@ -139,7 +141,9 @@ public void testTextChunkingProcessor_withDelimiterAlgorithm_successful() { try { createPipelineProcessor(DELIMITER_PIPELINE_NAME); createTextChunkingIndex(INDEX_NAME, DELIMITER_PIPELINE_NAME); - ingestDocument(TEST_DOCUMENT); + + String document = getDocumentFromFilePath(TEST_DOCUMENT); + ingestDocument(INDEX_NAME, document); List expectedPassages = new ArrayList<>(); expectedPassages.add("This is an example document to be chunked."); @@ -157,7 +161,9 @@ public void testTextChunkingProcessor_withCascadePipeline_successful() { try { createPipelineProcessor(CASCADE_PIPELINE_NAME); createTextChunkingIndex(INDEX_NAME, CASCADE_PIPELINE_NAME); - ingestDocument(TEST_DOCUMENT); + + String document = getDocumentFromFilePath(TEST_DOCUMENT); + ingestDocument(INDEX_NAME, document); List expectedPassages = new ArrayList<>(); expectedPassages.add("This is an example document to be chunked."); @@ -176,6 +182,23 @@ public void testTextChunkingProcessor_withCascadePipeline_successful() { } } + public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardTokenizer_whenReindexingDocument_thenSuccessful() + throws Exception { + try { + String fromIndexName = "test-reindex-from"; + createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null); + String document = getDocumentFromFilePath(TEST_DOCUMENT); + ingestDocument(fromIndexName, document); + + createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME); + createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME); + reindex(fromIndexName, INDEX_NAME); + assertEquals(1, getDocCount(INDEX_NAME)); + 
+        } finally {
+            wipeOfTestResources(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME, null, null);
+        }
+    }
+
     private void validateIndexIngestResults(String indexName, String fieldName, Object expected) {
         assertEquals(1, getDocCount(indexName));
         MatchAllQueryBuilder query = new MatchAllQueryBuilder();
@@ -205,23 +228,9 @@ private void createTextChunkingIndex(String indexName, String pipelineName) thro
         createIndexWithConfiguration(indexName, Files.readString(Path.of(indexSettingsURLPath.toURI())), pipelineName);
     }
 
-    private void ingestDocument(String documentPath) throws Exception {
-        URL documentURLPath = classLoader.getResource(documentPath);
+    private String getDocumentFromFilePath(String filePath) throws Exception {
+        URL documentURLPath = classLoader.getResource(filePath);
         Objects.requireNonNull(documentURLPath);
-        String document = Files.readString(Path.of(documentURLPath.toURI()));
-        Response response = makeRequest(
-            client(),
-            "POST",
-            INDEX_NAME + "/_doc?refresh",
-            null,
-            toHttpEntity(document),
-            ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
-        );
-        Map<String, Object> map = XContentHelper.convertToMap(
-            XContentType.JSON.xContent(),
-            EntityUtils.toString(response.getEntity()),
-            false
-        );
-        assertEquals("created", map.get("result"));
+        return Files.readString(Path.of(documentURLPath.toURI()));
     }
 }
diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java
index d872aafcd..88d90ef35 100644
--- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java
+++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java
@@ -17,7 +17,6 @@
 import java.util.Optional;
 import java.util.Set;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.core5.http.HttpHeaders;
 import org.apache.hc.core5.http.io.entity.EntityUtils;
 import org.apache.hc.core5.http.message.BasicHeader;
@@ -70,8 +69,8 @@ public void testTextEmbeddingProcessor() throws Exception {
             modelId = uploadTextEmbeddingModel();
             loadModel(modelId);
             createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING);
-            createTextEmbeddingIndex();
-            ingestDocument(INGEST_DOC1, null);
+            createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
+            ingestDocument(INDEX_NAME, INGEST_DOC1);
             assertEquals(1, getDocCount(INDEX_NAME));
         } finally {
             wipeOfTestResources(INDEX_NAME, PIPELINE_NAME, modelId, null);
@@ -84,12 +83,12 @@ public void testTextEmbeddingProcessor_batch() throws Exception {
             modelId = uploadTextEmbeddingModel();
             loadModel(modelId);
             createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING, 2);
-            createTextEmbeddingIndex();
+            createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
             ingestBatchDocumentWithBulk("batch_", 2, Collections.emptySet(), Collections.emptySet());
             assertEquals(2, getDocCount(INDEX_NAME));
 
-            ingestDocument(String.format(LOCALE, INGEST_DOC1, "success"), "1");
-            ingestDocument(String.format(LOCALE, INGEST_DOC2, "success"), "2");
+            ingestDocument(INDEX_NAME, String.format(LOCALE, INGEST_DOC1, "success"), "1");
+            ingestDocument(INDEX_NAME, String.format(LOCALE, INGEST_DOC2, "success"), "2");
 
             assertEquals(getDocById(INDEX_NAME, "1").get("_source"), getDocById(INDEX_NAME, "batch_1").get("_source"));
             assertEquals(getDocById(INDEX_NAME, "2").get("_source"), getDocById(INDEX_NAME, "batch_2").get("_source"));
@@ -104,9 +103,9 @@ public void testNestedFieldMapping_whenDocumentsIngested_thenSuccessful() throws
             modelId = uploadTextEmbeddingModel();
             loadModel(modelId);
             createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING_WITH_NESTED_FIELDS_MAPPING);
-            createTextEmbeddingIndex();
-            ingestDocument(INGEST_DOC3, "3");
-            ingestDocument(INGEST_DOC4, "4");
+            createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
+            ingestDocument(INDEX_NAME, INGEST_DOC3, "3");
+            ingestDocument(INDEX_NAME, INGEST_DOC4, "4");
 
             assertDoc(
                 (Map<String, Object>) getDocById(INDEX_NAME, "3").get("_source"),
@@ -196,7 +195,7 @@ public void testTextEmbeddingProcessor_withBatchSizeInProcessor() throws Excepti
             Objects.requireNonNull(pipelineURLPath);
             String requestBody = Files.readString(Path.of(pipelineURLPath.toURI()));
             createPipelineProcessor(requestBody, PIPELINE_NAME, modelId, null);
-            createTextEmbeddingIndex();
+            createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
             int docCount = 5;
             ingestBatchDocumentWithBulk("batch_", docCount, Collections.emptySet(), Collections.emptySet());
             assertEquals(5, getDocCount(INDEX_NAME));
@@ -204,7 +203,7 @@ public void testTextEmbeddingProcessor_withBatchSizeInProcessor() throws Excepti
             for (int i = 0; i < docCount; ++i) {
                 String template = List.of(INGEST_DOC1, INGEST_DOC2).get(i % 2);
                 String payload = String.format(LOCALE, template, "success");
-                ingestDocument(payload, String.valueOf(i + 1));
+                ingestDocument(INDEX_NAME, payload, String.valueOf(i + 1));
             }
 
             for (int i = 0; i < docCount; ++i) {
@@ -228,7 +227,7 @@ public void testTextEmbeddingProcessor_withFailureAndSkip() throws Exception {
             Objects.requireNonNull(pipelineURLPath);
             String requestBody = Files.readString(Path.of(pipelineURLPath.toURI()));
             createPipelineProcessor(requestBody, PIPELINE_NAME, modelId, null);
-            createTextEmbeddingIndex();
+            createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
             int docCount = 5;
             ingestBatchDocumentWithBulk("batch_", docCount, Set.of(0), Set.of(1));
             assertEquals(3, getDocCount(INDEX_NAME));
@@ -236,7 +235,7 @@ public void testTextEmbeddingProcessor_withFailureAndSkip() throws Exception {
             for (int i = 2; i < docCount; ++i) {
                 String template = List.of(INGEST_DOC1, INGEST_DOC2).get(i % 2);
                 String payload = String.format(LOCALE, template, "success");
-                ingestDocument(payload, String.valueOf(i + 1));
+                ingestDocument(INDEX_NAME, payload, String.valueOf(i + 1));
             }
 
             for (int i = 2; i < docCount; ++i) {
@@ -258,8 +257,8 @@ public void testNestedFieldMapping_whenDocumentInListIngested_thenSuccessful() t
             modelId = uploadTextEmbeddingModel();
             loadModel(modelId);
             createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING_WITH_NESTED_FIELDS_MAPPING);
-            createTextEmbeddingIndex();
-            ingestDocument(INGEST_DOC5, "5");
+            createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
+            ingestDocument(INDEX_NAME, INGEST_DOC5, "5");
 
             assertDocWithLevel2AsList((Map<String, Object>) getDocById(INDEX_NAME, "5").get("_source"));
 
@@ -294,37 +293,6 @@ private String uploadTextEmbeddingModel() throws Exception {
         return registerModelGroupAndUploadModel(requestBody);
     }
 
-    private void createTextEmbeddingIndex() throws Exception {
-        createIndexWithConfiguration(
-            INDEX_NAME,
-            Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
-            PIPELINE_NAME
-        );
-    }
-
-    private void ingestDocument(String doc, String id) throws Exception {
-        String endpoint;
-        if (StringUtils.isEmpty(id)) {
-            endpoint = INDEX_NAME + "/_doc?refresh";
-        } else {
-            endpoint = INDEX_NAME + "/_doc/" + id + "?refresh";
INDEX_NAME + "/_doc/" + id + "?refresh"; - } - Response response = makeRequest( - client(), - "POST", - endpoint, - null, - toHttpEntity(doc), - ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) - ); - Map map = XContentHelper.convertToMap( - XContentType.JSON.xContent(), - EntityUtils.toString(response.getEntity()), - false - ); - assertEquals("created", map.get("result")); - } - private void ingestBatchDocumentWithBulk(String idPrefix, int docCount, Set failedIds, Set droppedIds) throws Exception { StringBuilder payloadBuilder = new StringBuilder(); @@ -370,4 +338,24 @@ private void ingestBatchDocumentWithBulk(String idPrefix, int docCount, Set map = XContentHelper.convertToMap( - XContentType.JSON.xContent(), - EntityUtils.toString(response.getEntity()), - false - ); - assertEquals("created", map.get("result")); + public void testEmbeddingProcessor_whenReindexingDocument_thenSuccessful() throws Exception { + // create a simple index and indexing data into this index. + String fromIndexName = "test-reindex-from"; + createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null); + ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }"); + String modelId = null; + try { + modelId = uploadModel(); + loadModel(modelId); + String toIndexName = "test-reindex-to"; + createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_IMAGE_EMBEDDING); + createIndexWithPipeline(toIndexName, "IndexMappings.json", PIPELINE_NAME); + reindex(fromIndexName, toIndexName); + assertEquals(1, getDocCount(toIndexName)); + } finally { + wipeOfTestResources(fromIndexName, PIPELINE_NAME, modelId, null); + } } } diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index 781382e07..0956a9d0f 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -1330,6 +1330,87 @@ protected void addDocument( client().performRequest(request); } + /** + * Create an index with an pipeline with mappings from an index mapping file name + * @param indexName + * @param indexMappingFileName + * @param pipelineName + * @throws Exception + */ + protected void createIndexWithPipeline(String indexName, String indexMappingFileName, String pipelineName) throws Exception { + createIndexWithConfiguration( + indexName, + Files.readString(Path.of(classLoader.getResource("processor/" + indexMappingFileName).toURI())), + pipelineName + ); + } + + /** + * Ingest a document to index with optional id + * @param indexName name of the index + * @param ingestDocument + * @param id nullable optional id + * @throws Exception + */ + protected String ingestDocument(String indexName, String ingestDocument, String id) throws Exception { + String endpoint; + if (StringUtils.isEmpty(id)) { + endpoint = indexName + "/_doc?refresh"; + } else { + endpoint = indexName + "/_doc/" + id + "?refresh"; + } + Response response = makeRequest( + client(), + "POST", + endpoint, + null, + toHttpEntity(ingestDocument), + ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) + ); + Map map = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(response.getEntity()), + false + ); + + String result = (String) map.get("result"); + assertEquals("created", result); + return result; + } + + /** + * Ingest a document to index using 
+     * @param indexName name of the index
+     * @param ingestDocument the document body as a JSON string
+     * @throws Exception
+     */
+    protected String ingestDocument(String indexName, String ingestDocument) throws Exception {
+        return ingestDocument(indexName, ingestDocument, null);
+    }
+
+    /**
+     * Reindex all documents from one index to another
+     * @param fromIndexName name of the source index
+     * @param toIndexName name of the destination index
+     * @throws Exception
+     */
+    protected void reindex(String fromIndexName, String toIndexName) throws Exception {
+        Response response = makeRequest(
+            client(),
+            "POST",
+            "/_reindex?refresh",
+            null,
+            toHttpEntity("{\"source\":{\"index\":\"" + fromIndexName + "\"},\"dest\":{\"index\":\"" + toIndexName + "\"}}"),
+            ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
+        );
+        Map<String, Object> map = XContentHelper.convertToMap(
+            XContentType.JSON.xContent(),
+            EntityUtils.toString(response.getEntity()),
+            false
+        );
+        assertEquals(0, ((List) map.get("failures")).size());
+    }
+
     /**
      * Get ingest pipeline
      * @param pipelineName of the ingest pipeline