in src/java/com/twitter/search/earlybird/factory/EarlybirdServerFactory.java [68:314]
public EarlybirdServer makeEarlybirdServer(EarlybirdWireModule earlybirdWireModule)
throws IOException {
LOG.info("Started making an Earlybird server");
CriticalExceptionHandler criticalExceptionHandler = new CriticalExceptionHandler();
Decider decider = earlybirdWireModule.provideDecider();
SearchDecider searchDecider = new SearchDecider(decider);
EarlybirdWireModule.ZooKeeperClients zkClients = earlybirdWireModule.provideZooKeeperClients();
ZooKeeperTryLockFactory zkTryLockFactory =
zkClients.stateClient.createZooKeeperTryLockFactory();
EarlybirdIndexConfig earlybirdIndexConfig =
earlybirdWireModule.provideEarlybirdIndexConfig(
decider, earlybirdWireModule.provideSearchIndexingMetricSet(),
criticalExceptionHandler);
SearchStatsReceiver earlybirdServerStats =
earlybirdWireModule.provideEarlybirdServerStatsReceiver();
EarlybirdSearcherStats tweetsSearcherStats =
earlybirdWireModule.provideTweetsSearcherStats();
DynamicPartitionConfig dynamicPartitionConfig =
earlybirdWireModule.provideDynamicPartitionConfig();
PartitionConfig partitionConfig = dynamicPartitionConfig.getCurrentPartitionConfig();
LOG.info("Partition config info [Cluster: {}, Tier: {}, Partition: {}, Replica: {}]",
partitionConfig.getClusterName(),
partitionConfig.getTierName(),
partitionConfig.getIndexingHashPartitionID(),
partitionConfig.getHostPositionWithinHashPartition());
Clock clock = earlybirdWireModule.provideClock();
UserUpdatesChecker userUpdatesChecker =
new UserUpdatesChecker(clock, decider, earlybirdIndexConfig.getCluster());
UserTable userTable = UserTable.newTableWithDefaultCapacityAndPredicate(
earlybirdIndexConfig.getUserTableFilter(partitionConfig)::apply);
UserScrubGeoMap userScrubGeoMap = new UserScrubGeoMap();
AudioSpaceTable audioSpaceTable = new AudioSpaceTable(clock);
SegmentSyncConfig segmentSyncConfig =
earlybirdWireModule.provideSegmentSyncConfig(earlybirdIndexConfig.getCluster());
SegmentManager segmentManager = earlybirdWireModule.provideSegmentManager(
dynamicPartitionConfig,
earlybirdIndexConfig,
earlybirdWireModule.provideSearchIndexingMetricSet(),
tweetsSearcherStats,
earlybirdServerStats,
userUpdatesChecker,
segmentSyncConfig,
userTable,
userScrubGeoMap,
clock,
criticalExceptionHandler);
QueryCacheConfig config = earlybirdWireModule.provideQueryCacheConfig(earlybirdServerStats);
QueryCacheManager queryCacheManager = earlybirdWireModule.provideQueryCacheManager(
config,
earlybirdIndexConfig,
partitionConfig.getMaxEnabledLocalSegments(),
userTable,
userScrubGeoMap,
earlybirdWireModule.provideQueryCacheUpdateTaskScheduledExecutorFactory(),
earlybirdServerStats,
tweetsSearcherStats,
decider,
criticalExceptionHandler,
clock);
EarlybirdServerSetManager serverSetManager = earlybirdWireModule.provideServerSetManager(
zkClients.discoveryClient,
dynamicPartitionConfig,
earlybirdServerStats,
EarlybirdConfig.getThriftPort(),
"");
EarlybirdWarmUpManager warmUpManager =
earlybirdWireModule.provideWarmUpManager(zkClients.discoveryClient,
dynamicPartitionConfig,
earlybirdServerStats,
decider,
clock,
EarlybirdConfig.getWarmUpThriftPort(),
"warmup_");
EarlybirdDarkProxy earlybirdDarkProxy = earlybirdWireModule.provideEarlybirdDarkProxy(
new SearchDecider(decider),
earlybirdWireModule.provideFinagleStatsReceiver(),
serverSetManager,
warmUpManager,
partitionConfig.getClusterName());
UserUpdatesStreamIndexer userUpdatesStreamIndexer =
earlybirdWireModule.provideUserUpdatesKafkaConsumer(segmentManager);
UserScrubGeoEventStreamIndexer userScrubGeoEventStreamIndexer =
earlybirdWireModule.provideUserScrubGeoEventKafkaConsumer(segmentManager);
AudioSpaceEventsStreamIndexer audioSpaceEventsStreamIndexer =
earlybirdWireModule.provideAudioSpaceEventsStreamIndexer(audioSpaceTable, clock);
MultiSegmentTermDictionaryManager.Config termDictionaryConfig =
earlybirdWireModule.provideMultiSegmentTermDictionaryManagerConfig();
MultiSegmentTermDictionaryManager multiSegmentTermDictionaryManager =
earlybirdWireModule.provideMultiSegmentTermDictionaryManager(
termDictionaryConfig,
segmentManager,
earlybirdServerStats,
decider,
earlybirdIndexConfig.getCluster());
TermCountMonitor termCountMonitor =
earlybirdWireModule.provideTermCountMonitor(
segmentManager, earlybirdWireModule.provideTermCountMonitorScheduledExecutorFactory(),
earlybirdServerStats,
criticalExceptionHandler);
TweetCountMonitor tweetCountMonitor =
earlybirdWireModule.provideTweetCountMonitor(
segmentManager, earlybirdWireModule.provideTweetCountMonitorScheduledExecutorFactory(),
earlybirdServerStats,
criticalExceptionHandler);
ScoringModelsManager scoringModelsManager = earlybirdWireModule.provideScoringModelsManager(
earlybirdServerStats,
earlybirdIndexConfig
);
TensorflowModelsManager tensorflowModelsManager =
earlybirdWireModule.provideTensorflowModelsManager(
earlybirdServerStats,
"tf_loader",
decider,
earlybirdIndexConfig
);
AuroraSchedulerClient schedulerClient = null;
AuroraInstanceKey auroraInstanceKey = EarlybirdConfig.getAuroraInstanceKey();
if (auroraInstanceKey != null) {
schedulerClient = new AuroraSchedulerClient(auroraInstanceKey.getCluster());
}
UpdateableEarlybirdStateManager earlybirdStateManager =
earlybirdWireModule.provideUpdateableEarlybirdStateManager(
earlybirdIndexConfig,
dynamicPartitionConfig,
zkClients.stateClient,
schedulerClient,
earlybirdWireModule.provideStateUpdateManagerExecutorFactory(),
scoringModelsManager,
tensorflowModelsManager,
earlybirdServerStats,
new SearchDecider(decider),
criticalExceptionHandler);
EarlybirdFuturePoolManager futurePoolManager = earlybirdWireModule.provideFuturePoolManager();
EarlybirdFinagleServerManager finagleServerManager =
earlybirdWireModule.provideFinagleServerManager(criticalExceptionHandler);
PartitionManager partitionManager = null;
if (EarlybirdIndexConfigUtil.isArchiveSearch()) {
partitionManager = buildArchivePartitionManager(
earlybirdWireModule,
userUpdatesStreamIndexer,
userScrubGeoEventStreamIndexer,
zkTryLockFactory,
earlybirdIndexConfig,
dynamicPartitionConfig,
segmentManager,
queryCacheManager,
earlybirdServerStats,
serverSetManager,
earlybirdWireModule.providePartitionManagerExecutorFactory(),
earlybirdWireModule.provideSimpleUserUpdateIndexerScheduledExecutorFactory(),
clock,
segmentSyncConfig,
criticalExceptionHandler);
} else {
LOG.info("Not creating PartitionManager");
}
EarlybirdSegmentFactory earlybirdSegmentFactory = new EarlybirdSegmentFactory(
earlybirdIndexConfig,
earlybirdWireModule.provideSearchIndexingMetricSet(),
tweetsSearcherStats,
clock);
EarlybirdStartup earlybirdStartup = earlybirdWireModule.provideEarlybirdStartup(
partitionManager,
userUpdatesStreamIndexer,
userScrubGeoEventStreamIndexer,
audioSpaceEventsStreamIndexer,
dynamicPartitionConfig,
criticalExceptionHandler,
segmentManager,
multiSegmentTermDictionaryManager,
queryCacheManager,
zkTryLockFactory,
serverSetManager,
clock,
segmentSyncConfig,
earlybirdSegmentFactory,
earlybirdIndexConfig.getCluster(),
searchDecider);
QualityFactor qualityFactor = earlybirdWireModule.provideQualityFactor(
decider,
earlybirdServerStats);
EarlybirdServer earlybirdServer = new EarlybirdServer(
queryCacheManager,
zkClients.stateClient,
decider,
earlybirdIndexConfig,
dynamicPartitionConfig,
partitionManager,
segmentManager,
audioSpaceTable,
termCountMonitor,
tweetCountMonitor,
earlybirdStateManager,
futurePoolManager,
finagleServerManager,
serverSetManager,
warmUpManager,
earlybirdServerStats,
tweetsSearcherStats,
scoringModelsManager,
tensorflowModelsManager,
clock,
multiSegmentTermDictionaryManager,
earlybirdDarkProxy,
segmentSyncConfig,
earlybirdWireModule.provideQueryTimeoutFactory(),
earlybirdStartup,
qualityFactor,
earlybirdWireModule.provideSearchIndexingMetricSet());
earlybirdStateManager.setEarlybirdServer(earlybirdServer);
criticalExceptionHandler.setShutdownHook(earlybirdServer::shutdown);
return earlybirdServer;
}