in syndicate/connection/dynamo_connection.py [0:0]
def update_global_indexes(self, table_name, global_indexes_meta,
existing_global_indexes, table_read_capacity,
table_write_capacity, existing_capacity_mode):
""" Creates, Deletes or Updates global indexes for the table
:param table_name: DynamoDB table name
:type table_name: str
:param global_indexes_meta: list of global indexes definitions in meta
:type global_indexes_meta: list
:param existing_global_indexes: list of global indexes currently
existing in table
:type existing_global_indexes: list
:param table_read_capacity: table read capacity defined in meta
:type table_read_capacity: int
:param table_write_capacity: table write capacity defined in meta
:type table_write_capacity: int
:param existing_capacity_mode: capacity mode currently set in the table
:type existing_capacity_mode: str
:returns None
"""
gsi_names = [gsi.get('name') for gsi in global_indexes_meta]
existing_gsi_names = [gsi.get('IndexName')
for gsi in existing_global_indexes]
global_indexes_to_delete = []
for gsi in existing_global_indexes:
if gsi.get('IndexName') not in gsi_names:
global_indexes_to_delete.append(gsi)
global_indexes_to_create = []
for gsi in global_indexes_meta:
if gsi.get('name') not in existing_gsi_names:
global_indexes_to_create.append(gsi)
global_indexes_to_update_capacity = []
# AWS handles changing gsi capacity mode from provisioned to on-demand,
# so we don't have to
if existing_capacity_mode == 'PROVISIONED':
for gsi in global_indexes_meta:
for existing_index in existing_global_indexes:
if gsi['name'] == existing_index['IndexName']:
existing_read_capacity = existing_index['ProvisionedThroughput']['ReadCapacityUnits']
existing_write_capacity = \
existing_index['ProvisionedThroughput']['WriteCapacityUnits']
read_capacity_meta = \
gsi.get('read_capacity') or table_read_capacity
write_capacity_meta = \
gsi.get('write_capacity') or table_write_capacity
# add indexes with different capacity values for update
if existing_read_capacity != read_capacity_meta \
or existing_write_capacity != write_capacity_meta:
gsi.update(
dict(old_read_capacity=existing_read_capacity,
old_write_capacity=existing_write_capacity))
global_indexes_to_update_capacity.append(gsi)
break
for gsi in global_indexes_to_delete:
index_name = gsi.get('IndexName')
self.delete_global_secondary_index(table_name=table_name,
index_name=index_name)
self._wait_for_index_update(table_name, index_name)
_LOG.info(
'Removed global secondary index {0} for table {1}'.format(
index_name, table_name))
for gsi in global_indexes_to_create:
read_capacity = \
gsi.get('read_capacity') or table_read_capacity
write_capacity = \
gsi.get('write_capacity') or table_write_capacity
self.create_global_secondary_index(
table_name=table_name, index_meta=gsi,
existing_capacity_mode=existing_capacity_mode,
read_throughput=read_capacity, write_throughput=write_capacity)
self._wait_for_index_update(table_name, gsi.get('name'))
_LOG.info(
'Created global secondary index {0} for table {1}'.format(
gsi.get('name'), table_name))
for gsi in global_indexes_to_update_capacity:
read_capacity = \
gsi.get('read_capacity') or table_read_capacity
write_capacity = \
gsi.get('write_capacity') or table_write_capacity
self.update_global_secondary_index(
table_name=table_name, index_name=gsi.get('name'),
read_throughput=read_capacity, write_throughput=write_capacity)
_LOG.info(
'Updated global secondary index {0} for table {1}. '
'Updated read capacity from {2} to {3}, '
'write capacity from {4} to {5}'.format(
gsi.get('name'), table_name,
gsi.get('old_read_capacity'), read_capacity,
gsi.get('old_write_capacity'), write_capacity))