public T addNewSegment()

in graphjet-core/src/main/java/com/twitter/graphjet/bipartite/MultiSegmentReaderAccessibleInfoProvider.java [75:115]


  public T addNewSegment(
      int numEdgesInLiveSegment,
      Int2IntMap numEdgesInNonLiveSegmentsMap,
      StatsReceiver statsReceiver,
      BipartiteGraphSegmentProvider<T> bipartiteGraphSegmentProvider
  ) {
    final Int2ObjectMap<T> segments =
        new Int2ObjectOpenHashMap<T>(multiSegmentReaderAccessibleInfo.getSegments());
    numEdgesInNonLiveSegmentsMap.put(liveSegmentId, numEdgesInLiveSegment);
    int oldestSegmentId = multiSegmentReaderAccessibleInfo.oldestSegmentId;
    // remove a segment if we're at the limit
    if (multiSegmentReaderAccessibleInfo.getSegments().size() == maxNumSegments) {
      segments.remove(oldestSegmentId);
      numEdgesInNonLiveSegmentsMap.remove(oldestSegmentId);
      LOG.info("Removed segment " + oldestSegmentId);
      oldestSegmentId++;
    } else {
      statsReceiver.counter("numSegments").incr();
    }
    int newLiveSegmentId =  multiSegmentReaderAccessibleInfo.liveSegmentId + 1;
    // add a new segment
    T liveSegment =
        bipartiteGraphSegmentProvider.generateNewSegment(newLiveSegmentId, maxNumEdgesPerSegment);
    segments.put(newLiveSegmentId, liveSegment);
    // now make the switch for the readers -- this is immediately published and visible!
    multiSegmentReaderAccessibleInfo = new MultiSegmentReaderAccessibleInfo<T>(
            segments, oldestSegmentId, newLiveSegmentId);

    // flush the write
    liveSegmentId = newLiveSegmentId;

    numEdgesInNonLiveSegments = 0;
    for (int segmentEdgeCount : numEdgesInNonLiveSegmentsMap.values()) {
      numEdgesInNonLiveSegments += segmentEdgeCount;
    }
    LOG.info("Total number of edges in graph = " + numEdgesInNonLiveSegments);
    LOG.info("Created a new segment: oldestSegmentId = " + oldestSegmentId
        + ", and liveSegmentId = " + liveSegmentId);

    return liveSegment;
  }