Sources/XCMetricsBackendLib/UploadMetrics/Controllers/UploadController.swift (61 lines of code) (raw):

// Copyright (c) 2020 Spotify AB. // // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. import Vapor public struct UploadMetricsController: RouteCollection { /// 100 mb static let MAX_PAYLOAD_SIZE: ByteCount = 104857600 let fileLogRepository: LogFileRepository let redactUserData: Bool let metricsRepository: MetricsRepository let useAsyncProcessing: Bool init(fileLogRepository: LogFileRepository, redactUserData: Bool, metricsRepository: MetricsRepository, useAsyncProcessing: Bool) { self.fileLogRepository = fileLogRepository self.redactUserData = redactUserData self.metricsRepository = metricsRepository self.useAsyncProcessing = useAsyncProcessing } public func boot(routes: RoutesBuilder) throws { let builds = routes.grouped("v1") if useAsyncProcessing { builds.on(.PUT, "metrics", body: .collect(maxSize: Self.MAX_PAYLOAD_SIZE), use: create) } builds.on(.PUT, "metrics-sync", body: .collect(maxSize: Self.MAX_PAYLOAD_SIZE), use: createSync) } /// Gets a Request to process a Log and enqueues it to be processed by `ProcessMetricsJob` asynchronously. /// Basically acts as a Fire & Forget endpoint, which is faster that the Sync option. /// If the Backend is started with the option `XCMETRICS_USE_ASYNC_LOG_PROCESSING` turned off, /// this endpoint will not be available (Returns a `404` /// - Parameter req: Request with a valid`UploadMetricsPayload` /// - Throws: If the request is not a valid `UploadMetricsPayload` or there was an error storing the log /// - Returns: `200` HTTP Status if everything is ok. `400` if the request is not an `UploadMetricsPayload`, /// `404` if Async processing was turned off (`XCMETRICS_USE_ASYNC_LOG_PROCESSING`=0) /// `500` if there was an unexpected error public func create(req: Request) throws -> EventLoopFuture<HTTPStatus> { // The request contains a Multipart Nested request: one field of type Octect-stream with the actual log // and several of type `json`. Vapor lacks support for this type of content [Issue-1925](https://github.com/vapor/vapor/issues/1925) // We need to decode it as raw data and do the parsing manually // 1. Decode request as raw data let payload = try req.content.decode(UploadMetricsPayload.self) // Storing the log and decoding the request are blocking, we execute them in a background thread // to not block the eventloop return req.application.threadPool.runIfActive(eventLoop: req.eventLoop) { () -> UploadMetricsRequest in // 2. Store the log let logURL = try self.fileLogRepository.put(logFile: payload.log) // 3. Decode the JSON documents into `Codable` types guard let metricsRequest = try UploadMetricsRequest(logURL: logURL, payload: payload) else { throw Abort(.badRequest) } return metricsRequest }.flatMap { metricsRequest -> EventLoopFuture<HTTPStatus> in return req.queue.dispatch(ProcessMetricsJob.self, metricsRequest, maxRetryCount: 3) .transform(to: HTTPStatus.ok) } } /// Inserts the build metrics Synchronously which can be slow. Use only if the Async method is not available /// for instance, if running in CloudRun /// - Parameter req: Request with a valid`UploadMetricsPayload` /// - Throws: If the request is not a valid `UploadMetricsPayload` or there was an error parsing the Logs or inserting them in the database /// - Returns: `201` HTTP Status if everything is ok. `400` if the request is not an `UploadMetricsPayload`, `500` if there was an unexpected error public func createSync(req: Request) throws -> EventLoopFuture<HTTPStatus> { // The request contains a Multipart Nested request: one field of type Octect-stream with the actual log // and several of type `json`. Vapor lacks support for this type of content [Issue-1925](https://github.com/vapor/vapor/issues/1925) // We need to decode it as raw data and do the parsing manually // 1. Decode request as raw data let payload = try req.content.decode(UploadMetricsPayload.self) var logFileCopy: LogFile? = nil // Storing the log and decoding the request are blocking, we execute them in a background thread // to not block the eventloop return req.application.threadPool.runIfActive(eventLoop: req.eventLoop) { () -> BuildMetrics in // 2. Store the log let logURL = try self.fileLogRepository.put(logFile: payload.log) // 3. Decode the JSON documents into `Codable` types guard let metricsRequest = try UploadMetricsRequest(logURL: logURL, payload: payload) else { throw Abort(.badRequest) } // 4. Parse and process the metrics let logFile = try self.fileLogRepository.get(logURL: logURL) logFileCopy = logFile return try MetricsProcessor.process(metricsRequest: metricsRequest, logFile: logFile, redactUserData: self.redactUserData) }.flatMap { buildMetrics -> EventLoopFuture<HTTPStatus> in if let logFileCopy = logFileCopy { do { try self.fileLogRepository.delete(log: logFileCopy, wasProcessed: true) } catch { req.application.logger.error("Error deleting local log file \(error.localizedDescription)") } } return self.metricsRepository.insertBuildMetrics(buildMetrics, using: req.application.eventLoopGroup.next()) .transform(to: HTTPStatus.created) } } }