in graphjet-core/src/main/java/com/twitter/graphjet/bipartite/MultiSegmentReaderAccessibleInfoProvider.java [75:115]
/**
 * Rolls the graph over to a freshly generated live segment.
 *
 * <p>The edge count of the segment that is live on entry is recorded in
 * {@code numEdgesInNonLiveSegmentsMap}. If the current segment count equals
 * {@code maxNumSegments}, the oldest segment is dropped from both the segment map and the
 * edge-count map; otherwise the {@code "numSegments"} counter is incremented. A new segment
 * with id {@code liveSegmentId + 1} is then created, a new reader-accessible info object is
 * swapped in, and the total edge count across the entries of the edge-count map is recomputed.
 *
 * @param numEdgesInLiveSegment edge count to record for the segment that is live on entry
 * @param numEdgesInNonLiveSegmentsMap per-segment edge counts keyed by segment id; this map is
 *     modified in place
 * @param statsReceiver receives the {@code "numSegments"} counter increment when no segment
 *     is evicted
 * @param bipartiteGraphSegmentProvider factory used to build the new live segment
 * @return the newly created live segment
 */
public T addNewSegment(
    int numEdgesInLiveSegment,
    Int2IntMap numEdgesInNonLiveSegmentsMap,
    StatsReceiver statsReceiver,
    BipartiteGraphSegmentProvider<T> bipartiteGraphSegmentProvider
) {
  // Everything below is derived from the info object that is current on entry.
  final MultiSegmentReaderAccessibleInfo<T> currentInfo = multiSegmentReaderAccessibleInfo;

  // Build the next segment map on a private copy so the current map is never mutated.
  final Int2ObjectMap<T> nextSegments =
      new Int2ObjectOpenHashMap<T>(currentInfo.getSegments());

  // The segment that was live until now gets its edge count recorded under its own id.
  numEdgesInNonLiveSegmentsMap.put(liveSegmentId, numEdgesInLiveSegment);

  int oldestId = currentInfo.oldestSegmentId;
  final boolean atCapacity = currentInfo.getSegments().size() == maxNumSegments;
  if (!atCapacity) {
    // Still growing: one more segment will exist after this call.
    statsReceiver.counter("numSegments").incr();
  } else {
    // Full: evict the oldest segment and forget its edge count.
    nextSegments.remove(oldestId);
    numEdgesInNonLiveSegmentsMap.remove(oldestId);
    LOG.info("Removed segment " + oldestId);
    oldestId += 1;
  }

  // Create the new live segment under the next consecutive id.
  final int nextLiveId = currentInfo.liveSegmentId + 1;
  final T freshSegment =
      bipartiteGraphSegmentProvider.generateNewSegment(nextLiveId, maxNumEdgesPerSegment);
  nextSegments.put(nextLiveId, freshSegment);

  // Swap in the new info object; this reference replacement is the point at which readers
  // switch over to the new segment set.
  multiSegmentReaderAccessibleInfo =
      new MultiSegmentReaderAccessibleInfo<T>(nextSegments, oldestId, nextLiveId);

  // Writer-side bookkeeping is updated only after the swap above.
  liveSegmentId = nextLiveId;

  // Recompute the edge total from scratch over the entries that remain in the count map.
  numEdgesInNonLiveSegments = 0;
  for (int edgesInSegment : numEdgesInNonLiveSegmentsMap.values()) {
    numEdgesInNonLiveSegments += edgesInSegment;
  }

  LOG.info("Total number of edges in graph = " + numEdgesInNonLiveSegments);
  LOG.info("Created a new segment: oldestSegmentId = " + oldestId
      + ", and liveSegmentId = " + liveSegmentId);
  return freshSegment;
}