in vandelay-bigtable/src/main/java/com/spotify/api/vandelay/bigtable/imports/VandelayBigTableImporter.java [109:149]
public Status addRows(
final VandelayBigTableImportConfiguration importConfiguration,
final List<VandelayBigTableRow> rows) {
try {
logger.info("Adding " + rows.size() + " rows");
logger.info("Getting data client");
final BigtableDataClient dataClient =
vandelayBigTableClient.getDataClient(importConfiguration);
final Map<String, List<VandelayBigTableRow>> tableToRows =
rows.stream()
.collect(
Collectors.groupingBy(
VandelayBigTableRow::getTable, Collectors.toCollection(ArrayList::new)));
logger.info("Split rows into " + tableToRows.size() + " entries");
for (final var tableToRow : tableToRows.entrySet()) {
final BulkMutation bulkMutation = BulkMutation.create(tableToRow.getKey());
for (final var tableRows : tableToRow.getValue()) {
for (final var cell : tableRows.getRowCells()) {
bulkMutation.add(
tableRows.getRowKey(),
Mutation.create()
.setCell(
cell.getColumnFamily(),
ByteString.copyFrom(cell.getQualifier().toDecodedBytes()),
cell.getTimestamp(),
ByteString.copyFrom(cell.getValue().toDecodedBytes())));
}
}
logger.info("Performing row mutation on table " + tableToRow.getKey());
dataClient.bulkMutateRows(bulkMutation);
}
return Status.success();
} catch (IOException ex) {
logger.error("IOException occurred creating rows");
return Status.fail(ex.getMessage(), ex);
}
}