How to Write an OpenSearch Bulk Operation Using the Java SDK

Here we will walk through the basic code for running a bulk operation in OpenSearch with the Java SDK.

We need the following dependencies in our Gradle project. These lines go in the build.gradle file.

	implementation 'org.opensearch.client:opensearch-rest-client:2.3.0'
	implementation 'org.opensearch.client:opensearch-java:2.0.0'

For Maven projects, we need to add the same dependencies to the pom.xml file.
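
As a sketch, the two Gradle coordinates listed above map to the following pom.xml entries. The group IDs, artifact IDs and versions are copied from the Gradle lines; the rest of the pom.xml is assumed to exist already.

```xml
<dependencies>
	<dependency>
		<groupId>org.opensearch.client</groupId>
		<artifactId>opensearch-rest-client</artifactId>
		<version>2.3.0</version>
	</dependency>
	<dependency>
		<groupId>org.opensearch.client</groupId>
		<artifactId>opensearch-java</artifactId>
		<version>2.0.0</version>
	</dependency>
</dependencies>
```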

OpenSearch provides a bulk API that supports index, delete, create and update operations across multiple indexes in a single HTTP call. Here we will see how to write that bulk operation in Java. Suppose we want to translate the following HTTP request into Java code:

POST _bulk
{ "index" : { "_index" : "index1", "_id" : "1" } }
{ "name" : "sample1"}
{ "delete" : { "_index" : "index2", "_id" : "2" } }
{ "create" : { "_index" : "index3", "_id" : "3" } }
{ "name" : "sample3"}
{ "update" : {"_index" : "index4", "_id" : "4"} }
{ "doc" : { "name" : "sample4" } }

As you can see above, a single bulk request performs index, delete, create and update operations in one HTTP call, and each operation targets a different index.
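
To make the request format concrete, here is a minimal sketch that assembles the same bulk body as plain text using only the JDK. BulkBodySketch and its methods are hypothetical names used for illustration; they are not part of the OpenSearch client. The sketch shows the layout the _bulk endpoint expects: one action line per operation, followed by a source line for index, create and update (delete has none), with every line terminated by a newline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the OpenSearch client: it only assembles
// the newline-delimited JSON text that the _bulk endpoint expects.
class BulkBodySketch {

	// Builds an action line such as { "index" : { "_index" : "index1", "_id" : "1" } }
	static String action(String type, String index, String id) {
		return "{ \"" + type + "\" : { \"_index\" : \"" + index + "\", \"_id\" : \"" + id + "\" } }";
	}

	static String buildBody() {
		List<String> lines = new ArrayList<>();
		lines.add(action("index", "index1", "1"));
		lines.add("{ \"name\" : \"sample1\" }");
		lines.add(action("delete", "index2", "2")); // delete has no source line
		lines.add(action("create", "index3", "3"));
		lines.add("{ \"name\" : \"sample3\" }");
		lines.add(action("update", "index4", "4"));
		lines.add("{ \"doc\" : { \"name\" : \"sample4\" } }"); // partial document goes under "doc"
		// every line, including the last one, ends with a newline
		return String.join("\n", lines) + "\n";
	}

	public static void main(String[] args) {
		System.out.print(buildBody());
	}
}
```

The Java client builds this body for us, so the sketch is only meant as a reading aid when comparing the HTTP request with the builder code.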

Here is the Java code equivalent to the above HTTP request.

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.opensearch.client.opensearch.core.bulk.UpdateOperation;

public class OpenSearchServiceImpl {

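	// OpenSearchClientFactory is not imported here, so it is assumed to be
	// this application's own helper class that creates the client
	private OpenSearchClient client = OpenSearchClientFactory.getInstance();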

	public void doBulkOperation() throws OpenSearchException, IOException {

		// index operation
		Map<String, Object> document1 = Map.of("name", "sample1");
		String documentId1 = "1";
		String index1 = "index1";
		IndexOperation<Map<String, Object>> indexOperation = new IndexOperation.Builder<Map<String, Object>>()
				.index(index1).id(documentId1).document(document1).build();

		// delete operation
		String documentId2 = "2";
		String index2 = "index2";
		DeleteOperation deleteOperation = new DeleteOperation.Builder().index(index2).id(documentId2).build();

		// create operation
		Map<String, Object> document3 = Map.of("name", "sample3");
		String documentId3 = "3";
		String index3 = "index3";
		CreateOperation<Map<String, Object>> createOperation = new CreateOperation.Builder<Map<String, Object>>()
				.index(index3).id(documentId3).document(document3).build();

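		// update operation: the partial document is nested under "doc",
		// which is the body the bulk API expects for an update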
		Map<String, Object> updatedDoc = Map.of("doc", Map.of("name", "sample4"));
		String documentId4 = "4";
		String index4 = "index4";
		UpdateOperation<Map<String, Object>> updateOperation = new UpdateOperation.Builder<Map<String, Object>>()
				.index(index4).id(documentId4).document(updatedDoc).build();

		BulkOperation bulkOperation1 = new BulkOperation.Builder().index(indexOperation).build();
		BulkOperation bulkOperation2 = new BulkOperation.Builder().delete(deleteOperation).build();
		BulkOperation bulkOperation3 = new BulkOperation.Builder().create(createOperation).build();
		BulkOperation bulkOperation4 = new BulkOperation.Builder().update(updateOperation).build();

		BulkRequest bulkRequest = new BulkRequest.Builder()
				.operations(List.of(bulkOperation1, bulkOperation2, bulkOperation3, bulkOperation4)).build();

		BulkResponse bulkResponse = client.bulk(bulkRequest);

		bulkResponse.items().forEach(item -> {
			// get individual item id
			String id = item.id();

			// get individual item HTTP response code
			int status = item.status();

			// get individual item error message
			String error = item.error() == null ? null : item.error().reason();

			System.out.println(id + ", status: " + status + ", error: " + error);
		});

	}

}

The code follows the same structure as the HTTP request. We create IndexOperation, DeleteOperation, CreateOperation and UpdateOperation objects. One BulkOperation object can hold only one of them, so we create four BulkOperation objects to carry the four sub-operations.
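
The "only one of them" rule can be pictured as a wrapper that stores a kind together with exactly one payload. The following is a minimal JDK-only sketch of that idea. OneOfOperation is a hypothetical stand-in written for this post, not the client's BulkOperation class, and the string payloads are placeholders for the real operation objects.

```java
import java.util.List;

// Hypothetical stand-in for a "one-of" wrapper such as BulkOperation:
// it records which kind of sub-operation it holds and exactly one payload.
final class OneOfOperation {

	enum Kind { INDEX, DELETE, CREATE, UPDATE }

	private final Kind kind;
	private final Object payload;

	private OneOfOperation(Kind kind, Object payload) {
		this.kind = kind;
		this.payload = payload;
	}

	// One factory per variant, so a wrapper can never hold two variants at once.
	static OneOfOperation index(String doc) { return new OneOfOperation(Kind.INDEX, doc); }
	static OneOfOperation delete(String id) { return new OneOfOperation(Kind.DELETE, id); }
	static OneOfOperation create(String doc) { return new OneOfOperation(Kind.CREATE, doc); }
	static OneOfOperation update(String doc) { return new OneOfOperation(Kind.UPDATE, doc); }

	Kind kind() { return kind; }
	Object payload() { return payload; }

	public static void main(String[] args) {
		// Four sub-operations need four wrappers, mirroring bulkOperation1..4.
		List<OneOfOperation> operations = List.of(
				index("sample1"), delete("2"), create("sample3"), update("sample4"));
		for (OneOfOperation operation : operations) {
			System.out.println(operation.kind() + " -> " + operation.payload());
		}
	}
}
```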


Then we pass all these BulkOperation objects to the BulkRequest builder and create a BulkRequest instance. The bulk method of OpenSearchClient takes the BulkRequest object as its argument, makes the OpenSearch API call based on it, and returns a BulkResponse object.

The BulkResponse object contains the status of each individual operation in its items list, as shown in the code above. This makes it easy to detect failed operations and the corresponding documents in case of a partial failure.
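
Here is a minimal sketch of picking out the failed items after a partial failure. So that it runs without a cluster or the client library, ItemResult is a hypothetical stand-in for one entry of bulkResponse.items(). It carries the same three values the loop in the code above reads: id, status and error reason. The sample values in main are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one entry of bulkResponse.items().
final class ItemResult {
	final String id;
	final int status;
	final String errorReason; // null when the sub-operation succeeded

	ItemResult(String id, int status, String errorReason) {
		this.id = id;
		this.status = status;
		this.errorReason = errorReason;
	}
}

class BulkFailureSketch {

	// Returns only the items whose sub-operation reported an error.
	static List<ItemResult> failedItems(List<ItemResult> items) {
		List<ItemResult> failed = new ArrayList<>();
		for (ItemResult item : items) {
			if (item.errorReason != null) {
				failed.add(item);
			}
		}
		return failed;
	}

	public static void main(String[] args) {
		// Invented sample results: two successes and one failure.
		List<ItemResult> items = List.of(
				new ItemResult("1", 201, null),
				new ItemResult("3", 409, "document already exists"),
				new ItemResult("4", 200, null));

		for (ItemResult item : failedItems(items)) {
			System.out.println("failed: " + item.id + ", status: " + item.status
					+ ", error: " + item.errorReason);
		}
	}
}
```

With the real client, the same check would test item.error() for null on each response item, as the loop in the code above already does.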
