in folsom/src/main/java/com/spotify/folsom/ketama/ResolvingKetamaClient.java [96:155]
public void resolve() {
synchronized (sync) {
if (shutdown) {
return;
}
long ttl = this.ttl; // Default ttl to use if resolve fails
try {
final List<Resolver.ResolveResult> lookupResults = resolver.resolve();
if (lookupResults.isEmpty()) {
// Just ignore empty results
return;
}
final Set<HostAndPort> newAddresses =
lookupResults
.stream()
.map(result -> HostAndPort.fromParts(result.getHost(), result.getPort()))
.collect(Collectors.toSet());
final long resolvedTtl =
lookupResults
.stream()
.mapToLong(Resolver.ResolveResult::getTtl)
.min()
.orElse(Long.MAX_VALUE);
ttl = Math.min(ttl, resolvedTtl);
final Set<HostAndPort> currentAddresses = clients.keySet();
if (!newAddresses.equals(currentAddresses)) {
final ImmutableSet<HostAndPort> toRemove =
Sets.difference(currentAddresses, newAddresses).immutableCopy();
final Sets.SetView<HostAndPort> toAdd = Sets.difference(newAddresses, currentAddresses);
if (!toAdd.isEmpty()) {
log.info("Connecting to " + toAdd);
}
if (!toRemove.isEmpty()) {
log.info("Scheduling disconnect from " + toRemove);
}
for (final HostAndPort host : toAdd) {
final RawMemcacheClient newClient = connector.connect(host);
newClient.registerForConnectionChanges(listener);
clients.put(host, newClient);
}
final ImmutableList.Builder<RawMemcacheClient> removedClients = ImmutableList.builder();
for (final HostAndPort host : toRemove) {
final RawMemcacheClient removed = clients.remove(host);
removed.unregisterForConnectionChanges(listener);
removedClients.add(removed);
}
setPendingClient(removedClients);
}
} finally {
long delay = clamp(MIN_RESOLVE_WAIT_TIME, MAX_RESOLVE_WAIT_TIME, ttl);
refreshJob = this.executor.schedule(this::resolve, delay, TimeUnit.SECONDS);
}
}
}