in MobiusCore/Source/AsyncDispatchQueueConnectable.swift [30:65]
/// Connects `consumer` to the underlying connectable and returns a connection that forwards its input
/// asynchronously on `acceptQueue`.
///
/// - Parameter consumer: Called with each value produced by the underlying connection. The reference is cleared
///   when the returned connection is disposed.
/// - Returns: A connection whose accept dispatches to `acceptQueue` before forwarding, and whose dispose reports
///   through `MobiusHooks.errorHandler` if it is invoked more than once.
func connect(_ consumer: @escaping Consumer<Output>) -> Connection<Input> {
    // Both pieces of state are touched from the loop queue as well as the accept queue, so each is wrapped in a
    // Synchronized value. The consumer is stored as an optional so the reference can be dropped once the
    // connection is no longer valid.
    let isDisposed = Synchronized(value: false)
    let consumerBox = Synchronized<Consumer<Output>?>(value: consumer)

    let upstream = underlyingConnectable.connect { output in
        consumerBox.read { currentConsumer in
            // The else branch has no explicit exit, so it relies on the error handler never returning.
            guard let deliver = currentConsumer else {
                MobiusHooks.errorHandler("cannot consume value after dispose", #file, #line)
            }
            deliver(output)
        }
    }

    return Connection(
        acceptClosure: { [acceptQueue] input in
            acceptQueue.async {
                // The flag is checked inside the read region and the input is forwarded from within it; input
                // that arrives after disposal is silently dropped.
                isDisposed.read { alreadyDisposed in
                    if !alreadyDisposed {
                        upstream.accept(input)
                    }
                }
            }
        },
        disposeClosure: {
            // Flip the flag from false to true; if that swap fails the connection was already disposed.
            let didMarkDisposed = isDisposed.compareAndSwap(expected: false, with: true)
            if !didMarkDisposed {
                MobiusHooks.errorHandler("cannot dispose more than once", #file, #line)
            }

            // Tear down the underlying connection first, then release the consumer reference.
            upstream.dispose()
            consumerBox.value = nil
        }
    )
}