public Object call()

in src/main/java/com/netflix/bdp/s3mper/metastore/impl/TimeseriesScannerTask.java [56:117]


    public Object call() throws Exception {
        running = true;
        
        long deleteEpoch = System.currentTimeMillis() - age;
        
        QueryRequest query = new QueryRequest();
        query.setTableName(MetastoreJanitor.tableName);
        query.setHashKeyValue(new AttributeValue().withS(DynamoDBMetastore.TIMESERIES_KEY));
        query.setLimit(queueSize/2);
        
        QueryResult result;
        
        int scanCount = 0;
        int deleteCount = 0;
        
        do {
            //Can't set a hard limit on the queue since paths can be resubmitted by delete task
            //which can cause a deadlock.
            synchronized (deleteQueue) {
                while (deleteQueue.size() >= queueSize) {
                    deleteQueue.wait();
                }
            }
            
            if(!running) {
                break;
            }
                
            result = db.query(query);
            
            scanCount += result.getCount();
            
            long epoch = deleteEpoch;
            
            for (Map<String, AttributeValue> i : result.getItems()) {
                epoch = Long.parseLong(i.get(DynamoDBMetastore.RANGE_KEY).getS().split("-")[0]);
                
                if (epoch >= deleteEpoch) {
                    log.info("Timeseries scan complete.  Exiting.");
                    running = false;
                    break;
                }
                
                deleteCount += 2;
                
                deleteQueue.put(new Key(i.get(DynamoDBMetastore.HASH_KEY), i.get(DynamoDBMetastore.RANGE_KEY)));
                deleteQueue.put(new Key(i.get(DynamoDBMetastore.LINK_HASH_KEY), i.get(DynamoDBMetastore.LINK_RANGE_KEY)));
            }
            
            if(scanCount % reportInterval == 0) {
                log.info(format("scanned: %d, added: %d, queue_size: %d, current_date: %s", scanCount, deleteCount, deleteQueue.size(), new Date(epoch)));
            }
            
            limiter.acquire(result.getConsumedCapacityUnits().intValue());
            
            query.setExclusiveStartKey(result.getLastEvaluatedKey());
        } while (running && result.getLastEvaluatedKey() != null);
        
        log.info(format("Scan Complete.%nEntries Scanned: %d%nEntries Deleted: %d", scanCount, deleteCount));
        
        return Boolean.TRUE;
    }