private long getFirstOccurenceOfRecordElement()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java [344:461]


    /**
     * Reads {@code channel} until it finds the start tag of the first record element: the bytes
     * {@code "<" + recordElement} followed by a whitespace character, {@code '>'} or {@code '/'}.
     * Elements whose name merely starts with {@code recordElement} are skipped.
     *
     * <p>When a match is found, the bytes that have already been read from {@code channel} from the
     * start of the matching tag onward are written to {@code preambleByteBuffer}. These are the tag
     * prefix, the bytes read to decode the character that follows it, and the unread remainder of
     * the current read buffer.
     *
     * <p>NOTE(review): offsets are counted from {@code getCurrentSource().getStartOffset()}, so
     * {@code channel} is assumed to be positioned at that offset on entry. Confirm against the
     * caller.
     *
     * @param channel the channel to scan
     * @param preambleByteBuffer receives the already-consumed bytes described above on a match
     * @return the file offset of the {@code '<'} that starts the first record element, or
     *     {@code -1} if the channel is exhausted first. A tag prefix followed by fewer than
     *     {@code MAX_CHAR_BYTES} bytes before end of input is not reported.
     * @throws IOException if reading from {@code channel} fails
     */
    private long getFirstOccurenceOfRecordElement(
        ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
      byte[] recordStartBytes =
          ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);

      // Index in recordStartBytes of the next byte to match against the stream.
      int byteIndexInRecordElementToMatch = 0;
      // True once all of "<recordElement" has matched. The character that follows must still be
      // examined, since this might be an element that has "recordElement" as a prefix.
      boolean recordStartBytesMatched = false;
      // Offset of the '<' that began the match currently in progress.
      long startingOffsetInFileOfCurrentMatch = -1;

      // Offset of the byte currently being examined. It starts one before the start offset
      // because it is incremented before each byte is examined.
      long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;

      // Bytes following "<recordElement". They are collected until MAX_CHAR_BYTES are available
      // so the next character can be decoded (see MAX_CHAR_BYTES). The array is reused across
      // candidates; only the first charBytesFound entries are meaningful.
      byte[] charBytes = new byte[MAX_CHAR_BYTES];
      int charBytesFound = 0;

      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
      while (channel.read(buf) > 0) {
        buf.flip();
        while (buf.hasRemaining()) {
          offsetInFileOfCurrentByte++;
          byte b = buf.get();

          if (recordStartBytesMatched) {
            charBytes[charBytesFound++] = b;
            if (charBytesFound < charBytes.length) {
              continue;
            }
            // Malformed UTF-8 decodes to U+FFFD, which is rejected below like any other
            // non-terminator character.
            char c = new String(charBytes, StandardCharsets.UTF_8).charAt(0);

            // A record start tag has one of the forms:
            // * "<recordElement<whitespace>..."
            // * "<recordElement>..."
            // * "<recordElement/..."
            if (Character.isWhitespace(c) || c == '>' || c == '/') {
              // These bytes were already read from the channel, so they go into the preamble.
              preambleByteBuffer.write(recordStartBytes);
              preambleByteBuffer.write(charBytes, 0, charBytesFound);
              preambleByteBuffer.write(
                  buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
              return startingOffsetInFileOfCurrentMatch;
            }

            // Not a record start. The bytes collected after the prefix may themselves begin a
            // match, so put them back in front of the unread bytes and rescan. The new buffer
            // is sized to hold both, because the collected bytes may have come partly from the
            // previous read, so together they can exceed BUF_SIZE.
            ByteBuffer rescan =
                ByteBuffer.allocate(Math.max(BUF_SIZE, charBytesFound + buf.remaining()));
            rescan.put(charBytes, 0, charBytesFound);
            rescan.put(buf);
            rescan.flip();
            buf = rescan;
            offsetInFileOfCurrentByte -= charBytesFound;

            recordStartBytesMatched = false;
            byteIndexInRecordElementToMatch = 0;
            charBytesFound = 0;
            continue;
          }

          if (byteIndexInRecordElementToMatch > 0
              && b != recordStartBytes[byteIndexInRecordElementToMatch]) {
            // The partial match failed. The current byte may still begin a new match, so
            // restart and test it against the first byte below instead of discarding it.
            byteIndexInRecordElementToMatch = 0;
          }
          if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
            if (byteIndexInRecordElementToMatch == 0) {
              startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
            }
            byteIndexInRecordElementToMatch++;
            recordStartBytesMatched = byteIndexInRecordElementToMatch == recordStartBytes.length;
          }
        }
        buf.clear();
      }

      return -1;
    }