private def ensureIndex()

in scio-elasticsearch/common/src/main/scala/com/spotify/scio/elasticsearch/IndexAdmin.scala [66:169]


  private def ensureIndex(
    index: String,
    typeMappings: TypeMapping,
    client: ElasticsearchIndicesClient
  ): CreateIndexResponse =
    client.create(CreateIndexRequest.of(_.index(index).mappings(typeMappings)))

  /**
   * Ensure that index is created. If index already exists or some other error occurs this results
   * in a [[scala.util.Failure]].
   *
   * @param index
   *   index to be created
   * @param typeMappings
   */
  def ensureIndex(
    esOptions: ElasticsearchOptions,
    index: String,
    typeMappings: TypeMapping
  ): Try[CreateIndexResponse] =
    indicesClient(esOptions)(client => ensureIndex(index, typeMappings, client))

  /**
   * Delete index
   *
   * @param index
   *   to be deleted
   * @param timeout
   *   defaults to 1 minute
   * @return
   *   Failure or unacknowledged response if operation did not succeed
   */
  private def removeIndex(
    client: ElasticsearchIndicesClient,
    index: String,
    timeout: Time
  ): DeleteIndexResponse =
    client.delete(DeleteIndexRequest.of(_.index(index).timeout(timeout)))

  /**
   * Delete index
   *
   * @param index
   *   to be deleted
   * @param timeout
   *   defaults to 1 minute
   * @return
   *   Failure or unacknowledged response if operation did not succeed
   */
  def removeIndex(
    esOptions: ElasticsearchOptions,
    index: String,
    timeout: Time
  ): Try[DeleteIndexResponse] =
    indicesClient(esOptions)(client => removeIndex(client, index, timeout))

  /**
   * Add or update index alias with an option to remove the alias from all other indexes if it is
   * already pointed to any.
   *
   * @param alias
   *   to be re-assigned
   * @param indices
   *   Iterable of pairs (index, isWriteIndex) to point the alias to. Note: only one index can be
   *   assigned as write index.
   * @param removePrevious
   *   When set to true, the indexAlias would be removed from all indices it was assigned to before
   *   adding new index alias assignment
   */
  private def createOrUpdateAlias(
    client: ElasticsearchIndicesClient,
    indices: Iterable[(String, Boolean)],
    alias: String,
    removePrevious: Boolean,
    timeout: Time
  ): UpdateAliasesResponse = {

    val (writeIdx, idxs) = indices.toList
      .partitionMap {
        case (idx, true)  => Left(idx)
        case (idx, false) => Right(idx)
      }

    require(
      writeIdx.size == 1,
      "Only one index per alias can be assigned to be the write index at a time"
    )

    val actionsBuilder = List.newBuilder[Action]
    actionsBuilder += Action.of(_.add(_.indices(writeIdx.asJava).isWriteIndex(true).alias(alias)))
    actionsBuilder += Action.of(_.add(_.indices(idxs.asJava).isWriteIndex(false).alias(alias)))

    if (removePrevious) {
      val getAliasesResponse = client.getAlias(GetAliasRequest.of(_.name(alias)))
      val indicesToRemove = getAliasesResponse.result().asScala.keys.toList
      Logger.info(s"Removing alias $alias from ${indicesToRemove.mkString(", ")}")

      actionsBuilder += Action.of(_.remove(_.indices(indicesToRemove.asJava).alias(alias)))
    }

    client.updateAliases(
      UpdateAliasesRequest.of(_.actions(actionsBuilder.result().asJava).timeout(timeout))
    )
  }