Sources/XCMetricsBackendLib/UploadMetrics/Jobs/ProcessMetricsJob.swift (75 lines of code) (raw):

// Copyright (c) 2020 Spotify AB. // // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. import Foundation import CryptoSwift import Queues /// Job that parses the uploaded xcactivitylog and inserts it into a Repository class ProcessMetricsJob: Job { typealias Payload = UploadMetricsRequest let logFileRepository: LogFileRepository let metricsRepository: MetricsRepository /// If true, the user data (userId, machineName) will be hashed and the /// User data will be redacted from the log let redactUserData: Bool /// Queue in which the logs will be processed let queue = DispatchQueue(label: "process.metrics.queue", qos: .userInitiated) /// Set concurrent processing of logs up to the number of cores let semaphore = DispatchSemaphore(value: ProcessInfo.processInfo.processorCount) init(logFileRepository: LogFileRepository, metricsRepository: MetricsRepository, redactUserData: Bool) { self.logFileRepository = logFileRepository self.metricsRepository = metricsRepository self.redactUserData = redactUserData } func dequeue(_ context: QueueContext, _ payload: UploadMetricsRequest) -> EventLoopFuture<Void> { logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] message dequeued") let eventLoop = context.application.eventLoopGroup.next() let promise = eventLoop.makePromise(of: Void.self) /// parsing is a blocking call, we execute it in a Dispatch Queue to not block the eventloop queue.async { self.semaphore.wait() defer { self.semaphore.signal() } let logFile: LogFile let buildMetrics: BuildMetrics do { logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] fetching log from \(payload.logURL)") logFile = try self.logFileRepository.get(logURL: payload.logURL) logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] log fetched to \(logFile.localURL)") buildMetrics = try MetricsProcessor.process(metricsRequest: payload, logFile: logFile, redactUserData: self.redactUserData) } catch { context.logger.error("[ProcessMetricsJob] error processing log from \(payload.logURL): \(error)") promise.fail(error) return } logWithTimestamp(context.logger, msg: "[ProcessMetricsJob] log parsed \(payload.logURL)") _ = self.metricsRepository.insertBuildMetrics(buildMetrics, using: eventLoop) .flatMapAlways { (result) -> EventLoopFuture<Void> in var wasProcessed: Bool = false switch result { case .failure(let error): context.logger.error("[ProcessMetricsJob] error inserting log from \(payload.logURL): \(error)") wasProcessed = false promise.fail(error) case .success: wasProcessed = true promise.succeed(()) } return self.removeLocalLog(logFile, wasProcessed: wasProcessed, using: eventLoop, context) } .map { _ -> Void in context.logger.info("[ProcessMetricsJob] finished processing \(payload.logURL)") return () } } return promise.futureResult } private func removeLocalLog(_ log: LogFile, wasProcessed: Bool, using eventLoop: EventLoop, _ context: QueueContext) -> EventLoopFuture<Void> { return eventLoop.submit { () -> Void in do { try self.logFileRepository.delete(log: log, wasProcessed: wasProcessed) } catch { context.logger.error("[ProcessMetricsJob] Error removing log from \(log.localURL): \(error)") } } } }