in Sources/XCMetricsBackendLib/UploadMetrics/Jobs/ProcessMetricsJob.swift [49:99]
func dequeue(_ context: QueueContext, _ payload: UploadMetricsRequest) -> EventLoopFuture<Void> {
logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] message dequeued")
let eventLoop = context.application.eventLoopGroup.next()
let promise = eventLoop.makePromise(of: Void.self)
/// parsing is a blocking call, we execute it in a Dispatch Queue to not block the eventloop
queue.async {
self.semaphore.wait()
defer {
self.semaphore.signal()
}
let logFile: LogFile
let buildMetrics: BuildMetrics
do {
logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] fetching log from \(payload.logURL)")
logFile = try self.logFileRepository.get(logURL: payload.logURL)
logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] log fetched to \(logFile.localURL)")
buildMetrics = try MetricsProcessor.process(metricsRequest: payload,
logFile: logFile,
redactUserData: self.redactUserData)
} catch {
context.logger.error("[ProcessMetricsJob] error processing log from \(payload.logURL): \(error)")
promise.fail(error)
return
}
logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] log parsed \(payload.logURL)")
_ = self.metricsRepository.insertBuildMetrics(buildMetrics, using: eventLoop)
.flatMapAlways { (result) -> EventLoopFuture<Void> in
var wasProcessed: Bool = false
switch result {
case .failure(let error):
context.logger.error("[ProcessMetricsJob] error inserting log from \(payload.logURL): \(error)")
wasProcessed = false
promise.fail(error)
case .success:
wasProcessed = true
promise.succeed(())
}
return self.removeLocalLog(logFile,
wasProcessed: wasProcessed,
using: eventLoop, context)
}
.map { _ -> Void in
context.logger.info("[ProcessMetricsJob] finished processing \(payload.logURL)")
return ()
}
}
return promise.futureResult
}