public void unionFields()

in flink-core/src/main/java/org/apache/flink/types/Record.java [563:729]


	public void unionFields(Record other) {
		final int minFields = Math.min(this.numFields, other.numFields);
		final int maxFields = Math.max(this.numFields, other.numFields);
		
		final int[] offsets = this.offsets.length >= maxFields ? this.offsets : new int[maxFields];
		final int[] lengths = this.lengths.length >= maxFields ? this.lengths : new int[maxFields];
		
		if (!(this.isModified() || other.isModified())) {
			// handle the special (but common) case where both records have a valid binary representation differently
			// allocate space for the switchBuffer first
			final int estimatedLength = this.binaryLen + other.binaryLen;
			this.serializer.memory = (this.switchBuffer != null && this.switchBuffer.length >= estimatedLength) ? 
										this.switchBuffer : new byte[estimatedLength];
			this.serializer.position = 0;
			
			try {
				// common loop for both records
				for (int i = 0; i < minFields; i++) {
					final int thisOff = this.offsets[i];
					if (thisOff == NULL_INDICATOR_OFFSET) {
						final int otherOff = other.offsets[i];
						if (otherOff == NULL_INDICATOR_OFFSET) {
							offsets[i] = NULL_INDICATOR_OFFSET;
						} else {
							// take field from other record
							offsets[i] = this.serializer.position;
							this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
							lengths[i] = other.lengths[i];
						}
					} else {
						// copy field from this one
						offsets[i] = this.serializer.position;
						this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
						lengths[i] = this.lengths[i];
					}
				}
				
				// add the trailing fields from one record
				if (minFields != maxFields) {
					final Record sourceForRemainder = this.numFields > minFields ? this : other;
					int begin = -1;
					int end = -1;
					int offsetDelta = 0;
					
					// go through the offsets, find the non-null fields to account for the remaining data
					for (int k = minFields; k < maxFields; k++) {
						final int off = sourceForRemainder.offsets[k];
						if (off == NULL_INDICATOR_OFFSET) {
							offsets[k] = NULL_INDICATOR_OFFSET;
						} else {
							end = sourceForRemainder.offsets[k]+sourceForRemainder.lengths[k];
							if (begin == -1) {
								// first non null column in the remainder
								begin = sourceForRemainder.offsets[k];
								offsetDelta = this.serializer.position - begin;
							}
							offsets[k] = sourceForRemainder.offsets[k] + offsetDelta;
						}
					}
					
					// copy the remaining fields directly as binary
					if (begin != -1) {
						this.serializer.write(sourceForRemainder.binaryData, begin, 
								end - begin);
					}
					
					// the lengths can be copied directly
					if (lengths != sourceForRemainder.lengths) {
						System.arraycopy(sourceForRemainder.lengths, minFields, lengths, minFields, maxFields - minFields);
					}
				}
			} catch (Exception ioex) {
				throw new RuntimeException("Error creating field union of record data" + 
							ioex.getMessage() == null ? "." : ": " + ioex.getMessage(), ioex);
			}
		}
		else {
			// the general case, where at least one of the two records has a binary representation that is not in sync.
			final int estimatedLength = (this.binaryLen > 0 ? this.binaryLen : this.numFields * DEFAULT_FIELD_LEN_ESTIMATE) + 
										(other.binaryLen > 0 ? other.binaryLen : other.numFields * DEFAULT_FIELD_LEN_ESTIMATE);
			this.serializer.memory = (this.switchBuffer != null && this.switchBuffer.length >= estimatedLength) ? 
										this.switchBuffer : new byte[estimatedLength];
			this.serializer.position = 0;
			
			try {
				// common loop for both records
				for (int i = 0; i < minFields; i++) {
					final int thisOff = this.offsets[i];
					if (thisOff == NULL_INDICATOR_OFFSET) {
						final int otherOff = other.offsets[i];
						if (otherOff == NULL_INDICATOR_OFFSET) {
							offsets[i] = NULL_INDICATOR_OFFSET;
						} else if (otherOff == MODIFIED_INDICATOR_OFFSET) {
							// serialize modified field from other record
							offsets[i] = this.serializer.position;
							other.writeFields[i].write(this.serializer);
							lengths[i] = this.serializer.position - offsets[i];
						} else {
							// take field from other record binary
							offsets[i] = this.serializer.position;
							this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
							lengths[i] = other.lengths[i];
						}
					} else if (thisOff == MODIFIED_INDICATOR_OFFSET) {
						// serialize modified field from this record
						offsets[i] = this.serializer.position;
						this.writeFields[i].write(this.serializer);
						lengths[i] = this.serializer.position - offsets[i];
					} else {
						// copy field from this one
						offsets[i] = this.serializer.position;
						this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
						lengths[i] = this.lengths[i];
					}
				}
				
				// add the trailing fields from one record
				if (minFields != maxFields) {
					final Record sourceForRemainder = this.numFields > minFields ? this : other;
					
					// go through the offsets, find the non-null fields
					for (int k = minFields; k < maxFields; k++) {
						final int off = sourceForRemainder.offsets[k];
						if (off == NULL_INDICATOR_OFFSET) {
							offsets[k] = NULL_INDICATOR_OFFSET;
						} else if (off == MODIFIED_INDICATOR_OFFSET) {
							// serialize modified field from the source record
							offsets[k] = this.serializer.position;
							sourceForRemainder.writeFields[k].write(this.serializer);
							lengths[k] = this.serializer.position - offsets[k];
						} else {
							// copy field from the source record binary
							offsets[k] = this.serializer.position;
							final int len = sourceForRemainder.lengths[k];
							this.serializer.write(sourceForRemainder.binaryData, off, len);
							lengths[k] = len;
						}
					}
				}
			} catch (Exception ioex) {
				throw new RuntimeException("Error creating field union of record data" + 
							ioex.getMessage() == null ? "." : ": " + ioex.getMessage(), ioex);
			}
		}
		
		serializeHeader(this.serializer, offsets, maxFields);
		
		// set the fields
		this.switchBuffer = this.binaryData;
		this.binaryData = serializer.memory;
		this.binaryLen = serializer.position;
		
		this.numFields = maxFields;
		this.offsets = offsets;
		this.lengths = lengths;
		
		this.firstModifiedPos = Integer.MAX_VALUE;
		
		// make sure that the object arrays reflect the size as well
		if (this.readFields == null || this.readFields.length < maxFields) {
			final Value[] na = new Value[maxFields];
			System.arraycopy(this.readFields, 0, na, 0, this.readFields.length);
			this.readFields = na;
		}
		this.writeFields = (this.writeFields == null || this.writeFields.length < maxFields) ? 
																new Value[maxFields] : this.writeFields;
	}