def apply()

in finagle-postgresql/src/main/scala/com/twitter/finagle/postgresql/Client.scala [76:162]


  /**
   * Builds a [[Client]] on top of a connection factory.
   *
   * `multiQuery` is sent through a single service view of the factory (`factory.toService`),
   * whereas every prepared statement execution checks a service out of `factory` and closes it
   * when the execution future completes.
   *
   * @param factory source of connections; it is closed when the returned client is closed
   * @param timeoutFn evaluated once per request to obtain that request's timeout
   * @param timer timer required by the timeout machinery (e.g. `raiseWithin`)
   * @return a client whose requests fail once they exceed the timeout obtained from `timeoutFn`
   */
  def apply(
    factory: ServiceFactory[Request, Response],
    timeoutFn: () => Duration
  )(
    implicit timer: Timer
  ): Client = new Client {

    private[this] val service = factory.toService

    /**
     * Sends `sql` as a simple query and exposes the resulting responses as a reader.
     *
     * The returned reader is backed by a future that fails if the server has not answered
     * within the timeout, or if the answer is not a `SimpleQueryResponse`.
     */
    override def multiQuery(sql: String): Reader[QueryResponse] = {
      val startTime = Time.now
      val timeout = timeoutFn()
      val deadline = startTime + timeout

      val f = service(Request.Query(sql))
        .flatMap {
          case Response.SimpleQueryResponse(responses) =>
            // readAllWithin is defined outside this block and its result is not used here.
            // NOTE(review): presumably it bounds how long the reader may remain unconsumed
            //   relative to `deadline` -- confirm against its definition.
            readAllWithin(responses, deadline)

            // Apply the same deadline to the row reader of every result set as it is produced.
            val observedResponses = responses.map {
              case r @ Response.ResultSet(_, reader, _) =>
                readAllWithin(reader, deadline)
                r
              case r => r
            }

            Future.value(Response.SimpleQueryResponse(observedResponses))
          case other =>
            Future.exception(new IllegalStateException(s"invalid response $other"))
        }
        .map(_.responses)
        .raiseWithin(timeout)

      Reader.fromFuture(f).flatten
    }

    /** Executes `sql` as a single statement without parameters. */
    override def query(sql: String): Future[QueryResponse] =
      // this uses an unnamed prepared statement to guarantee that the sql string only has one statement
      prepare(Name.Unnamed, sql).query(Seq.empty)

    /**
     * Prepares `sql` under a name derived from its MurmurHash3 string hash.
     *
     * NOTE(review): the hash is a 32-bit `Int`, so two distinct statements can end up with the
     *   same name -- confirm that whatever caches named statements down the stack also takes
     *   the sql text into account.
     */
    override def prepare(sql: String): PreparedStatement =
      prepare(Name.Named(MurmurHash3.stringHash(sql).toString), sql)

    /**
     * Returns a statement handle; nothing is sent until `query` is called on it.
     *
     * Each execution checks a service out of `factory`, prepares the statement on it, executes
     * it with the encoded parameters and then closes the service.
     */
    override def prepare(name: Name, sql: String): PreparedStatement = new PreparedStatement {
      // NOTE: this assumes that caching is done down the stack so that named statements aren't re-prepared on the same connection
      //   The rationale is that it allows releasing the connection earlier at the expense
      //   of re-preparing statements on each connection and potentially more than once (but not every time)
      /**
       * Executes the statement with `parameters`, matched by position against the parameter
       * types of the prepared statement.
       *
       * The returned future fails with `IllegalArgumentException` when more parameters are
       * supplied than the statement declares, with `IllegalStateException` when the response
       * is not a `QueryResponse`, and it fails if the whole exchange exceeds the timeout.
       */
      override def query(parameters: Seq[Parameter[_]]): Future[QueryResponse] = {
        val startTime = Time.now
        val timeout = timeoutFn()
        val deadline = startTime + timeout

        factory()
          .flatMap { svc =>
            // Both requests are issued on the same service before either result is awaited.
            val params = svc(Request.ConnectionParameters).flatMap(Expect.ConnectionParameters)
            val prepare = svc(Request.Prepare(sql, name)).flatMap(Expect.ParseComplete)

            params
              .join(prepare)
              .flatMap {
                case (params, prepared) =>
                  val expectedTypes = prepared.statement.parameterTypes
                  if (parameters.size > expectedTypes.size) {
                    // `zip` stops at the shorter side, so surplus parameters would otherwise be
                    // dropped silently and the statement executed with only a subset of the
                    // supplied values. Supplying too few is deliberately left as before.
                    Future.exception(
                      new IllegalArgumentException(
                        s"too many parameters: statement expects ${expectedTypes.size} but ${parameters.size} were provided"
                      )
                    )
                  } else {
                    // Encode every parameter for the type reported for its position, using the
                    // client encoding from the connection parameters.
                    val values = (expectedTypes zip parameters)
                      .map {
                        case (tpe, p) =>
                          p.encode(PgType.pgTypeByOid(tpe), params.parsedParameters.clientEncoding)
                      }
                    svc(Request.ExecutePortal(prepared.statement, values))
                  }
              }
              .flatMap {
                case r @ Response.ResultSet(_, reader, _) =>
                  // Same deadline handling as in multiQuery; see readAllWithin's definition.
                  readAllWithin(reader, deadline)
                  Future.value(r)
                case r: QueryResponse =>
                  Future.value(r)
                case other =>
                  Future.exception(new IllegalStateException(s"invalid response $other"))
              }
              .ensure {
                // Close the checked-out service whether the exchange succeeded or failed.
                svc.close()
              }
          }
          .raiseWithin(timeout)
      }
    }

    /** Closes the underlying factory. */
    override def close(deadline: Time): Future[Unit] = factory.close(deadline)
  }