Sources/Confidence/Apply/FlagApplierWithRetries.swift (146 lines of code) (raw):
import Foundation
import os
/// `HttpClientResponse` specialised for the `ApplyFlagsResponse` payload.
typealias ApplyFlagHTTPResponse = HttpClientResponse<ApplyFlagsResponse>
/// Outcome of an apply request: an `ApplyFlagHTTPResponse` on success, or an `Error` on failure.
typealias ApplyFlagResult = Result<ApplyFlagHTTPResponse, Error>
/// Flag applier that records each "apply" event in a cache, persists that cache through
/// `storage`, and posts the pending events to the `:apply` endpoint in batches.
///
/// Events whose request fails are put back into the `.created` state, so the next batch
/// run picks them up again. A batch run is started by every call to
/// `apply(flagName:resolveToken:)` and, optionally, on initialisation.
final class FlagApplierWithRetries: FlagApplier {
    /// Maximum number of apply events sent in a single request.
    private static let applyBatchSize = 20
    /// Shared logger for apply failures, created once instead of on every error.
    private static let logger = Logger(subsystem: "com.confidence.provider", category: "apply")

    private let storage: Storage
    private let httpClient: HttpClient
    private let options: ConfidenceClientOptions
    private let cacheDataInteractor: CacheDataActor
    private let metadata: ConfidenceMetadata
    private let debugLogger: DebugLogger?

    /// Creates the applier.
    ///
    /// - Parameters:
    ///   - httpClient: Client used to post apply requests.
    ///   - storage: Persistence for the cache of apply events.
    ///   - options: Client options; the client secret is read from `options.credentials`.
    ///   - metadata: SDK name and version attached to every request.
    ///   - cacheDataInteractor: Cache to use. When `nil`, a `CacheDataInteractor` is created
    ///     from the data loaded from `storage`.
    ///   - triggerBatch: When `true`, a batch run is started in a background task right away.
    ///   - debugLogger: Optional logger notified about newly recorded applies.
    init(
        httpClient: HttpClient,
        storage: Storage,
        options: ConfidenceClientOptions,
        metadata: ConfidenceMetadata,
        cacheDataInteractor: CacheDataActor? = nil,
        triggerBatch: Bool = true,
        debugLogger: DebugLogger? = nil
    ) {
        self.storage = storage
        self.httpClient = httpClient
        self.options = options
        self.metadata = metadata
        self.debugLogger = debugLogger

        // Best effort: if loading throws, start from an empty cache.
        // The loaded data is only used when no interactor was injected.
        let storedData = try? storage.load(defaultValue: CacheData.empty())
        self.cacheDataInteractor = cacheDataInteractor ?? CacheDataInteractor(cacheData: storedData ?? .empty())

        if triggerBatch {
            Task(priority: .utility) {
                await self.triggerBatch()
            }
        }
    }

    /// Records that `flagName` was applied for `resolveToken` and starts a batch run.
    ///
    /// When the cache reports the event as newly added, it is logged and the cache is
    /// persisted. When it was already present (de-duplication), nothing is written. In both
    /// cases a batch run follows, so previously unsent events get another attempt.
    ///
    /// - Parameters:
    ///   - flagName: Name of the applied flag.
    ///   - resolveToken: Token of the resolve the flag value came from.
    public func apply(flagName: String, resolveToken: String) async {
        let applyTime = Date.backport.now
        let (data, added) = await cacheDataInteractor.add(
            resolveToken: resolveToken,
            flagName: flagName,
            applyTime: applyTime
        )
        if added == true {
            debugLogger?.logFlags(action: "Apply", flag: flagName)
            writeToFile(data: data)
        }
        // Always trigger a batch: there may be unsent events stored from earlier calls.
        await triggerBatch()
    }

    // MARK: private

