in flink-core/src/main/java/org/apache/flink/types/Record.java [563:729]
public void unionFields(Record other) {
    /*
     * Merges the fields of 'other' into this record, position by position.
     *
     * For every position i below the smaller field count, this record's field is kept
     * unless it is null, in which case the field of 'other' (possibly also null) is used.
     * Positions beyond the smaller field count are taken from whichever record is longer.
     * Afterwards this record has max(this.numFields, other.numFields) fields, a freshly
     * built binary representation, and firstModifiedPos reset to Integer.MAX_VALUE.
     *
     * Parameter 'other': the record whose fields are merged in; it is only read here.
     *
     * Throws RuntimeException (wrapping the original cause) if writing any field into
     * the target buffer fails.
     */
    final int minFields = Math.min(this.numFields, other.numFields);
    final int maxFields = Math.max(this.numFields, other.numFields);

    // Reuse this record's index arrays when they are large enough. Note that in that case
    // 'offsets'/'lengths' alias this.offsets/this.lengths, so every entry must be read
    // before it is overwritten (the loops below do so).
    final int[] offsets = this.offsets.length >= maxFields ? this.offsets : new int[maxFields];
    final int[] lengths = this.lengths.length >= maxFields ? this.lengths : new int[maxFields];

    if (!(this.isModified() || other.isModified())) {
        // Special (but common) case: neither record reports modifications, so all
        // non-null fields are copied from the records' binary data.

        // allocate space for the target buffer first, recycling the switchBuffer if it is large enough
        final int estimatedLength = this.binaryLen + other.binaryLen;
        this.serializer.memory = (this.switchBuffer != null && this.switchBuffer.length >= estimatedLength) ?
                this.switchBuffer : new byte[estimatedLength];
        this.serializer.position = 0;

        try {
            // common loop over the positions present in both records
            for (int i = 0; i < minFields; i++) {
                final int thisOff = this.offsets[i];
                if (thisOff == NULL_INDICATOR_OFFSET) {
                    final int otherOff = other.offsets[i];
                    if (otherOff == NULL_INDICATOR_OFFSET) {
                        offsets[i] = NULL_INDICATOR_OFFSET;
                    } else {
                        // take field from other record
                        offsets[i] = this.serializer.position;
                        this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
                        lengths[i] = other.lengths[i];
                    }
                } else {
                    // copy field from this one
                    offsets[i] = this.serializer.position;
                    this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
                    lengths[i] = this.lengths[i];
                }
            }

            // add the trailing fields from the longer record
            if (minFields != maxFields) {
                final Record sourceForRemainder = this.numFields > minFields ? this : other;
                int begin = -1;
                int end = -1;
                int offsetDelta = 0;

                // Walk the trailing offsets to find the byte range [begin, end) spanned by the
                // non-null fields and shift each offset by the same delta into the new buffer.
                // NOTE(review): the single bulk copy below relies on the trailing non-null
                // fields lying back-to-back in the source's binary data - confirm against the
                // serialization code.
                for (int k = minFields; k < maxFields; k++) {
                    final int off = sourceForRemainder.offsets[k];
                    if (off == NULL_INDICATOR_OFFSET) {
                        offsets[k] = NULL_INDICATOR_OFFSET;
                    } else {
                        end = off + sourceForRemainder.lengths[k];
                        if (begin == -1) {
                            // first non-null field in the remainder
                            begin = off;
                            offsetDelta = this.serializer.position - begin;
                        }
                        offsets[k] = off + offsetDelta;
                    }
                }

                // copy the remaining fields directly as one binary chunk
                if (begin != -1) {
                    this.serializer.write(sourceForRemainder.binaryData, begin, end - begin);
                }

                // the lengths can be copied directly, unless source and target are the same array
                if (lengths != sourceForRemainder.lengths) {
                    System.arraycopy(sourceForRemainder.lengths, minFields, lengths, minFields, maxFields - minFields);
                }
            }
        } catch (Exception ioex) {
            // The conditional must be parenthesized: '+' binds tighter than '==', so without
            // the parentheses the prefix text is dropped from the message.
            throw new RuntimeException("Error creating field union of record data" +
                    (ioex.getMessage() == null ? "." : ": " + ioex.getMessage()), ioex);
        }
    }
    else {
        // General case: at least one of the two records has a binary representation that is
        // not in sync, so fields marked as modified are serialized from the writeFields.
        final int estimatedLength = (this.binaryLen > 0 ? this.binaryLen : this.numFields * DEFAULT_FIELD_LEN_ESTIMATE) +
                (other.binaryLen > 0 ? other.binaryLen : other.numFields * DEFAULT_FIELD_LEN_ESTIMATE);
        this.serializer.memory = (this.switchBuffer != null && this.switchBuffer.length >= estimatedLength) ?
                this.switchBuffer : new byte[estimatedLength];
        this.serializer.position = 0;

        try {
            // common loop over the positions present in both records
            for (int i = 0; i < minFields; i++) {
                final int thisOff = this.offsets[i];
                if (thisOff == NULL_INDICATOR_OFFSET) {
                    final int otherOff = other.offsets[i];
                    if (otherOff == NULL_INDICATOR_OFFSET) {
                        offsets[i] = NULL_INDICATOR_OFFSET;
                    } else if (otherOff == MODIFIED_INDICATOR_OFFSET) {
                        // serialize modified field from other record
                        offsets[i] = this.serializer.position;
                        other.writeFields[i].write(this.serializer);
                        lengths[i] = this.serializer.position - offsets[i];
                    } else {
                        // take field from other record binary
                        offsets[i] = this.serializer.position;
                        this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
                        lengths[i] = other.lengths[i];
                    }
                } else if (thisOff == MODIFIED_INDICATOR_OFFSET) {
                    // serialize modified field from this record
                    offsets[i] = this.serializer.position;
                    this.writeFields[i].write(this.serializer);
                    lengths[i] = this.serializer.position - offsets[i];
                } else {
                    // copy field from this one
                    offsets[i] = this.serializer.position;
                    this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
                    lengths[i] = this.lengths[i];
                }
            }

            // add the trailing fields from the longer record, one field at a time
            if (minFields != maxFields) {
                final Record sourceForRemainder = this.numFields > minFields ? this : other;

                for (int k = minFields; k < maxFields; k++) {
                    final int off = sourceForRemainder.offsets[k];
                    if (off == NULL_INDICATOR_OFFSET) {
                        offsets[k] = NULL_INDICATOR_OFFSET;
                    } else if (off == MODIFIED_INDICATOR_OFFSET) {
                        // serialize modified field from the source record
                        offsets[k] = this.serializer.position;
                        sourceForRemainder.writeFields[k].write(this.serializer);
                        lengths[k] = this.serializer.position - offsets[k];
                    } else {
                        // copy field from the source record binary
                        offsets[k] = this.serializer.position;
                        final int len = sourceForRemainder.lengths[k];
                        this.serializer.write(sourceForRemainder.binaryData, off, len);
                        lengths[k] = len;
                    }
                }
            }
        } catch (Exception ioex) {
            // Parenthesized for the same operator-precedence reason as in the branch above.
            throw new RuntimeException("Error creating field union of record data" +
                    (ioex.getMessage() == null ? "." : ": " + ioex.getMessage()), ioex);
        }
    }

    serializeHeader(this.serializer, offsets, maxFields);

    // Commit the result: the previous binary data becomes the spare buffer and the freshly
    // written buffer becomes the record's binary representation.
    this.switchBuffer = this.binaryData;
    this.binaryData = serializer.memory;
    this.binaryLen = serializer.position;
    this.numFields = maxFields;
    this.offsets = offsets;
    this.lengths = lengths;
    this.firstModifiedPos = Integer.MAX_VALUE;

    // Make sure that the object arrays reflect the size as well. A missing readFields array
    // is simply allocated; an undersized one is grown while keeping its current entries.
    if (this.readFields == null) {
        this.readFields = new Value[maxFields];
    } else if (this.readFields.length < maxFields) {
        final Value[] na = new Value[maxFields];
        System.arraycopy(this.readFields, 0, na, 0, this.readFields.length);
        this.readFields = na;
    }
    this.writeFields = (this.writeFields == null || this.writeFields.length < maxFields) ?
            new Value[maxFields] : this.writeFields;
}