Sources/Confidence/EventSenderEngine.swift (155 lines of code) (raw):
import Combine
import Foundation
protocol FlushPolicy {
func reset()
func hit(event: ConfidenceEvent)
func shouldFlush() -> Bool
}
protocol EventSenderEngine {
func emit(
eventName: String,
data: ConfidenceStruct,
context: ConfidenceStruct
) throws
func shutdown()
func flush()
}
final class EventSenderEngineImpl: EventSenderEngine {
private static let sendSignalName: String = "FLUSH"
private let storage: any EventStorage
private let writeReqChannel = PassthroughSubject<ConfidenceEvent, Never>()
private let uploadReqChannel = PassthroughSubject<String, Never>()
private var cancellables = Set<AnyCancellable>()
private let flushPolicies: [FlushPolicy]
private let uploader: ConfidenceClient
private let clientSecret: String
private let payloadMerger: PayloadMerger = PayloadMergerImpl()
private let semaphore = DispatchSemaphore(value: 1)
private let writeQueue: DispatchQueue
private let debugLogger: DebugLogger?
convenience init(
clientSecret: String,
uploader: ConfidenceClient,
storage: EventStorage,
debugLogger: DebugLogger?
) {
self.init(
clientSecret: clientSecret,
uploader: uploader,
storage: storage,
flushPolicies: [SizeFlushPolicy(batchSize: 10)],
writeQueue: DispatchQueue(label: "ConfidenceWriteQueue"),
debugLogger: debugLogger
)
}
init(
clientSecret: String,
uploader: ConfidenceClient,
storage: EventStorage,
flushPolicies: [FlushPolicy],
writeQueue: DispatchQueue,
debugLogger: DebugLogger?
) {
self.uploader = uploader
self.clientSecret = clientSecret
self.storage = storage
self.flushPolicies = flushPolicies + [ManualFlushPolicy()]
self.writeQueue = writeQueue
self.debugLogger = debugLogger
writeReqChannel
.receive(on: self.writeQueue)
.sink { [weak self] event in
guard let self = self else { return }
if event.name != manualFlushEvent.name { // skip storing flush events.
do {
try self.storage.writeEvent(event: event)
} catch {
}
}
self.flushPolicies.forEach { policy in policy.hit(event: event) }
let shouldFlush = self.flushPolicies.contains { policy in policy.shouldFlush() }
if shouldFlush {
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName)
self.flushPolicies.forEach { policy in policy.reset() }
}
}
.store(in: &cancellables)
uploadReqChannel.sink { [weak self] _ in
guard let self = self else { return }
await self.upload()
}
.store(in: &cancellables)
}
func upload() async {
await withSemaphore { [weak self] in
guard let self = self else { return }
do {
try self.storage.startNewBatch()
let ids = try storage.batchReadyIds()
if ids.isEmpty {
return
}
for id in ids {
let events: [NetworkEvent] = try self.storage.eventsFrom(id: id)
.compactMap { event in
return NetworkEvent(
eventDefinition: event.name,
payload: NetworkStruct(fields: TypeMapper.convert(structure: event.payload).fields),
eventTime: Date.backport.toISOString(date: event.eventTime))
}
var shouldCleanup = false
if events.isEmpty {
shouldCleanup = true
} else {
shouldCleanup = try await self.uploader.upload(events: events)
}
if shouldCleanup {
try storage.remove(id: id)
}
}
} catch {
}
}
}
func withSemaphore(callback: @escaping () async -> Void) async {
await withCheckedContinuation { continuation in
DispatchQueue.global().async {
self.semaphore.wait()
continuation.resume()
}
}
await callback()
semaphore.signal()
}
func emit(
eventName: String,
data: ConfidenceStruct,
context: ConfidenceStruct
) throws {
let event = ConfidenceEvent(
name: eventName,
payload: try payloadMerger.merge(context: context, data: data),
eventTime: Date.backport.now)
writeReqChannel.send(event)
debugLogger?.logEvent(action: "Emitting event", event: event)
}
func flush() {
writeReqChannel.send(manualFlushEvent)
debugLogger?.logEvent(action: "Event flushed", event: nil)
}
func shutdown() {
for cancellable in cancellables {
cancellable.cancel()
}
cancellables.removeAll()
}
}
private extension Publisher where Self.Failure == Never {
func sink(receiveValue: @escaping ((Self.Output) async -> Void)) -> AnyCancellable {
sink { value in
Task {
await receiveValue(value)
}
}
}
}