in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java [344:461]
/**
 * Finds the first start tag of the record element in {@code channel}.
 *
 * <p>A start tag is {@code "<" + recordElement} immediately followed by whitespace, {@code '>'}
 * or {@code '/'}; the trailing-character check rejects elements whose name merely begins with
 * {@code recordElement}.
 *
 * <p>When a start tag is found, its bytes and every byte after it that has already been read
 * from {@code channel} are appended to {@code preambleByteBuffer}, since they cannot be read from
 * the channel again. Nothing is written when no start tag is found.
 *
 * <p>File offsets are computed on the assumption that {@code channel} is positioned at
 * {@code getCurrentSource().getStartOffset()}.
 *
 * @param channel the channel to scan
 * @param preambleByteBuffer receives the consumed bytes beginning at the matched start tag
 * @return the file offset of the {@code '<'} opening the first record element, or -1 if the
 *     channel is exhausted without finding one
 * @throws IOException if reading from {@code channel} fails
 */
private long getFirstOccurenceOfRecordElement(
    ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
  byte[] recordStartBytes =
      ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);
  // Index in recordStartBytes of the next byte to be matched against the stream.
  int byteIndexInRecordElementToMatch = 0;
  // True once all of "<recordElement" has matched; the following character still decides
  // whether this is a record start or a longer element name sharing the prefix.
  boolean recordStartBytesMatched = false;
  // Offset in the file of the byte being examined. Starts one early because it is incremented
  // before each byte is examined.
  long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
  // Offset in the file of the '<' that began the match in progress; -1 when none is in progress.
  long startingOffsetInFileOfCurrentMatch = -1;
  // Bytes of the character following "<recordElement" (see MAX_CHAR_BYTES); only the first
  // charBytesFound entries are meaningful.
  byte[] charBytes = new byte[MAX_CHAR_BYTES];
  int charBytesFound = 0;
  ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);

  while (channel.read(buf) > 0) {
    buf.flip();
    while (buf.hasRemaining()) {
      offsetInFileOfCurrentByte++;
      byte b = buf.get();

      if (!recordStartBytesMatched) {
        if (b != recordStartBytes[byteIndexInRecordElementToMatch]) {
          // Abandon any partial match. '<' occurs only at index 0 of recordStartBytes (a valid
          // element name cannot contain it), so the only way the current byte can take part in
          // a match is by starting a new one.
          byteIndexInRecordElementToMatch = 0;
          startingOffsetInFileOfCurrentMatch = -1;
          if (b != recordStartBytes[0]) {
            continue;
          }
        }
        if (byteIndexInRecordElementToMatch == 0) {
          startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
        }
        byteIndexInRecordElementToMatch++;
        recordStartBytesMatched = byteIndexInRecordElementToMatch == recordStartBytes.length;
        continue;
      }

      // "<recordElement" has matched; collect the character that follows it.
      charBytes[charBytesFound++] = b;
      char c;
      if (charBytesFound == 1 && b >= 0) {
        // A UTF-8 byte in 0x00-0x7F is a complete character by itself. Deciding now also avoids
        // missing a match when fewer than MAX_CHAR_BYTES bytes remain in the stream.
        c = (char) b;
      } else if (charBytesFound < charBytes.length) {
        // Possibly a multi-byte character: wait for more bytes.
        continue;
      } else {
        c = new String(charBytes, 0, charBytesFound, StandardCharsets.UTF_8).charAt(0);
      }

      // Record start may be of following forms
      //  * "<recordElement<whitespace>..."
      //  * "<recordElement>..."
      //  * "<recordElement/..."
      if (Character.isWhitespace(c) || c == '>' || c == '/') {
        // These bytes were already consumed from the channel, so hand them to the caller.
        preambleByteBuffer.write(recordStartBytes);
        preambleByteBuffer.write(charBytes, 0, charBytesFound);
        while (buf.hasRemaining()) {
          preambleByteBuffer.write(buf.get());
        }
        return startingOffsetInFileOfCurrentMatch;
      }

      // Not a record start. Push the character's bytes back in front of the unread bytes so they
      // are scanned again (they may contain a '<'). Size the new buffer for both: when the
      // character straddled two reads, charBytesFound + buf.remaining() can exceed BUF_SIZE.
      ByteBuffer newBuf =
          ByteBuffer.allocate(Math.max(BUF_SIZE, charBytesFound + buf.remaining()));
      newBuf.put(charBytes, 0, charBytesFound);
      newBuf.put(buf);
      newBuf.flip();
      buf = newBuf;
      offsetInFileOfCurrentByte -= charBytesFound;
      byteIndexInRecordElementToMatch = 0;
      startingOffsetInFileOfCurrentMatch = -1;
      recordStartBytesMatched = false;
      charBytesFound = 0;
    }
    buf.clear();
  }
  return -1;
}