in flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala [415:623]
private def getRemoteAkkaConfig(
    configuration: Configuration,
    bindAddress: String,
    port: Int,
    externalHostname: String,
    externalPort: Int): Config = {
  // Assembles the Akka configuration for a remote actor system out of three HOCON fragments
  // (remoting/transport settings, hostname settings, optional SSL settings), concatenates them
  // and returns the parsed, resolved Config.
  //
  //   configuration     source of all option values read below
  //   bindAddress       rendered as netty.tcp.bind-hostname
  //   port              rendered as netty.tcp.bind-port
  //   externalHostname  normalized and rendered as netty.tcp.hostname (empty if null/empty)
  //   externalPort      rendered as netty.tcp.port

  // Renders a value as a quoted HOCON string literal. Unlike splicing the raw value between
  // double quotes, this escapes quotes, backslashes and control characters, so passwords with
  // special characters and Windows-style paths do not corrupt the generated config text.
  // String.valueOf keeps the previous rendering of a null value as the literal "null".
  def quoted(value: String): String =
    com.typesafe.config.ConfigUtil.quoteString(String.valueOf(value))

  val normalizedExternalHostname = NetUtils.unresolvedHostToNormalizedString(externalHostname)

  val akkaAskTimeout = getTimeout(configuration)

  // The startup timeout falls back to ten times the ask timeout when not configured.
  val startupTimeout = TimeUtils.getStringInMillis(
    TimeUtils.parseDuration(
      configuration.getString(
        AkkaOptions.STARTUP_TIMEOUT,
        TimeUtils.getStringInMillis(akkaAskTimeout.multipliedBy(10L)))))

  val transportHeartbeatIntervalDuration = TimeUtils.parseDuration(
    configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL))
  val transportHeartbeatPauseDuration = TimeUtils.parseDuration(
    configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE))

  // Check the pause/interval pair before either value is rendered into the config.
  validateHeartbeat(
    AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
    transportHeartbeatPauseDuration,
    AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.key(),
    transportHeartbeatIntervalDuration)

  val transportHeartbeatInterval = TimeUtils.getStringInMillis(transportHeartbeatIntervalDuration)
  val transportHeartbeatPause = TimeUtils.getStringInMillis(transportHeartbeatPauseDuration)
  val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)

  val akkaTCPTimeout = TimeUtils.getStringInMillis(
    TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)))
  val akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE)

  val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
  val logLifecycleEvents = if (lifecycleEvents) "on" else "off"

  // SSL is used only when it is enabled for Akka and internal SSL is enabled as well.
  val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) &&
    SSLUtils.isInternalSSLEnabled(configuration)

  val retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR)

  val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"

  // Each internal SSL option falls back to the corresponding general SSL option.
  val akkaSSLKeyStore = configuration.getString(
    SecurityOptions.SSL_INTERNAL_KEYSTORE,
    configuration.getString(SecurityOptions.SSL_KEYSTORE))
  val akkaSSLKeyStorePassword = configuration.getString(
    SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD,
    configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD))
  val akkaSSLKeyPassword = configuration.getString(
    SecurityOptions.SSL_INTERNAL_KEY_PASSWORD,
    configuration.getString(SecurityOptions.SSL_KEY_PASSWORD))
  val akkaSSLTrustStore = configuration.getString(
    SecurityOptions.SSL_INTERNAL_TRUSTSTORE,
    configuration.getString(SecurityOptions.SSL_TRUSTSTORE))
  val akkaSSLTrustStorePassword = configuration.getString(
    SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD,
    configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD))

  // Comma-separated fingerprints become a HOCON list of quoted strings; an unset option
  // becomes an empty list.
  val akkaSSLCertFingerprintString = configuration.getString(
    SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT)
  val akkaSSLCertFingerprints = if (akkaSSLCertFingerprintString != null) {
    akkaSSLCertFingerprintString.split(",").toList.mkString("[\"", "\",\"", "\"]")
  } else {
    "[]"
  }

  val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL)

  // Comma-separated algorithm names become an unquoted HOCON list.
  val akkaSSLAlgorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS)
  val akkaSSLAlgorithms = akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]")

  val clientSocketWorkerPoolPoolSizeMin =
    configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN)
  val clientSocketWorkerPoolPoolSizeMax =
    configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX)
  val clientSocketWorkerPoolPoolSizeFactor =
    configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR)

  val serverSocketWorkerPoolPoolSizeMin =
    configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN)
  val serverSocketWorkerPoolPoolSizeMax =
    configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX)
  val serverSocketWorkerPoolPoolSizeFactor =
    configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR)

  // Fragment 1: remoting, failure detector and netty transport settings.
  val configString =
    s"""
       |akka {
       | actor {
       | provider = "akka.remote.RemoteActorRefProvider"
       | }
       |
       | remote {
       | startup-timeout = $startupTimeout
       |
       | transport-failure-detector{
       | acceptable-heartbeat-pause = $transportHeartbeatPause
       | heartbeat-interval = $transportHeartbeatInterval
       | threshold = $transportThreshold
       | }
       |
       | netty {
       | tcp {
       | transport-class = "akka.remote.transport.netty.NettyTransport"
       | port = $externalPort
       | bind-port = $port
       | connection-timeout = $akkaTCPTimeout
       | maximum-frame-size = $akkaFramesize
       | tcp-nodelay = on
       |
       | client-socket-worker-pool {
       | pool-size-min = $clientSocketWorkerPoolPoolSizeMin
       | pool-size-max = $clientSocketWorkerPoolPoolSizeMax
       | pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
       | }
       |
       | server-socket-worker-pool {
       | pool-size-min = $serverSocketWorkerPoolPoolSizeMin
       | pool-size-max = $serverSocketWorkerPoolPoolSizeMax
       | pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
       | }
       | }
       | }
       |
       | log-remote-lifecycle-events = $logLifecycleEvents
       |
       | retry-gate-closed-for = $retryGateClosedFor ms
       | }
       |}
       """.stripMargin

  val effectiveHostname =
    if (normalizedExternalHostname != null && normalizedExternalHostname.nonEmpty) {
      normalizedExternalHostname
    } else {
      // If the external hostname is null or empty, leave the hostname unspecified. Akka will
      // pick InetAddress.getLocalHost.getHostAddress
      ""
    }

  // Fragment 2: advertised hostname and the address to bind to.
  val hostnameConfigString =
    s"""
       |akka {
       | remote {
       | netty {
       | tcp {
       | hostname = "$effectiveHostname"
       | bind-hostname = "$bindAddress"
       | }
       | }
       | }
       |}
       """.stripMargin

  // Fragment 3 (only when SSL is enabled): switch the enabled transport to the SSL transport,
  // which starts from a copy of the tcp settings via the $${...} substitution. Store paths and
  // passwords go through quoted() so that special characters are escaped.
  val sslConfigString = if (akkaEnableSSLConfig) {
    s"""
       |akka {
       | remote {
       |
       | enabled-transports = ["akka.remote.netty.ssl"]
       |
       | netty {
       |
       | ssl = $${akka.remote.netty.tcp}
       |
       | ssl {
       |
       | enable-ssl = $akkaEnableSSL
       | ssl-engine-provider = org.apache.flink.runtime.akka.CustomSSLEngineProvider
       | security {
       | key-store = ${quoted(akkaSSLKeyStore)}
       | key-store-password = ${quoted(akkaSSLKeyStorePassword)}
       | key-password = ${quoted(akkaSSLKeyPassword)}
       | trust-store = ${quoted(akkaSSLTrustStore)}
       | trust-store-password = ${quoted(akkaSSLTrustStorePassword)}
       | protocol = $akkaSSLProtocol
       | enabled-algorithms = $akkaSSLAlgorithms
       | random-number-generator = ""
       | require-mutual-authentication = on
       | cert-fingerprints = $akkaSSLCertFingerprints
       | }
       | }
       | }
       | }
       |}
       """.stripMargin
  } else {
    ""
  }

  // resolve() expands the substitution used by the SSL fragment.
  ConfigFactory.parseString(configString + hostnameConfigString + sslConfigString).resolve()
}