func connect()

in MobiusCore/Source/AsyncDispatchQueueConnectable.swift [30:65]


    /// Connects `consumer` to the underlying connectable and returns a connection that forwards its inputs
    /// asynchronously on `acceptQueue`.
    ///
    /// - Parameter consumer: Receives each value emitted through the underlying connection. The reference is
    ///   cleared when the returned connection is disposed.
    /// - Returns: A connection whose `accept` dispatches onto `acceptQueue` and drops inputs that arrive after
    ///   disposal, and whose `dispose` reports through `MobiusHooks.errorHandler` when invoked more than once.
    func connect(_ consumer: @escaping Consumer<Output>) -> Connection<Input> {
        // Both pieces of state are wrapped in `Synchronized` because their critical regions are entered from the
        // loop queue as well as the accept queue. The consumer is held as an optional so that the reference can be
        // released once it is no longer valid.
        let isDisposed = Synchronized(value: false)
        let consumerSlot = Synchronized<Consumer<Output>?>(value: consumer)

        let upstream = underlyingConnectable.connect { output in
            consumerSlot.read { current in
                guard let deliver = current else {
                    // The slot has already been emptied by the dispose closure below.
                    MobiusHooks.errorHandler("cannot consume value after dispose", #file, #line)
                }
                deliver(output)
            }
        }

        return Connection(
            acceptClosure: { [queue = acceptQueue] input in
                queue.async {
                    // By the time this runs the connection may already be disposed; if so, drop the input.
                    isDisposed.read { alreadyDisposed in
                        if alreadyDisposed { return }
                        upstream.accept(input)
                    }
                }
            },
            disposeClosure: {
                // Only the first caller flips the flag from false to true; any later caller is an error.
                let firstDisposal = isDisposed.compareAndSwap(expected: false, with: true)
                if !firstDisposal {
                    MobiusHooks.errorHandler("cannot dispose more than once", #file, #line)
                }

                // Tear down the underlying connection before releasing the consumer reference.
                upstream.dispose()
                consumerSlot.value = nil
            }
        )
    }