MobiusCore/Source/AsyncDispatchQueueConnectable.swift (47 lines of code) (raw):
// Copyright Spotify AB.
// SPDX-License-Identifier: Apache-2.0
import Foundation
/// A connectable adapter which imposes asynchronous dispatch blocks around calls to `accept`.
///
/// Creates `Connection`s that forward invocations to `accept` to a connection returned by the underlying connectable,
/// first switching to the provided `acceptQueue`. In other words, the real `accept` method will always be executed
/// asynchronously on the provided queue.
final class AsyncDispatchQueueConnectable<Input, Output>: Connectable {
private let underlyingConnectable: AnyConnectable<Input, Output>
private let acceptQueue: DispatchQueue
init(
_ underlyingConnectable: AnyConnectable<Input, Output>,
acceptQueue: DispatchQueue
) {
self.underlyingConnectable = underlyingConnectable
self.acceptQueue = acceptQueue
}
convenience init<C: Connectable>(
_ underlyingConnectable: C,
acceptQueue: DispatchQueue
) where C.Input == Input, C.Output == Output {
self.init(AnyConnectable(underlyingConnectable), acceptQueue: acceptQueue)
}
func connect(_ consumer: @escaping Consumer<Output>) -> Connection<Input> {
// Synchronized values protect against state changes within the critical regions that are accessed on both the
// loop queue and the accept queue. An optional consumer allows for clearing the reference when it is no longer
// valid.
let disposalStatus = Synchronized(value: false)
let protectedConsumer = Synchronized<Consumer<Output>?>(value: consumer)
let connection = underlyingConnectable.connect { value in
protectedConsumer.read { consumer in
guard let consumer = consumer else {
MobiusHooks.errorHandler("cannot consume value after dispose", #file, #line)
}
consumer(value)
}
}
return Connection(
acceptClosure: { [acceptQueue] input in
acceptQueue.async {
// Prevents forwarding if the connection has since been disposed.
disposalStatus.read { disposed in
guard !disposed else { return }
connection.accept(input)
}
}
},
disposeClosure: {
guard disposalStatus.compareAndSwap(expected: false, with: true) else {
MobiusHooks.errorHandler("cannot dispose more than once", #file, #line)
}
connection.dispose()
protectedConsumer.value = nil
}
)
}
}