public Object call()

in src/main/java/com/netflix/bdp/s3mper/metastore/impl/DeleteWriterTask.java [60:119]


    public Object call() throws Exception {
        running = true;
        
        Set<Key> keys = new HashSet<Key>(batchLimit);
        List<WriteRequest> batch = new ArrayList<WriteRequest>(batchLimit);
        
        try {
            
            while (!deleteQueue.isEmpty() || running) {
                
                deleteQueue.drainTo(keys, batchLimit);
                
                synchronized (deleteQueue) {
                    deleteQueue.notifyAll();
                }
                
                if (keys.isEmpty()) {
                    Thread.sleep(500);
                    continue;
                }
                
                for (Key key : keys) {
                    batch.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(key)));
                }
                
                BatchWriteItemRequest batchRequest = new BatchWriteItemRequest();
                Map<String, List<WriteRequest>> itemRequests = new HashMap<String, List<WriteRequest>>();
                itemRequests.put(MetastoreJanitor.tableName, batch);
                batchRequest.setRequestItems(itemRequests);
                
                BatchWriteItemResult result = db.batchWriteItem(batchRequest);
                
                //Resubmit failed writes
                for (Map.Entry<String, List<WriteRequest>> e : result.getUnprocessedItems().entrySet()) {
                    for (WriteRequest w : e.getValue()) {
                        deleteQueue.put(w.getDeleteRequest().getKey());
                    }
                }
                
                //Drain capacity
                for (Map.Entry<String, BatchWriteResponse> e : result.getResponses().entrySet()) {
                    limiter.acquire(e.getValue().getConsumedCapacityUnits().intValue());
                }
                
                if(log.isDebugEnabled()) {
                    log.debug(String.format("delete: %2d, queue_size: %5d, max_rate: %4.1f", keys.size(), deleteQueue.size(), limiter.getRate()));
                }
                
                keys.clear();
                batch.clear();
            }
        } catch (InterruptedException interruptedException) {
            log.error("Interrupted", interruptedException);
        } catch (AmazonClientException amazonClientException) {
            log.error("", amazonClientException);
        }
        
        log.info("Delete task terminating");
        return Boolean.FALSE;
    }