in src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java [249:391]
/**
 * Around-advice that wraps a FileSystem listing call and reconciles the S3
 * listing against the metastore's view of the same paths, retrying (or
 * failing, or alerting) when files the metastore knows about are missing
 * from S3's eventually-consistent listing.
 *
 * <p>Flow: proceed with the underlying listing, extract the Path argument(s),
 * then compare against {@code metastore.list(...)}. Depending on config it
 * either stats missing files directly ({@code statOnMissingFile}), or
 * re-lists with a delay up to {@code recheck} times. Inconsistency is
 * reported via {@code alertDispatcher} and is fatal only when
 * {@code shouldFail(conf)} is true.
 *
 * @param pjp join point for the intercepted listing call; {@code proceed()}
 *            re-executes the real S3 listing (called again inside the retry
 *            loop to refresh the listing)
 * @return the (possibly augmented) {@code FileStatus[]} listing; when
 *         {@code darkload} is enabled, the original unmodified listing is
 *         returned so the check runs without affecting callers
 * @throws Throwable S3ConsistencyException when the check fails and failure
 *         is enabled; TimeoutException when the metastore times out and
 *         {@code failOnTimeout} is set; otherwise whatever the underlying
 *         listing throws
 */
public Object metastoreCheck(final ProceedingJoinPoint pjp) throws Throwable {
    FileSystem fs = (FileSystem) pjp.getThis();

    // Fast path: feature disabled, just run the original listing untouched.
    if(disabled) {
        return pjp.proceed();
    }

    // Refresh config-derived settings before every check (conf may differ per call).
    Configuration conf = ((FileSystem) pjp.getTarget()).getConf();
    updateConfig(conf);

    // Execute the real S3 listing first; everything below validates this result.
    FileStatus [] s3Listing = (FileStatus[]) pjp.proceed();
    FileStatus[] originalListing = null;
    if (darkload) {
        // Dark-load mode: keep a pristine copy so we can run the full check
        // for metrics/alerting but still return the unmodified listing.
        originalListing = s3Listing.clone();
    }

    List<Path> pathsToCheck = new ArrayList<Path>();

    Object pathArg = pjp.getArgs()[0];

    // Locate paths in the arguments: the advised methods take either a single
    // Path, a List of Paths, or a Path[].
    if(pathArg instanceof Path) {
        pathsToCheck.add((Path)pathArg);
    } else if (pathArg instanceof List) {
        // NOTE(review): raw List cast — assumes the advised signature only
        // ever passes List<Path> here.
        pathsToCheck.addAll((List)pathArg);
    } else if (pathArg.getClass().isArray()) {
        pathsToCheck.addAll(Arrays.asList((Path[]) pathArg));
    }

    // HACK: This is just to prevent the EMR metrics from causing consistency failures.
    // Listings initiated by emr.metrics are internal bookkeeping, not job I/O,
    // so they are returned unchecked.
    for(StackTraceElement e : Thread.currentThread().getStackTrace()) {
        if(e.getClassName().contains("emr.metrics")) {
            log.debug("Ignoring EMR metrics listing for paths: " + pathsToCheck);
            return s3Listing;
        }
    }
    // END HACK

    long recheck = recheckCount;
    long delay = recheckPeriod;

    try {
        // Task-side listings either skip the check entirely or use the
        // (typically shorter) task-specific retry settings.
        if (isTask(conf) && !checkTaskListings) {
            log.info("Skipping consistency check for task listing");
            return s3Listing;
        }

        if(isTask(conf)) {
            recheck = taskRecheckCount;
            delay = taskRecheckPeriod;
        }
    } catch (Exception e) {
        // Best-effort: if task detection fails, fall back to default settings
        // rather than aborting the listing.
        log.error("Error checking for task side listing", e);
    }

    try {
        List<FileInfo> metastoreListing = metastore.list(pathsToCheck);

        List<Path> missingPaths = ImmutableList.of();
        if (statOnMissingFile) {
            // Instead of waiting for the S3 listing to become consistent,
            // stat each missing file directly and splice it into the result.
            missingPaths = checkListing(metastoreListing, s3Listing);

            if (!missingPaths.isEmpty()) {
                List<FileStatus> fullListing = new ArrayList<FileStatus>();
                fullListing.addAll(Arrays.asList(s3Listing));
                for (Path path : missingPaths) {
                    FileStatus status = fs.getFileStatus(path);
                    fullListing.add(status);
                }
                s3Listing = fullListing.toArray(new FileStatus[0]);
            }
        } else {
            // Retry loop: re-list S3 until it agrees with the metastore,
            // the acceptable-ratio threshold is met, or retries are exhausted.
            int checkAttempt;
            for (checkAttempt = 0; checkAttempt <= recheck; checkAttempt++) {
                missingPaths = checkListing(metastoreListing, s3Listing);

                if (delistDeleteMarkedFiles) {
                    // Drop entries the metastore has marked deleted but that
                    // still appear in the (stale) S3 listing.
                    s3Listing = delistDeletedPaths(metastoreListing, s3Listing);
                }

                if (missingPaths.isEmpty()) {
                    break;
                }

                // Check if acceptable threshold of data has been met. This is a little
                // ambiguous because S3 could potentially have more files than the
                // metastore (via out-of-band access) and throw off the ratio.
                if (fileThreshold < 1 && metastoreListing.size() > 0) {
                    float ratio = s3Listing.length / (float) metastoreListing.size();

                    if (ratio > fileThreshold) {
                        log.info(format("Proceeding with incomplete listing at ratio %f (%f as acceptable). Still missing paths: %s", ratio, fileThreshold, missingPaths));

                        // NOTE(review): assumes checkListing returned a mutable
                        // list here — clear() would throw on an ImmutableList.
                        missingPaths.clear();
                        break;
                    }
                }

                if (recheck == 0) {
                    break;
                }

                log.info(format("Rechecking consistency in %d (ms). Files missing %d. Missing paths: %s", delay, missingPaths.size(), missingPaths));
                Thread.sleep(delay);

                // Re-execute the underlying S3 listing to pick up newly
                // consistent entries before the next comparison.
                s3Listing = (FileStatus[]) pjp.proceed();
            }

            if (!missingPaths.isEmpty()) {
                alertDispatcher.alert(missingPaths);

                if (shouldFail(conf)) {
                    throw new S3ConsistencyException("Consistency check failed. See go/s3mper for details. Missing paths: " + missingPaths);
                } else {
                    log.error("Consistency check failed. See go/s3mper for details. Missing paths: " + missingPaths);
                }
            } else {
                // checkAttempt > 0 means at least one retry was needed before
                // the listing converged — report the recovery.
                if (checkAttempt > 0) {
                    log.info(format("Listing achieved consistency after %d attempts", checkAttempt));
                    alertDispatcher.recovered(pathsToCheck);
                }
            }
        }
    } catch (TimeoutException t) {
        log.error("Timeout occurred listing metastore paths: " + pathsToCheck, t);

        alertDispatcher.timeout("metastoreCheck", pathsToCheck);

        if(failOnTimeout) {
            throw t;
        }
    } catch (Exception e) {
        // Metastore errors are non-fatal unless failure mode is enabled:
        // prefer serving the raw S3 listing over breaking the job.
        log.error("Failed to list metastore for paths: " + pathsToCheck, e);

        if(shouldFail(conf)) {
            throw e;
        }
    }

    // Dark-load returns the untouched original listing; otherwise return the
    // possibly augmented/delisted one.
    return darkload ? originalListing : s3Listing;
}