public Object metastoreCheck(ProceedingJoinPoint pjp)

in src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java [249:391]
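
Around advice for the FileSystem listing calls advised by this aspect: it proceeds with the underlying S3 listing, checks the result against the s3mper metastore, and, depending on configuration, stats missing files directly, re-lists and rechecks, raises alerts, or fails the listing. In dark-load mode the original S3 listing is always returned.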


    public Object metastoreCheck(final ProceedingJoinPoint pjp) throws Throwable {

        FileSystem fs = (FileSystem) pjp.getThis();

        if(disabled) {
            return pjp.proceed();
        }
        
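        // Refresh the aspect's settings from the advised FileSystem's configuration.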
        Configuration conf = ((FileSystem) pjp.getTarget()).getConf();
        updateConfig(conf);
        
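        // Proceed with the underlying listing call; this is the raw S3 result that will be verified against the metastore.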
        FileStatus [] s3Listing = (FileStatus[]) pjp.proceed();
        FileStatus[] originalListing = null;
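        // In dark-load mode, keep a copy of the raw S3 listing so it can be returned unmodified at the end.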
        if (darkload) {
            originalListing = s3Listing.clone();
        }
        
        List<Path> pathsToCheck = new ArrayList<Path>();
        
        Object pathArg = pjp.getArgs()[0];
        
        //Locate paths in the arguments
        if(pathArg instanceof Path) {
            pathsToCheck.add((Path)pathArg);
        } else if (pathArg instanceof List) {
            pathsToCheck.addAll((List)pathArg);
        } else if (pathArg.getClass().isArray()) {
            pathsToCheck.addAll(Arrays.asList((Path[]) pathArg));
        }
        
        //HACK: This is just to prevent the EMR metrics from causing consistency failures
        for(StackTraceElement e : Thread.currentThread().getStackTrace()) {
            if(e.getClassName().contains("emr.metrics")) {
                log.debug("Ignoring EMR metrics listing for paths: " + pathsToCheck);
                return s3Listing;
            }
        }
        //END HACK
        
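        // Default recheck settings; task-side listings either skip the check or use task-specific settings below.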
        long recheck = recheckCount;
        long delay = recheckPeriod;
        
        try {
            if (isTask(conf) && !checkTaskListings) {
                log.info("Skipping consistency check for task listing");
                return s3Listing;
            }
            
            if(isTask(conf)) {
                recheck = taskRecheckCount;
                delay = taskRecheckPeriod;
            }
        } catch (Exception e) {
            log.error("Error checking for task side listing", e);
        }
        
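        // Compare the metastore's view of the requested paths against the S3 listing.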
        try {
            List<FileInfo> metastoreListing = metastore.list(pathsToCheck);
            
            List<Path> missingPaths = ImmutableList.of();
            if (statOnMissingFile) {
                missingPaths = checkListing(metastoreListing, s3Listing);

                if (!missingPaths.isEmpty()) {
                    List<FileStatus> fullListing = new ArrayList<FileStatus>();
                    fullListing.addAll(Arrays.asList(s3Listing));
                    for (Path path : missingPaths) {
                        FileStatus status = fs.getFileStatus(path);
                        fullListing.add(status);
                    }
                    s3Listing = fullListing.toArray(new FileStatus[0]);
                }
            } else {

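                // Otherwise re-list and re-check up to 'recheck' times, waiting 'delay' milliseconds between attempts.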
                int checkAttempt;

                for (checkAttempt = 0; checkAttempt <= recheck; checkAttempt++) {
                    missingPaths = checkListing(metastoreListing, s3Listing);

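                    // Remove entries the metastore has marked as deleted from the S3 listing.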
                    if (delistDeleteMarkedFiles) {
                        s3Listing = delistDeletedPaths(metastoreListing, s3Listing);
                    }

                    if (missingPaths.isEmpty()) {
                        break;
                    }

                    //Check if an acceptable threshold of data has been met.  This is a little
                    //ambiguous because S3 could potentially have more files than the
                    //metastore (via out-of-band access) and throw off the ratio
                    if (fileThreshold < 1 && metastoreListing.size() > 0) {
                        float ratio = s3Listing.length / (float) metastoreListing.size();

                        if (ratio > fileThreshold) {
                            log.info(format("Proceeding with incomplete listing at ratio %f (%f as acceptable). Still missing paths: %s", ratio, fileThreshold, missingPaths));

                            missingPaths.clear();
                            break;
                        }
                    }

                    if (recheck == 0) {
                        break;
                    }

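                    // Wait, then re-run the underlying listing before the next consistency check.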
                    log.info(format("Rechecking consistency in %d (ms).  Files missing %d. Missing paths: %s", delay, missingPaths.size(), missingPaths));
                    Thread.sleep(delay);
                    s3Listing = (FileStatus[]) pjp.proceed();
                }

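                // Paths still missing after all rechecks: raise an alert, then fail or just log depending on configuration.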
                if (!missingPaths.isEmpty()) {
                    alertDispatcher.alert(missingPaths);

                    if (shouldFail(conf)) {
                        throw new S3ConsistencyException("Consistency check failed. See go/s3mper for details. Missing paths: " + missingPaths);
                    } else {
                        log.error("Consistency check failed.  See go/s3mper for details. Missing paths: " + missingPaths);
                    }
                } else {
                    if (checkAttempt > 0) {
                        log.info(format("Listing achieved consistency after %d attempts", checkAttempt));
                        alertDispatcher.recovered(pathsToCheck);
                    }
                }
            }
        } catch (TimeoutException t) {
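            // A metastore timeout raises its own alert and only fails the listing when failOnTimeout is set.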
            log.error("Timeout occurred listing metastore paths: " + pathsToCheck, t);
            
            alertDispatcher.timeout("metastoreCheck", pathsToCheck);
            
            if(failOnTimeout) {
                throw t;
            }
        } catch (Exception e) {
            log.error("Failed to list metastore for paths: " + pathsToCheck, e);
            
            if(shouldFail(conf)) {
                throw e;
            }
        }

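        // In dark-load mode the check still runs, but the original, unmodified S3 listing is returned.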
        return darkload ? originalListing : s3Listing;
    }