func dequeue()

in Sources/XCMetricsBackendLib/UploadMetrics/Jobs/ProcessMetricsJob.swift [49:99]


    /// Processes one upload request taken from the queue: fetches the log, parses it into
    /// build metrics, inserts the metrics, and then hands the local log to `removeLocalLog`.
    ///
    /// The blocking fetch/parse work runs on `queue`; the insert and the cleanup run as futures
    /// on an event loop picked from the application's event loop group.
    ///
    /// - Parameters:
    ///   - context: Queue context; supplies the logger and the event loop group.
    ///   - payload: The upload request; its `logURL` identifies the log to fetch and parse.
    /// - Returns: A future that succeeds once the metrics insert succeeds, and fails with the
    ///   error thrown while fetching/parsing the log or with the insert error. It is completed
    ///   before the local log cleanup runs, so the cleanup outcome never changes its result.
    func dequeue(_ context: QueueContext, _ payload: UploadMetricsRequest) -> EventLoopFuture<Void> {
        logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] message dequeued")

        let eventLoop = context.application.eventLoopGroup.next()
        let promise = eventLoop.makePromise(of: Void.self)

        // Parsing is a blocking call, so it runs on a Dispatch queue to keep the event loop free.
        queue.async {
            // Presumably caps how many jobs fetch/parse at once (the semaphore's initial value
            // is not visible here). The permit is released when this closure returns, which
            // does not wait for the insert future started below.
            self.semaphore.wait()
            defer {
                self.semaphore.signal()
            }

            let logFile: LogFile
            let buildMetrics: BuildMetrics
            do {
                logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] fetching log from \(payload.logURL)")
                logFile = try self.logFileRepository.get(logURL: payload.logURL)
                logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] log fetched to \(logFile.localURL)")
                buildMetrics = try MetricsProcessor.process(metricsRequest: payload,
                                                            logFile: logFile,
                                                            redactUserData: self.redactUserData)
            } catch {
                // NOTE(review): if the fetch succeeded but parsing threw, the fetched log is not
                // passed to `removeLocalLog` on this path — confirm that this is intended.
                context.logger.error("[ProcessMetricsJob] error processing log from \(payload.logURL): \(error)")
                promise.fail(error)
                return
            }
            logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] log parsed \(payload.logURL)")

            self.metricsRepository.insertBuildMetrics(buildMetrics, using: eventLoop)
                .flatMapAlways { (result) -> EventLoopFuture<Void> in
                    // Complete the job's promise first, then clean up regardless of the outcome.
                    let wasProcessed: Bool
                    switch result {
                    case .failure(let error):
                        context.logger.error("[ProcessMetricsJob] error inserting log from \(payload.logURL): \(error)")
                        wasProcessed = false
                        promise.fail(error)
                    case .success:
                        wasProcessed = true
                        promise.succeed(())
                    }
                    return self.removeLocalLog(logFile,
                                               wasProcessed: wasProcessed,
                                               using: eventLoop, context)
                }
                .whenComplete { cleanupResult in
                    // The job's promise is already completed at this point; this only reports
                    // how the cleanup went so that a failure is not silently dropped.
                    switch cleanupResult {
                    case .success:
                        context.logger.info("[ProcessMetricsJob] finished processing \(payload.logURL)")
                    case .failure(let error):
                        context.logger.error("[ProcessMetricsJob] error removing local log for \(payload.logURL): \(error)")
                    }
                }
        }
        return promise.futureResult
    }