public Object call()

in src/main/java/com/netflix/bdp/s3mper/metastore/impl/PathScannerTask.java [57:113]


    public Object call() throws Exception {
        running = true;
        
        long deleteEpoch = System.currentTimeMillis() - age;
        
        Map<String, Condition> filter = new HashMap<String, Condition>();
        
        AttributeValue value = new AttributeValue();
        value.setN(deleteEpoch + "");
        
        Condition c = new Condition();
        c.setComparisonOperator(ComparisonOperator.LT);
        c.setAttributeValueList(Collections.singletonList(value));
        
        filter.put("epoch", c);
        
        ScanRequest scan = new ScanRequest(MetastoreJanitor.tableName);
        scan.setScanFilter(filter);
        scan.setLimit( (int) limiter.getRate());
        
        ScanResult result;
        
        int scanTotal = 0;
        int matched = 0;
        
        do {
            //Can't set a hard limit on the queue since paths can be resubmitted by delete task
            synchronized (deleteQueue) {
                while (deleteQueue.size() >= queueSize) {
                    deleteQueue.wait();
                }
            }
            
            if(!running) {
                break;
            }
            
            result = db.scan(scan);
            scanTotal += result.getScannedCount();
            matched += result.getCount();
            
            log.info(String.format("Total scanned: %d, matched: %d, added: %d, queue size: %d, consumed capacity: %f, max rate: %f", scanTotal, matched, result.getCount(), deleteQueue.size(), result.getConsumedCapacityUnits(), limiter.getRate()));
            
            for (Map<String, AttributeValue> i : result.getItems()) {
                if (!i.containsKey("epoch")) {
                    continue;
                }
                deleteQueue.put(new Key(i.get(DynamoDBMetastore.HASH_KEY), i.get(DynamoDBMetastore.RANGE_KEY)));
            }
            
            limiter.acquire(result.getConsumedCapacityUnits().intValue());
            
            scan.setExclusiveStartKey(result.getLastEvaluatedKey());
        } while (running && result.getLastEvaluatedKey() != null);
        
        return Boolean.TRUE;
    }