private def getRemoteAkkaConfig()

in flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala [415:623]


  /**
   * Builds the Akka configuration for an actor system that uses remoting.
   *
   * The result is assembled from three HOCON fragments that are concatenated and parsed
   * together: the general remoting settings, the hostname settings and, only when SSL is
   * enabled for Akka, the SSL transport settings.
   *
   * @param configuration Flink configuration from which all Akka and SSL options are read
   * @param bindAddress address written verbatim to `bind-hostname`
   * @param port port written to `bind-port`
   * @param externalHostname hostname that, after normalization, is written to `hostname`;
   *                         if the normalized value is null or empty, `hostname` is left
   *                         empty
   * @param externalPort port written to `port`
   * @return the parsed and resolved Akka remoting configuration
   */
  private def getRemoteAkkaConfig(
      configuration: Configuration,
      bindAddress: String,
      port: Int,
      externalHostname: String,
      externalPort: Int): Config = {

    // Akka expects "on"/"off" rather than "true"/"false" for the switches rendered below.
    def onOff(enabled: Boolean): String = if (enabled) "on" else "off"

    val normalizedExternalHostname = NetUtils.unresolvedHostToNormalizedString(externalHostname)

    // ---- timeouts and failure detector ----

    val akkaAskTimeout = getTimeout(configuration)

    // The startup timeout defaults to ten times the ask timeout when it is not configured.
    val startupTimeout = TimeUtils.getStringInMillis(
      TimeUtils.parseDuration(
        configuration.getString(
          AkkaOptions.STARTUP_TIMEOUT,
          TimeUtils.getStringInMillis(akkaAskTimeout.multipliedBy(10L)))))

    val transportHeartbeatIntervalDuration = TimeUtils.parseDuration(
      configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL))

    val transportHeartbeatPauseDuration = TimeUtils.parseDuration(
      configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE))

    // Check the pause/interval pair before rendering it into the configuration.
    validateHeartbeat(
      AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
      transportHeartbeatPauseDuration,
      AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.key(),
      transportHeartbeatIntervalDuration)

    val transportHeartbeatInterval = TimeUtils.getStringInMillis(transportHeartbeatIntervalDuration)

    val transportHeartbeatPause = TimeUtils.getStringInMillis(transportHeartbeatPauseDuration)

    val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)

    val akkaTCPTimeout = TimeUtils.getStringInMillis(
      TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)))

    val akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE)

    val logLifecycleEvents = onOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS))

    val retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR)

    // ---- SSL ----

    // SSL is used for Akka only if it is enabled both for Akka and for internal connectivity.
    val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) &&
      SSLUtils.isInternalSSLEnabled(configuration)

    val akkaEnableSSL = onOff(akkaEnableSSLConfig)

    // Each internal SSL option falls back to the corresponding general SSL option.
    val akkaSSLKeyStore = configuration.getString(
      SecurityOptions.SSL_INTERNAL_KEYSTORE,
      configuration.getString(SecurityOptions.SSL_KEYSTORE))

    val akkaSSLKeyStorePassword = configuration.getString(
      SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD,
      configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD))

    val akkaSSLKeyPassword = configuration.getString(
      SecurityOptions.SSL_INTERNAL_KEY_PASSWORD,
      configuration.getString(SecurityOptions.SSL_KEY_PASSWORD))

    val akkaSSLTrustStore = configuration.getString(
      SecurityOptions.SSL_INTERNAL_TRUSTSTORE,
      configuration.getString(SecurityOptions.SSL_TRUSTSTORE))

    val akkaSSLTrustStorePassword = configuration.getString(
      SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD,
      configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD))

    val akkaSSLCertFingerprintString = configuration.getString(
      SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT)

    // Render the comma-separated fingerprints as a HOCON list of quoted strings. Entries are
    // trimmed and blank entries are dropped, so that "a, b" does not yield a fingerprint with
    // a leading space and an empty setting yields an empty list instead of [""].
    val akkaSSLCertFingerprints = Option(akkaSSLCertFingerprintString)
      .map { fingerprints =>
        fingerprints
          .split(",")
          .map(_.trim)
          .filter(_.nonEmpty)
          .map(fingerprint => "\"" + fingerprint + "\"")
          .mkString("[", ",", "]")
      }
      .getOrElse("[]")

    val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL)

    // The algorithms are rendered as an unquoted HOCON list.
    val akkaSSLAlgorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS)
    val akkaSSLAlgorithms = akkaSSLAlgorithmsString.split(",").mkString("[", ",", "]")

    // ---- netty socket worker pools ----

    val clientSocketWorkerPoolPoolSizeMin =
      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN)

    val clientSocketWorkerPoolPoolSizeMax =
      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX)

    val clientSocketWorkerPoolPoolSizeFactor =
      configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR)

    val serverSocketWorkerPoolPoolSizeMin =
      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN)

    val serverSocketWorkerPoolPoolSizeMax =
      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX)

    val serverSocketWorkerPoolPoolSizeFactor =
      configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR)

    // ---- fragment 1: general remoting settings ----

    val configString =
      s"""
         |akka {
         |  actor {
         |    provider = "akka.remote.RemoteActorRefProvider"
         |  }
         |
         |  remote {
         |    startup-timeout = $startupTimeout
         |
         |    transport-failure-detector{
         |      acceptable-heartbeat-pause = $transportHeartbeatPause
         |      heartbeat-interval = $transportHeartbeatInterval
         |      threshold = $transportThreshold
         |    }
         |
         |    netty {
         |      tcp {
         |        transport-class = "akka.remote.transport.netty.NettyTransport"
         |        port = $externalPort
         |        bind-port = $port
         |        connection-timeout = $akkaTCPTimeout
         |        maximum-frame-size = $akkaFramesize
         |        tcp-nodelay = on
         |
         |        client-socket-worker-pool {
         |          pool-size-min = $clientSocketWorkerPoolPoolSizeMin
         |          pool-size-max = $clientSocketWorkerPoolPoolSizeMax
         |          pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
         |        }
         |
         |        server-socket-worker-pool {
         |          pool-size-min = $serverSocketWorkerPoolPoolSizeMin
         |          pool-size-max = $serverSocketWorkerPoolPoolSizeMax
         |          pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
         |        }
         |      }
         |    }
         |
         |    log-remote-lifecycle-events = $logLifecycleEvents
         |
         |    retry-gate-closed-for = $retryGateClosedFor ms
         |  }
         |}
       """.stripMargin

    // ---- fragment 2: hostname settings ----

    val effectiveHostname =
      if (normalizedExternalHostname != null && normalizedExternalHostname.nonEmpty) {
        normalizedExternalHostname
      } else {
        // If no external hostname is given, leave the hostname empty. Akka will pick
        // InetAddress.getLocalHost.getHostAddress
        ""
      }

    // Note that bindAddress is written as-is; it is not checked for null or empty here.
    val hostnameConfigString =
      s"""
         |akka {
         |  remote {
         |    netty {
         |      tcp {
         |        hostname = "$effectiveHostname"
         |        bind-hostname = "$bindAddress"
         |      }
         |    }
         |  }
         |}
       """.stripMargin

    // ---- fragment 3: SSL transport settings (only when SSL is enabled) ----

    // The ssl section starts as a copy of the tcp section through the HOCON substitution
    // written below with an escaped dollar sign, and then adds the SSL specific settings.
    // NOTE(review): key store / trust store paths and passwords are interpolated verbatim
    // inside double quotes; a value containing a double quote or a backslash would presumably
    // break HOCON parsing - confirm whether such values need to be escaped.
    val sslConfigString = if (akkaEnableSSLConfig) {
      s"""
         |akka {
         |  remote {
         |
         |    enabled-transports = ["akka.remote.netty.ssl"]
         |
         |    netty {
         |
         |      ssl = $${akka.remote.netty.tcp}
         |
         |      ssl {
         |
         |        enable-ssl = $akkaEnableSSL
         |        ssl-engine-provider = org.apache.flink.runtime.akka.CustomSSLEngineProvider
         |        security {
         |          key-store = "$akkaSSLKeyStore"
         |          key-store-password = "$akkaSSLKeyStorePassword"
         |          key-password = "$akkaSSLKeyPassword"
         |          trust-store = "$akkaSSLTrustStore"
         |          trust-store-password = "$akkaSSLTrustStorePassword"
         |          protocol = $akkaSSLProtocol
         |          enabled-algorithms = $akkaSSLAlgorithms
         |          random-number-generator = ""
         |          require-mutual-authentication = on
         |          cert-fingerprints = $akkaSSLCertFingerprints
         |        }
         |      }
         |    }
         |  }
         |}
       """.stripMargin
    } else {
      ""
    }

    ConfigFactory.parseString(configString + hostnameConfigString + sslConfigString).resolve()
  }