Sources/XCRemoteCache/Commands/Plugins/Thinning/Parallelization/DispatchGroupParallelizationWorker.swift (66 lines of code) (raw):
// Copyright (c) 2021 Spotify AB.
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import Foundation
enum WorkerResult {
case successes
case errors([Error])
}
/// Worker that manages executing blocks
protocol Worker {
/// Adding an action to run in parallel
/// - Parameter action: action to perform
func appendAction(_ action: @escaping () throws -> Void)
/// Wait for actions to finish
/// - Returns: execution result of all appended actions
func waitForResult() -> WorkerResult
}
/// Worker that executes actions in pararell using DispatchGroup
/// Warning! This implementation is not thread safe: all functions have to be called from the same thread
class DispatchGroupParallelizationWorker: Worker {
private let group: DispatchGroup
private let queue: DispatchQueue
private let qos: DispatchQoS.QoSClass
private var observedErrors: [Error]
/// Default initializer
/// - Parameter qos: QoS of the background queue to execute actions
init(qos: DispatchQoS.QoSClass = .userInteractive) {
group = DispatchGroup()
queue = DispatchQueue(
label: "DispatchGroupParallelization",
qos: .userInteractive,
attributes: .concurrent,
autoreleaseFrequency: .inherit,
target: .global(qos: qos)
)
observedErrors = []
self.qos = qos
}
func appendAction(_ action: @escaping () throws -> Void) {
group.enter()
queue.async {
do {
try action()
} catch {
// Errors are not expected to be frequent so just enqueing another block to the working group
self.group.enter()
self.queue.async(group: self.group, qos: self.qos.dispatchQoS, flags: .barrier) {
self.observedErrors.append(error)
self.group.leave()
}
}
self.group.leave()
}
}
func waitForResult() -> WorkerResult {
group.wait()
if observedErrors.isEmpty {
return .successes
}
defer {
observedErrors = []
}
return .errors(observedErrors)
}
}
extension DispatchQoS.QoSClass {
/// Trivial transform from DispatchQoS.QoSClass to DispatchQoS
var dispatchQoS: DispatchQoS {
switch self {
case .background: return .background
case .default: return .default
case .unspecified: return .unspecified
case .userInitiated: return .userInitiated
case .userInteractive: return .userInteractive
case .utility: return .utility
@unknown default:
return .default
}
}
}