Sources/XCMetricsBackendLib/UploadMetrics/Repository/PostgreSQLJobLogRepository.swift (152 lines of code) (raw):
// Copyright (c) 2020 Spotify AB.
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import Foundation
import Fluent
import FluentSQL
import NIO
/// PostgreSQL implementation of `JobLogRepository`
struct PostgreSQLJobLogRepository: JobLogRepository {

    /// Database that every query of this repository is executed on.
    let db: Database

    // MARK: - JobLogRepository

    /// Saves the given job log entry on `db`.
    /// - Parameters:
    ///   - jobLogEntry: Entry to save.
    ///   - eventLoop: Not used by this implementation; the future comes from the `save` call.
    /// - Returns: A future that completes when the save has finished.
    func create(_ jobLogEntry: JobLogEntry, using eventLoop: EventLoop) -> EventLoopFuture<Void> {
        return jobLogEntry.save(on: db)
    }

    /// Looks up the entry with the given id and, when it exists, moves it to `status`.
    /// - Parameters:
    ///   - id: Identifier of the `JobLogEntry` to update.
    ///   - status: New status of the entry.
    ///   - error: Error whose description is stored when `status` is `.failed`.
    ///   - eventLoop: Event loop used to create the succeeded future when no entry is found.
    /// - Returns: A future that completes once the entry has been updated, or immediately
    ///   when no entry with that id exists.
    func update(_ id: String, status: JobLogStatus, error: Error?,
                using eventLoop: EventLoop) -> EventLoopFuture<Void> {
        return JobLogEntry.find(id, on: db)
            .flatMap { found -> EventLoopFuture<Void> in
                guard let entry = found else {
                    // In case the `Job` was not a `ProcessMetricsJob` we don't do anything
                    // This could happen in the future if we add more `Job` types
                    return eventLoop.future()
                }
                return self.update(entry, status: status, error: error, using: eventLoop)
            }
    }

    /// Builds the dashboard for the given time range: counters per status, the overall
    /// average execution time and the hourly average-time and throughput series.
    /// - Parameters:
    ///   - from: Lower bound of the time range.
    ///   - to: Upper bound of the time range.
    ///   - eventLoop: Event loop handed to the underlying queries.
    /// - Returns: A future with the populated `JobDashboard`.
    func getDashboardFrom(_ from: Date, to: Date, using eventLoop: EventLoop) -> EventLoopFuture<JobDashboard> {
        return getCountByStatusFrom(from, to: to, using: eventLoop)
            .map { counts -> JobDashboard in
                // A status without rows in the range is absent from `counts`; report it as 0.
                func count(for status: JobLogStatus) -> Int {
                    return counts.first(where: { $0.key == status.rawValue })?.value ?? 0
                }
                return JobDashboard(from: from,
                                    to: to,
                                    successful: count(for: .successful),
                                    running: count(for: .running),
                                    failed: count(for: .failed),
                                    pending: count(for: .pending),
                                    averageTime: 0.0)
            }.flatMap { jobDashboard -> EventLoopFuture<JobDashboard> in
                self.getAverageExecutionTimeFrom(from, to: to, using: eventLoop)
                    .map { jobDashboard.with(averageTime: $0?.result) }
            }.flatMap { jobDashboard -> EventLoopFuture<JobDashboard> in
                return self.getAveragesPerHour(from, to: to, using: eventLoop)
                    .and(self.getThroughputPerHours(from, to: to, using: eventLoop))
                    // `.0` holds the hourly averages and `.1` the hourly throughput.
                    .map { series in
                        jobDashboard.with(averageTimes: series.0, throughput: series.1)
                    }
            }
    }

    /// Returns a page of job log entries created between `params.from` and `params.to`,
    /// optionally narrowed by status and by a `logFile` filter, newest first.
    /// - Parameters:
    ///   - params: Time range, optional filters and pagination settings.
    ///   - db: Database to run the query on.
    /// - Returns: A future with the requested page of entries.
    func getJobs(params: JobListRequest, on db: Database) -> EventLoopFuture<Page<JobLogEntry>> {
        let query = JobLogEntry.query(on: db)
            .filter(\.$createdAt >= params.from)
            .filter(\.$createdAt <= params.to)
        // The optional filters are applied to the same `query` instance.
        if let status = params.status {
            query.filter(\.$status == status)
        }
        if let filter = params.filter {
            query.filter(\.$logFile ~~ filter)
        }
        return query.sort(\.$createdAt, .descending)
            .paginate(PageRequest(page: params.page, per: params.per))
    }

    // MARK: - Private helpers