    /// Sends all events in the `.created` state, per resolve token, in chunks of
    /// `applyBatchSize`.
    ///
    /// Each chunk is marked `.sending` before the request. Afterwards it is marked `.sent`
    /// on success or reset to `.created` on failure; remaining chunks are still attempted.
    ///
    /// NOTE(review): the cache snapshot is read before events are marked `.sending`;
    /// confirm that overlapping batch runs cannot pick up the same events.
    private func triggerBatch() async {
        let cacheData = await cacheDataInteractor.cache
        for resolveEvent in cacheData.resolveEvents {
            let token = resolveEvent.resolveToken
            let pendingChunks = resolveEvent.events
                .filter { $0.status == .created }
                .chunk(size: Self.applyBatchSize)

            for chunk in pendingChunks {
                await writeStatus(resolveToken: token, events: chunk, status: .sending)
                let success = await executeApply(resolveToken: token, items: chunk)
                // A failed chunk goes back to `.created` so that a later run retries it.
                await writeStatus(resolveToken: token, events: chunk, status: success ? .sent : .created)
            }
        }
    }

    /// Sets `status` on every event of `events` in the cache and persists the result once,
    /// after the last event has been updated.
    ///
    /// Resolve events whose `isSent` is `true` are left out of the persisted data.
    /// Nothing is persisted when `events` is empty.
    private func writeStatus(resolveToken: String, events: [FlagApply], status: ApplyEventStatus) async {
        let lastIndex = events.count - 1
        for (index, event) in events.enumerated() {
            var data = await cacheDataInteractor.setEventStatus(
                resolveToken: resolveToken,
                name: event.name,
                status: status
            )
            guard index == lastIndex else {
                continue
            }
            data.resolveEvents = data.resolveEvents.filter { $0.isSent == false }
            writeToFile(data: data)
        }
    }

    /// Persists `data` through `storage`.
    ///
    /// Persistence is best effort: a failure is logged instead of being propagated, so that
    /// applying a flag never fails because of a storage problem.
    private func writeToFile(data: CacheData) {
        do {
            try storage.save(data: data)
        } catch {
            logPersistenceError(error: error)
        }
    }

    /// Posts one apply request for `items`.
    ///
    /// - Returns: `true` when the request succeeded; `false` (after logging the error) otherwise.
    private func executeApply(
        resolveToken: String,
        items: [FlagApply]
    ) async -> Bool {
        let requestItems = items.map { item in
            AppliedFlagRequestItem(flag: item.name, applyTime: item.applyTime)
        }
        let request = ApplyFlagsRequest(
            flags: requestItems,
            sendTime: Date.backport.nowISOString,
            clientSecret: options.credentials.getSecret(),
            resolveToken: resolveToken,
            sdk: Sdk(id: metadata.name, version: metadata.version)
        )

        if case .failure(let error) = await performRequest(request: request) {
            logApplyError(error: error)
            return false
        }
        return true
    }

    /// Posts `request` to the `:apply` path, turning a thrown error into a `.failure` result.
    private func performRequest(
        request: ApplyFlagsRequest
    ) async -> ApplyFlagResult {
        do {
            return try await httpClient.post(path: ":apply", data: request)
        } catch {
            return .failure(handleError(error: error))
        }
    }

    /// Returns `error` unchanged when it already is a `ConfidenceError`; otherwise wraps its
    /// description in `ConfidenceError.grpcError`.
    private func handleError(error: Error) -> Error {
        guard error is ConfidenceError else {
            return ConfidenceError.grpcError(message: "\(error)")
        }
        return error
    }

    /// Logs a failed apply request.
    private func logApplyError(error: Error) {
        Self.logger.error(
            "Error while executing \"apply\": \(error)")
    }

    /// Logs a failure to persist the apply cache.
    private func logPersistenceError(error: Error) {
        Self.logger.error(
            "Error while persisting \"apply\" cache: \(error)")
    }
}
extension Sequence {
    /// Runs `operation` for every element, one after another, awaiting each call before
    /// moving on to the next element.
    ///
    /// Iteration stops at the first error thrown by `operation`, and that error is rethrown.
    ///
    /// - Parameter operation: Asynchronous closure invoked with each element in order.
    func asyncForEach(
        _ operation: (Element) async throws -> Void
    ) async rethrows {
        var iterator = makeIterator()
        while let next = iterator.next() {
            try await operation(next)
        }
    }
}