in flink-filesystems/flink-fs-hadoop-shaded/src/main/java/org/apache/hadoop/conf/Configuration.java [2816:3042]
private Resource loadResource(Properties properties,
Resource wrapper, boolean quiet) {
String name = UNKNOWN_RESOURCE;
try {
Object resource = wrapper.getResource();
name = wrapper.getName();
XMLStreamReader2 reader = null;
boolean returnCachedProperties = false;
boolean isRestricted = wrapper.isParserRestricted();
if (resource instanceof URL) { // an URL resource
reader = (XMLStreamReader2)parse((URL)resource, isRestricted);
} else if (resource instanceof String) { // a CLASSPATH resource
URL url = getResource((String)resource);
reader = (XMLStreamReader2)parse(url, isRestricted);
} else if (resource instanceof Path) { // a file resource
// Can't use FileSystem API or we get an infinite loop
// since FileSystem uses Configuration API. Use java.io.File instead.
File file = new File(((Path)resource).toUri().getPath())
.getAbsoluteFile();
if (file.exists()) {
if (!quiet) {
LOG.debug("parsing File " + file);
}
reader = (XMLStreamReader2)parse(new BufferedInputStream(
new FileInputStream(file)), ((Path)resource).toString(),
isRestricted);
}
} else if (resource instanceof InputStream) {
reader = (XMLStreamReader2)parse((InputStream)resource, null,
isRestricted);
returnCachedProperties = true;
} else if (resource instanceof Properties) {
overlay(properties, (Properties)resource);
}
if (reader == null) {
if (quiet) {
return null;
}
throw new RuntimeException(resource + " not found");
}
Properties toAddTo = properties;
if(returnCachedProperties) {
toAddTo = new Properties();
}
DeprecationContext deprecations = deprecationContext.get();
StringBuilder token = new StringBuilder();
String confName = null;
String confValue = null;
String confInclude = null;
boolean confFinal = false;
boolean fallbackAllowed = false;
boolean fallbackEntered = false;
boolean parseToken = false;
LinkedList<String> confSource = new LinkedList<String>();
while (reader.hasNext()) {
switch (reader.next()) {
case XMLStreamConstants.START_ELEMENT:
switch (reader.getLocalName()) {
case "property":
confName = null;
confValue = null;
confFinal = false;
confSource.clear();
// First test for short format configuration
int attrCount = reader.getAttributeCount();
for (int i = 0; i < attrCount; i++) {
String propertyAttr = reader.getAttributeLocalName(i);
if ("name".equals(propertyAttr)) {
confName = StringInterner.weakIntern(
reader.getAttributeValue(i));
} else if ("value".equals(propertyAttr)) {
confValue = StringInterner.weakIntern(
reader.getAttributeValue(i));
} else if ("final".equals(propertyAttr)) {
confFinal = "true".equals(reader.getAttributeValue(i));
} else if ("source".equals(propertyAttr)) {
confSource.add(StringInterner.weakIntern(
reader.getAttributeValue(i)));
}
}
break;
case "name":
case "value":
case "final":
case "source":
parseToken = true;
token.setLength(0);
break;
case "include":
// Determine href for xi:include
confInclude = null;
attrCount = reader.getAttributeCount();
for (int i = 0; i < attrCount; i++) {
String attrName = reader.getAttributeLocalName(i);
if ("href".equals(attrName)) {
confInclude = reader.getAttributeValue(i);
}
}
if (confInclude == null) {
break;
}
if (isRestricted) {
throw new RuntimeException("Error parsing resource " + wrapper
+ ": XInclude is not supported for restricted resources");
}
// Determine if the included resource is a classpath resource
// otherwise fallback to a file resource
// xi:include are treated as inline and retain current source
URL include = getResource(confInclude);
if (include != null) {
Resource classpathResource = new Resource(include, name,
wrapper.isParserRestricted());
loadResource(properties, classpathResource, quiet);
} else {
URL url;
try {
url = new URL(confInclude);
url.openConnection().connect();
} catch (IOException ioe) {
File href = new File(confInclude);
if (!href.isAbsolute()) {
// Included resources are relative to the current resource
File baseFile = new File(name).getParentFile();
href = new File(baseFile, href.getPath());
}
if (!href.exists()) {
// Resource errors are non-fatal iff there is 1 xi:fallback
fallbackAllowed = true;
break;
}
url = href.toURI().toURL();
}
Resource uriResource = new Resource(url, name,
wrapper.isParserRestricted());
loadResource(properties, uriResource, quiet);
}
break;
case "fallback":
fallbackEntered = true;
break;
case "configuration":
break;
default:
break;
}
break;
case XMLStreamConstants.CHARACTERS:
if (parseToken) {
char[] text = reader.getTextCharacters();
token.append(text, reader.getTextStart(), reader.getTextLength());
}
break;
case XMLStreamConstants.END_ELEMENT:
switch (reader.getLocalName()) {
case "name":
if (token.length() > 0) {
confName = StringInterner.weakIntern(token.toString().trim());
}
break;
case "value":
if (token.length() > 0) {
confValue = StringInterner.weakIntern(token.toString());
}
break;
case "final":
confFinal = "true".equals(token.toString());
break;
case "source":
confSource.add(StringInterner.weakIntern(token.toString()));
break;
case "include":
if (fallbackAllowed && !fallbackEntered) {
throw new IOException("Fetch fail on include for '"
+ confInclude + "' with no fallback while loading '"
+ name + "'");
}
fallbackAllowed = false;
fallbackEntered = false;
break;
case "property":
if (confName == null || (!fallbackAllowed && fallbackEntered)) {
break;
}
confSource.add(name);
DeprecatedKeyInfo keyInfo =
deprecations.getDeprecatedKeyMap().get(confName);
if (keyInfo != null) {
keyInfo.clearAccessed();
for (String key : keyInfo.newKeys) {
// update new keys with deprecated key's value
loadProperty(toAddTo, name, key, confValue, confFinal,
confSource.toArray(new String[confSource.size()]));
}
} else {
loadProperty(toAddTo, name, confName, confValue, confFinal,
confSource.toArray(new String[confSource.size()]));
}
break;
default:
break;
}
default:
break;
}
}
reader.close();
if (returnCachedProperties) {
overlay(properties, toAddTo);
return new Resource(toAddTo, name, wrapper.isParserRestricted());
}
return null;
} catch (IOException e) {
LOG.error("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (XMLStreamException e) {
LOG.error("error parsing conf " + name, e);
throw new RuntimeException(e);
}
}