in scio-elasticsearch/common/src/main/scala/com/spotify/scio/elasticsearch/IndexAdmin.scala [66:169]
private def ensureIndex(
index: String,
typeMappings: TypeMapping,
client: ElasticsearchIndicesClient
): CreateIndexResponse =
client.create(CreateIndexRequest.of(_.index(index).mappings(typeMappings)))
/**
* Ensure that index is created. If index already exists or some other error occurs this results
* in a [[scala.util.Failure]].
*
* @param index
* index to be created
* @param typeMappings
*/
def ensureIndex(
esOptions: ElasticsearchOptions,
index: String,
typeMappings: TypeMapping
): Try[CreateIndexResponse] =
indicesClient(esOptions)(client => ensureIndex(index, typeMappings, client))
/**
* Delete index
*
* @param index
* to be deleted
* @param timeout
* defaults to 1 minute
* @return
* Failure or unacknowledged response if operation did not succeed
*/
private def removeIndex(
client: ElasticsearchIndicesClient,
index: String,
timeout: Time
): DeleteIndexResponse =
client.delete(DeleteIndexRequest.of(_.index(index).timeout(timeout)))
/**
* Delete index
*
* @param index
* to be deleted
* @param timeout
* defaults to 1 minute
* @return
* Failure or unacknowledged response if operation did not succeed
*/
def removeIndex(
esOptions: ElasticsearchOptions,
index: String,
timeout: Time
): Try[DeleteIndexResponse] =
indicesClient(esOptions)(client => removeIndex(client, index, timeout))
/**
* Add or update index alias with an option to remove the alias from all other indexes if it is
* already pointed to any.
*
* @param alias
* to be re-assigned
* @param indices
* Iterable of pairs (index, isWriteIndex) to point the alias to. Note: only one index can be
* assigned as write index.
* @param removePrevious
* When set to true, the indexAlias would be removed from all indices it was assigned to before
* adding new index alias assignment
*/
private def createOrUpdateAlias(
client: ElasticsearchIndicesClient,
indices: Iterable[(String, Boolean)],
alias: String,
removePrevious: Boolean,
timeout: Time
): UpdateAliasesResponse = {
val (writeIdx, idxs) = indices.toList
.partitionMap {
case (idx, true) => Left(idx)
case (idx, false) => Right(idx)
}
require(
writeIdx.size == 1,
"Only one index per alias can be assigned to be the write index at a time"
)
val actionsBuilder = List.newBuilder[Action]
actionsBuilder += Action.of(_.add(_.indices(writeIdx.asJava).isWriteIndex(true).alias(alias)))
actionsBuilder += Action.of(_.add(_.indices(idxs.asJava).isWriteIndex(false).alias(alias)))
if (removePrevious) {
val getAliasesResponse = client.getAlias(GetAliasRequest.of(_.name(alias)))
val indicesToRemove = getAliasesResponse.result().asScala.keys.toList
Logger.info(s"Removing alias $alias from ${indicesToRemove.mkString(", ")}")
actionsBuilder += Action.of(_.remove(_.indices(indicesToRemove.asJava).alias(alias)))
}
client.updateAliases(
UpdateAliasesRequest.of(_.actions(actionsBuilder.result().asJava).timeout(timeout))
)
}