public EarlybirdStartup provideEarlybirdStartup()

in src/java/com/twitter/search/earlybird/factory/EarlybirdWireModule.java [634:820]


  /**
   * Builds the {@link EarlybirdStartup} used to bring this earlybird up.
   *
   * <p>For the {@code FULL_ARCHIVE} cluster a {@link PartitionManagerStartup} is returned
   * immediately and nothing else in this method runs. For every other cluster the method:
   * <ol>
   *   <li>validates the configured earlybird name,</li>
   *   <li>derives the tweet ("indexing") and update Kafka topic names from that name and the
   *       configured Kafka environment,</li>
   *   <li>wires the index flusher, fresh-startup handler, tweet create/update handlers,
   *       partition writer, Kafka consumer and index loader,</li>
   *   <li>hands the create handler, partition writer and index flusher to
   *       {@code storeEarlybirdStartupProducts}, and</li>
   *   <li>returns a {@link KafkaStartup} assembled from those pieces.</li>
   * </ol>
   *
   * @param partitionManager used only for the {@code FULL_ARCHIVE} startup
   * @param userUpdatesStreamIndexer passed to the startup user event indexer and to the
   *     returned {@link KafkaStartup}
   * @param userScrubGeoEventStreamIndexer passed to the startup user event indexer and to the
   *     returned {@link KafkaStartup}
   * @param audioSpaceEventsStreamIndexer passed to the returned {@link KafkaStartup}
   * @param dynamicPartitionConfig source of the current partition config and of the indexing
   *     hash partition id used for both Kafka topic partitions
   * @param criticalExceptionHandler handed to every component built here that accepts one
   * @param segmentManager shared by the flusher, the handlers and the returned startup
   * @param multiSegmentTermDictionaryManager passed to the create handler and the startup
   * @param queryCacheManager passed to the create handler and the startup
   * @param zooKeeperTryLockFactory used to build the three coordinated actions
   * @param serverSetMember used to build the three coordinated actions
   * @param clock clock handed to the time-aware components built here
   * @param segmentSyncConfig passed to the coordinated actions and the index loader
   * @param earlybirdSegmentFactory passed to the index loader
   * @param cluster the cluster this earlybird belongs to; selects the startup flavor
   * @param decider passed to the returned {@link KafkaStartup}
   * @return a {@link PartitionManagerStartup} for {@code FULL_ARCHIVE}, otherwise a
   *     {@link KafkaStartup}
   * @throws IOException declared by this method; which of the calls below can raise it is not
   *     visible from here (presumably the HDFS / Kafka setup - TODO confirm)
   * @throws IllegalArgumentException if the cluster is not {@code FULL_ARCHIVE} and the
   *     configured earlybird name is not one of the four supported names
   */
  public EarlybirdStartup provideEarlybirdStartup(
      PartitionManager partitionManager,
      UserUpdatesStreamIndexer userUpdatesStreamIndexer,
      UserScrubGeoEventStreamIndexer userScrubGeoEventStreamIndexer,
      AudioSpaceEventsStreamIndexer audioSpaceEventsStreamIndexer,
      DynamicPartitionConfig dynamicPartitionConfig,
      CriticalExceptionHandler criticalExceptionHandler,
      SegmentManager segmentManager,
      MultiSegmentTermDictionaryManager multiSegmentTermDictionaryManager,
      QueryCacheManager queryCacheManager,
      ZooKeeperTryLockFactory zooKeeperTryLockFactory,
      ServerSetMember serverSetMember,
      Clock clock,
      SegmentSyncConfig segmentSyncConfig,
      EarlybirdSegmentFactory earlybirdSegmentFactory,
      EarlybirdCluster cluster,
      SearchDecider decider) throws IOException {
    if (cluster == EarlybirdCluster.FULL_ARCHIVE) {
      return new PartitionManagerStartup(clock, partitionManager);
    }

    // Check that the earlybird name is what we're expecting so we can build the kafka topics.
    // The message names the offending value so a misconfiguration is diagnosable from the
    // exception alone.
    String earlybirdName = EarlybirdProperty.EARLYBIRD_NAME.get();
    Preconditions.checkArgument(
        "earlybird-realtime".equals(earlybirdName)
            || "earlybird-protected".equals(earlybirdName)
            || "earlybird-realtime-exp0".equals(earlybirdName)
            || "earlybird-realtime_cg".equals(earlybirdName),
        "Unexpected earlybird name '%s'; expected one of: earlybird-realtime, "
            + "earlybird-protected, earlybird-realtime-exp0, earlybird-realtime_cg",
        earlybirdName);

    StartupUserEventIndexer startupUserEventIndexer = new StartupUserEventIndexer(
        provideSearchIndexingMetricSet(),
        userUpdatesStreamIndexer,
        userScrubGeoEventStreamIndexer,
        segmentManager,
        clock);

    // Coordinate leaving the serverset to flush segments to HDFS.
    CoordinatedEarlybirdAction actionCoordinator = new CoordinatedEarlybirdAction(
        zooKeeperTryLockFactory,
        "segment_flusher",
        dynamicPartitionConfig,
        serverSetMember,
        criticalExceptionHandler,
        segmentSyncConfig);
    actionCoordinator.setShouldSynchronize(true);

    // The same HDFS handle is shared by the flusher (below) and the index loader (further down).
    FileSystem hdfsFileSystem = HdfsUtil.getHdfsFileSystem();
    EarlybirdIndexFlusher earlybirdIndexFlusher = new EarlybirdIndexFlusher(
        actionCoordinator,
        hdfsFileSystem,
        EarlybirdProperty.HDFS_INDEX_SYNC_DIR.get(),
        segmentManager,
        dynamicPartitionConfig.getCurrentPartitionConfig(),
        clock,
        new TimeLimitedHadoopExistsCall(hdfsFileSystem),
        provideOptimizationAndFlushingCoordinationLock());

    // Topic name layout: search_ingester_<event kind>_events_<earlybird type>_<kafka env>.
    String baseTopicName = "search_ingester_%s_events_%s_%s";

    // Map the validated earlybird name to the type segment of the topic name. Any name that is
    // neither protected nor realtime_cg (i.e. "earlybird-realtime" and
    // "earlybird-realtime-exp0") reads the plain realtime topics.
    String earlybirdType;

    if ("earlybird-protected".equals(earlybirdName)) {
      earlybirdType = "protected";
    } else if ("earlybird-realtime_cg".equals(earlybirdName)) {
      earlybirdType = "realtime_cg";
    } else {
      earlybirdType = "realtime";
    }

    String tweetTopicName = String.format(
        baseTopicName,
        "indexing",
        earlybirdType,
        EarlybirdProperty.KAFKA_ENV.get());

    String updateTopicName = String.format(
        baseTopicName,
        "update",
        earlybirdType,
        EarlybirdProperty.KAFKA_ENV.get());

    LOG.info("Tweet topic: {}", tweetTopicName);
    LOG.info("Update topic: {}", updateTopicName);

    // Resolve the partition id once so the tweet and update topics are guaranteed to be read
    // from the same partition.
    int indexingHashPartitionId =
        dynamicPartitionConfig.getCurrentPartitionConfig().getIndexingHashPartitionID();
    TopicPartition tweetTopic = new TopicPartition(tweetTopicName, indexingHashPartitionId);
    TopicPartition updateTopic = new TopicPartition(updateTopicName, indexingHashPartitionId);

    EarlybirdKafkaConsumersFactory earlybirdKafkaConsumersFactory =
        provideEarlybirdKafkaConsumersFactory();
    FreshStartupHandler freshStartupHandler = new FreshStartupHandler(
        clock,
        earlybirdKafkaConsumersFactory,
        tweetTopic,
        updateTopic,
        segmentManager,
        EarlybirdConfig.getMaxSegmentSize(),
        EarlybirdConfig.getLateTweetBuffer(),
        criticalExceptionHandler
    );

    TweetUpdateHandler updateHandler = new TweetUpdateHandler(segmentManager);

    // Two more coordinated actions, built like the segment flusher coordinator above but under
    // their own names; both are handed to the tweet create handler.
    CoordinatedEarlybirdAction postOptimizationRebuilds = new CoordinatedEarlybirdAction(
        zooKeeperTryLockFactory,
        "post_optimization_rebuilds",
        dynamicPartitionConfig,
        serverSetMember,
        criticalExceptionHandler,
        segmentSyncConfig
    );
    postOptimizationRebuilds.setShouldSynchronize(true);
    CoordinatedEarlybirdAction gcAction = new CoordinatedEarlybirdAction(
        zooKeeperTryLockFactory,
        "gc_before_optimization",
        dynamicPartitionConfig,
        serverSetMember,
        criticalExceptionHandler,
        segmentSyncConfig
    );
    gcAction.setShouldSynchronize(true);

    TweetCreateHandler createHandler = new TweetCreateHandler(
        segmentManager,
        provideSearchIndexingMetricSet(),
        criticalExceptionHandler,
        multiSegmentTermDictionaryManager,
        queryCacheManager,
        postOptimizationRebuilds,
        gcAction,
        EarlybirdConfig.getLateTweetBuffer(),
        EarlybirdConfig.getMaxSegmentSize(),
        provideKafkaIndexCaughtUpMonitor(),
        provideOptimizationAndFlushingCoordinationLock());

    PartitionWriter partitionWriter = new PartitionWriter(
        createHandler,
        updateHandler,
        criticalExceptionHandler,
        PenguinVersion.versionFromByteValue(EarlybirdConfig.getPenguinVersionByte()),
        clock);

    KafkaConsumer<Long, ThriftVersionedEvents> rawKafkaConsumer =
        earlybirdKafkaConsumersFactory.createKafkaConsumer(
            "earlybird_tweet_kafka_consumer");

    EarlybirdKafkaConsumer earlybirdKafkaConsumer = provideKafkaConsumer(
        criticalExceptionHandler,
        rawKafkaConsumer,
        tweetTopic,
        updateTopic,
        partitionWriter,
        earlybirdIndexFlusher);

    EarlybirdIndexLoader earlybirdIndexLoader = new EarlybirdIndexLoader(
        hdfsFileSystem,
        getIndexLoadingDirectory(), // See SEARCH-32839
        EarlybirdProperty.ENV.get("default_env_value"),
        dynamicPartitionConfig.getCurrentPartitionConfig(),
        earlybirdSegmentFactory,
        segmentSyncConfig,
        clock);

    // Hand the freshly built components to the module before returning the startup object.
    this.storeEarlybirdStartupProducts(
        createHandler,
        partitionWriter,
        earlybirdIndexFlusher
    );

    return new KafkaStartup(
        segmentManager,
        earlybirdKafkaConsumer,
        startupUserEventIndexer,
        userUpdatesStreamIndexer,
        userScrubGeoEventStreamIndexer,
        audioSpaceEventsStreamIndexer,
        queryCacheManager,
        earlybirdIndexLoader,
        freshStartupHandler,
        provideSearchIndexingMetricSet(),
        multiSegmentTermDictionaryManager,
        criticalExceptionHandler,
        decider
    );
  }