    /// Sets the new status on `entry`, stamps the date field that belongs to that status
    /// (and the error description for `.failed`) and updates the entry on `db`.
    private func update(_ entry: JobLogEntry, status: JobLogStatus,
                        error: Error?, using eventLoop: EventLoop) -> EventLoopFuture<Void> {
        entry.status = status
        switch status {
        case .pending:
            entry.queuedAt = Date()
        case .running:
            entry.dequeuedAt = Date()
        case .successful:
            entry.finishedAt = Date()
        case .failed:
            entry.error = error?.localizedDescription ?? "Unknown error"
            entry.finishedAt = Date()
        }
        return entry.update(on: db)
    }

    /// Runs `body` with `db` cast to `SQLDatabase`, or returns a failed future
    /// when `db` is not an SQL database.
    private func withSQLDatabase<T>(using eventLoop: EventLoop,
                                    _ body: (SQLDatabase) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
        guard let sql = db as? SQLDatabase else {
            return eventLoop.makeFailedFuture(RepositoryError.unexpected(message: "The database is not SQL"))
        }
        return body(sql)
    }

    /// Counts the entries created in the given range, grouped by status.
    private func getCountByStatusFrom(_ from: Date, to: Date, using eventLoop: EventLoop) -> EventLoopFuture<[CountResult]> {
        let query: SQLQueryString = """
        SELECT
        status as key, count(*) as value
        FROM
        \(raw: JobLogEntry.schema)
        WHERE
        created_at BETWEEN \(bind: from) AND \(bind: to)
        GROUP BY
        status;
        """
        return withSQLDatabase(using: eventLoop) { sql in
            sql.raw(query).all(decoding: CountResult.self)
        }
    }

    /// Computes the average number of seconds between `dequeued_at` and `finished_at`
    /// for the finished entries created in the given range (0.0 when the average is NULL).
    private func getAverageExecutionTimeFrom(_ from: Date, to: Date, using eventLoop: EventLoop) -> EventLoopFuture<AgreggatedResult?> {
        let query: SQLQueryString = """
        SELECT
        COALESCE(AVG(EXTRACT(EPOCH FROM (finished_at - dequeued_at))), 0.0) as "result"
        FROM
        \(raw: JobLogEntry.schema)
        WHERE
        created_at BETWEEN \(bind: from) AND \(bind: to)
        AND
        finished_at IS NOT NULL;
        """
        return withSQLDatabase(using: eventLoop) { sql in
            sql.raw(query).first(decoding: AgreggatedResult.self)
        }
    }

    /// Counts the entries finished in the given range, grouped by the hour they finished in.
    private func getThroughputPerHours(_ from: Date, to: Date, using eventLoop: EventLoop) -> EventLoopFuture<[ChartTimeSeries]> {
        let query: SQLQueryString = """
        SELECT
        TO_CHAR(finished_at, 'dd/MM/yyyy HH24:00') as "key",
        COUNT(*) * 1.0 AS "value"
        FROM
        \(raw: JobLogEntry.schema)
        WHERE
        finished_at IS NOT NULL
        AND
        finished_at BETWEEN \(bind: from) AND \(bind: to)
        GROUP BY
        TO_CHAR(finished_at, 'dd/MM/yyyy HH24:00')
        ;
        """
        return withSQLDatabase(using: eventLoop) { sql in
            sql.raw(query).all(decoding: ChartTimeSeries.self)
        }
    }

    /// Computes the average number of seconds between `dequeued_at` and `finished_at`
    /// for the entries finished in the given range, grouped by the hour they finished in.
    private func getAveragesPerHour(_ from: Date, to: Date, using eventLoop: EventLoop) -> EventLoopFuture<[ChartTimeSeries]> {
        let query: SQLQueryString = """
        SELECT
        TO_CHAR(finished_at, 'dd/MM/yyyy HH24:00') AS "key",
        AVG(EXTRACT(EPOCH FROM (finished_at - dequeued_at))) AS "value"
        FROM
        \(raw: JobLogEntry.schema)
        WHERE
        finished_at IS NOT NULL
        AND
        finished_at BETWEEN \(bind: from) AND \(bind: to)
        GROUP BY
        TO_CHAR(finished_at, 'dd/MM/yyyy HH24:00')
        ;
        """
        return withSQLDatabase(using: eventLoop) { sql in
            sql.raw(query).all(decoding: ChartTimeSeries.self)
        }
    }

    // MARK: - Decoding types

    /// Struct used to decode queries aggregated results
    private struct AgreggatedResult: Decodable {
        var result: Double
    }

    /// Struct used to decode queries results that return counters
    private struct CountResult: Decodable {
        var key: String
        var value: Int
    }
}