MobiusCore/Source/ConnectablePublisher.swift (59 lines of code) (raw):
// Copyright Spotify AB.
// SPDX-License-Identifier: Apache-2.0
import Foundation
/// Internal class that provides a 'publisher' for connectables; that is, something that you can post values to, and
/// that will broadcast posted values to all connections. It also retains a current value, and will post that value to
/// new connections.
final class ConnectablePublisher<Value>: Disposable {
    /// Guard through which every read and write of the mutable state below is funnelled.
    private let access: ConcurrentAccessDetector
    /// Currently registered connections, keyed by the identifier generated for each one in `connect(to:)`.
    private var connections = [UUID: Connection<Value>]()
    /// The most recently posted value; `nil` until `post(_:)` has been called at least once.
    private var currentValue: Value?
    /// Backing storage for `disposed`. Code that is already inside `access.guard` must read this directly.
    private var _disposed = false

    /// Creates a publisher with no connections and no current value.
    ///
    /// - Parameter accessGuard: The detector used to guard this publisher's state. Defaults to a fresh instance.
    init(accessGuard: ConcurrentAccessDetector = ConcurrentAccessDetector()) {
        access = accessGuard
    }

    /// Whether `dispose()` has been called on this publisher.
    var disposed: Bool {
        return access.guard { _disposed }
    }

    /// Stores `value` as the current value and forwards it to every connection registered at the time of the call.
    ///
    /// Posting to a disposed publisher is reported through `MobiusHooks.errorHandler`.
    ///
    /// - Parameter value: The value to retain and broadcast.
    func post(_ value: Value) {
        let connections: [Connection<Value>] = access.guard {
            // Read the backing flag rather than the `disposed` property: the property opens its own `access.guard`,
            // and we are already inside one here. `connect(to:)` and `dispose()` do the same.
            guard !_disposed else {
                // Callers are responsible for ensuring post is never entered after dispose.
                MobiusHooks.errorHandler(
                    "ConnectablePublisher<\(Value.self)> cannot accept values when disposed",
                    #file,
                    #line
                )
            }

            currentValue = value
            return Array(self.connections.values)
        }

        // The list of connections was frozen inside the guarded block; accept is dispatched out here to avoid any
        // risk of recursion into the guard.
        connections.forEach { $0.accept(value) }
    }

    /// Registers `outputConsumer` as a new connection and, if a value has already been posted, immediately delivers
    /// that current value to it.
    ///
    /// Connecting to a disposed publisher is reported through `MobiusHooks.errorHandler`.
    ///
    /// - Parameter outputConsumer: The closure that receives the current value (if any) and all later posted values.
    /// - Returns: The new connection. Its dispose closure removes it from this publisher's set of connections.
    @discardableResult
    func connect(to outputConsumer: @escaping Consumer<Value>) -> Connection<Value> {
        return access.guard { () -> Connection<Value> in
            guard !_disposed else {
                // Callers are responsible for ensuring connect is never entered after dispose.
                MobiusHooks.errorHandler(
                    "ConnectablePublisher<\(Value.self)> cannot add connections when disposed",
                    #file,
                    #line
                )
            }

            let uuid = UUID()
            let connection = Connection(
                acceptClosure: outputConsumer,
                // Weak capture: the connection is stored in `connections`, so a strong reference back to the
                // publisher would form a retain cycle.
                disposeClosure: { [weak self] in self?.removeConnection(for: uuid) }
            )
            self.connections[uuid] = connection

            // Replay the retained value so that a late subscriber starts from the current state.
            if let value = currentValue {
                outputConsumer(value)
            }

            return connection
        }
    }

    /// Marks the publisher as disposed and disposes every connection registered at that point.
    ///
    /// Calling this more than once is harmless: later calls find the flag already set and dispose nothing.
    func dispose() {
        let connections: [Connection<Value>] = access.guard {
            guard !_disposed else { return [] }

            _disposed = true
            return Array(self.connections.values)
        }

        // Again, this has to be outside the guarded block to avoid recursive locking – in this case, recursion into
        // removeConnection().
        connections.forEach { $0.dispose() }
    }

    /// Removes the connection registered under `uuid`, if there is one.
    private func removeConnection(for uuid: UUID) {
        access.guard {
            self.connections[uuid] = nil
        }
    }
}