in src/main/java/com/netflix/bdp/s3mper/metastore/impl/DeleteWriterTask.java [60:119]
/**
 * Drains delete requests from {@code deleteQueue} and issues them to DynamoDB in
 * batches until {@code running} is cleared and the queue is empty.
 *
 * <p>Each iteration drains up to {@code batchLimit} keys, wakes any producers
 * blocked on the queue monitor, submits a BatchWriteItem of delete requests, and
 * re-queues any unprocessed items reported by DynamoDB. Consumed write capacity
 * is charged against {@code limiter} to throttle throughput.
 *
 * @return {@code Boolean.FALSE} when the task terminates (normally or after an
 *         interrupt / AWS client failure)
 * @throws Exception declared by {@link java.util.concurrent.Callable}; in practice
 *         interruption and AWS client errors are handled internally
 */
public Object call() throws Exception {
    running = true;

    Set<Key> keys = new HashSet<Key>(batchLimit);
    List<WriteRequest> batch = new ArrayList<WriteRequest>(batchLimit);

    try {
        // Keep draining while the task is active, and finish off any
        // remaining entries after running is flipped to false.
        while (!deleteQueue.isEmpty() || running) {
            deleteQueue.drainTo(keys, batchLimit);

            // Wake producers blocked waiting for queue capacity.
            synchronized (deleteQueue) {
                deleteQueue.notifyAll();
            }

            if (keys.isEmpty()) {
                // Nothing to do yet; back off briefly before polling again.
                Thread.sleep(500);
                continue;
            }

            for (Key key : keys) {
                batch.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(key)));
            }

            BatchWriteItemRequest batchRequest = new BatchWriteItemRequest();
            Map<String, List<WriteRequest>> itemRequests = new HashMap<String, List<WriteRequest>>();
            itemRequests.put(MetastoreJanitor.tableName, batch);
            batchRequest.setRequestItems(itemRequests);

            BatchWriteItemResult result = db.batchWriteItem(batchRequest);

            // Resubmit writes DynamoDB reported as unprocessed (throttled/partial batch).
            for (Map.Entry<String, List<WriteRequest>> e : result.getUnprocessedItems().entrySet()) {
                for (WriteRequest w : e.getValue()) {
                    deleteQueue.put(w.getDeleteRequest().getKey());
                }
            }

            // Charge the consumed write capacity against the rate limiter.
            for (Map.Entry<String, BatchWriteResponse> e : result.getResponses().entrySet()) {
                limiter.acquire(e.getValue().getConsumedCapacityUnits().intValue());
            }

            if (log.isDebugEnabled()) {
                log.debug(String.format("delete: %2d, queue_size: %5d, max_rate: %4.1f", keys.size(), deleteQueue.size(), limiter.getRate()));
            }

            keys.clear();
            batch.clear();
        }
    } catch (InterruptedException interruptedException) {
        log.error("Interrupted", interruptedException);
        // Restore the interrupt status so callers/executors can observe it.
        Thread.currentThread().interrupt();
    } catch (AmazonClientException amazonClientException) {
        // NOTE(review): any keys already drained into this batch are lost on
        // this path — they were removed from the queue but never written.
        log.error("Delete batch write failed", amazonClientException);
    }

    log.info("Delete task terminating");

    return Boolean.FALSE;
}