Thursday, November 30, 2017
Cleaning older documents from Elasticsearch

The best solution, especially for time-based data such as logging, is to clean the older documents that have no further use out of Elasticsearch storage. The Elasticsearch guide recommends creating multiple indices, one per time frame (e.g. a separate index for each month), and then deleting the older indices completely. But in some Elasticsearch use cases, this multi-index approach makes data retrieval activities such as aggregations more difficult compared to a single-index approach. Therefore this post discusses how to clean documents older than a given date from a single index.
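For comparison, dropping an expired time frame under the multi-index approach is just a single call that deletes the whole index. Below is a minimal sketch using the Java transport client; the index name "logs-2017-05" and the deleteOldIndex method are only illustrative assumptions, and "elasticsearch" is assumed to be an already-connected TransportClient like the one we create later in this post.
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;

    // Drops an entire time-based index in one call; "logs-2017-05" would be a
    // typical argument. This is a sketch, not part of the tutorial's class.
    private void deleteOldIndex(String indexName) {
        DeleteIndexResponse response = elasticsearch.admin().indices()
                .prepareDelete(indexName)
                .execute().actionGet();
        System.out.println("Deleted index " + indexName + ": " + response.isAcknowledged());
    }
In the single-index approach discussed in the rest of this post, expiry instead has to be handled at the document level.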
This cleaning process consists of the following 3 main steps.
- Collect the list of IDs of expired documents
- Perform a bulk delete of those documents
- Optimize the index
1. Collect the list of IDs of expired documents
The first step is to get the list of IDs of the expired documents to be deleted. Note that to use this method, the Elasticsearch documents must have a field that represents the time of publishing. For this tutorial, let's assume that field is named "publishTime", the Elasticsearch index is "my_index", the record type is "my_record_type", and the expiry period is 180 days. We'll also assume that the Elasticsearch server's host name is "localhost" and the transport port is 9300. First of all, we'll create a class and assign these values to constants.
package com.sample.es.udith;
import java.util.Calendar;

import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class ESManager {

    private final String ES_INDEX;
    private final String ES_TYPE;
    private final String ES_TIME_FIELD;
    private final int EXPIRE_DAYS;
    private final TransportClient elasticsearch;

    public ESManager() {
        ES_INDEX = "my_index";
        ES_TYPE = "my_record_type";
        ES_TIME_FIELD = "publishTime";
        EXPIRE_DAYS = 180;
        // Connect to the local Elasticsearch node over the transport protocol
        elasticsearch = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
    }
}
Then we'll add a method that queries documents from Elasticsearch, filtering them by their publishTime.
    public void cleanESDocs() {
        // Calculate the timestamp before which documents are considered expired
        final Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DATE, -EXPIRE_DAYS);

        // Start a scan & scroll search over all documents published before that timestamp
        SearchResponse scrollResp = elasticsearch.prepareSearch(ES_INDEX).setTypes(ES_TYPE)
                .setSearchType(SearchType.SCAN)
                .setScroll(new TimeValue(600000))
                .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
                        FilterBuilders.andFilter(
                                FilterBuilders.rangeFilter(ES_TIME_FIELD).to(calendar.getTimeInMillis())
                        )))
                .setNoFields()
                .setSize(100).execute().actionGet();

        while (true) {
            // Print the ID of every expired document in the current batch
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                System.out.println(hit.getId());
            }
            // Fetch the next batch of hits
            scrollResp = elasticsearch.prepareSearchScroll(scrollResp.getScrollId())
                    .setScroll(new TimeValue(600000))
                    .execute().actionGet();
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }
    }
In this method we have used the Scan & Scroll API to avoid the issues caused by retrieving a large number of documents at once. We filter all documents whose publishTime is older than 180 days and print their IDs to the system out. Also, for performance reasons, we have invoked setNoFields() on the search request so that the response does not contain the source fields, which are unnecessary for our task.
2. Perform a bulk delete of those documents
As the second step, we'll modify the above method to use the Bulk API, which will delete the documents identified in the previous step.
    public void cleanESDocs() {
        // Calculate the timestamp before which documents are considered expired
        final Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DATE, -EXPIRE_DAYS);

        BulkRequestBuilder bulkRequest = elasticsearch.prepareBulk();

        // Start a scan & scroll search over all documents published before that timestamp
        SearchResponse scrollResp = elasticsearch.prepareSearch(ES_INDEX).setTypes(ES_TYPE)
                .setSearchType(SearchType.SCAN)
                .setScroll(new TimeValue(600000))
                .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
                        FilterBuilders.andFilter(
                                FilterBuilders.rangeFilter(ES_TIME_FIELD).to(calendar.getTimeInMillis())
                        )))
                .setNoFields()
                .setSize(100).execute().actionGet();

        while (true) {
            // Add a delete request for every expired document in the current batch
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                bulkRequest.add(elasticsearch.prepareDelete()
                        .setIndex(ES_INDEX)
                        .setType(ES_TYPE)
                        .setId(hit.getId()));
            }
            // Fetch the next batch of hits
            scrollResp = elasticsearch.prepareSearchScroll(scrollResp.getScrollId())
                    .setScroll(new TimeValue(600000))
                    .execute().actionGet();
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }

        // Execute the bulk delete only if there is at least one document to remove
        if ((bulkRequest != null) && (!bulkRequest.request().requests().isEmpty())) {
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                System.out.println("Elasticsearch cleaning failed");
            } else {
                System.out.println("Elasticsearch cleaning finished deleting " +
                        bulkResponse.getItems().length + " documents");
            }
        } else {
            System.out.println("Elasticsearch cleaning finished without deleting any documents");
        }
    }
3. Optimize the index
Now we have deleted the expired documents from Elasticsearch. However, this only marks those documents as deleted and omits them from query results. To actually remove them from disk and free up disk space, we need to run an index optimization operation after the deletion. For that, we'll add the following index optimization method and call it after the deletion operation.
    private void optimizeIndex() {
        // Merge the index segments so that documents marked as deleted are purged from disk
        OptimizeResponse optimizeResponse = elasticsearch.admin().indices()
                .prepareOptimize(ES_INDEX)
                .setFlush(true)
                .setOnlyExpungeDeletes(false)
                .execute().actionGet();
        System.out.println("Elasticsearch index optimization finished with " +
                optimizeResponse.getSuccessfulShards() + " successful and " +
                optimizeResponse.getFailedShards() + " failed shards out of " +
                optimizeResponse.getTotalShards() + " total shards");
    }
This will delete all the expired documents and free up the disk space. You can view statistics about a particular index using the Indices Stats API. The complete Java class for this tutorial can be found here.
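For completeness, here is a rough sketch of reading those statistics through the Java client as well. The printIndexStats method is an assumption of mine (added to the same ESManager class), and getter names can differ between client versions.
    // requires: import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
    private void printIndexStats() {
        // Fetch document and store statistics for the index
        IndicesStatsResponse stats = elasticsearch.admin().indices()
                .prepareStats(ES_INDEX)
                .setDocs(true)
                .setStore(true)
                .execute().actionGet();
        System.out.println("Documents: " + stats.getTotal().getDocs().getCount()
                + ", still marked deleted: " + stats.getTotal().getDocs().getDeleted()
                + ", store size: " + stats.getTotal().getStore().getSize());
    }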
Labels:
cleaning,
documents,
elasticsearch,
from,
older