in finagle-postgresql/src/main/scala/com/twitter/finagle/postgresql/Client.scala [76:162]
// Builds a Client backed by `factory`.
// `timeoutFn` is invoked once per query to obtain that query's time budget; the implicit
// `timer` is in scope for the timeout handling (`raiseWithin`) used below.
def apply(
  factory: ServiceFactory[Request, Response],
  timeoutFn: () => Duration
)(
  implicit timer: Timer
): Client = new Client {

  // Service view of the factory; only multiQuery goes through it.
  private[this] val service = factory.toService

  // Sends `sql` as a single Request.Query and exposes the responses it yields as a stream.
  // Any reply other than a SimpleQueryResponse fails the stream with IllegalStateException.
  override def multiQuery(sql: String): Reader[QueryResponse] = {
    val startedAt = Time.now
    val budget = timeoutFn()
    val deadline = startedAt + budget

    // Hands the row reader of a result set to readAllWithin together with the deadline;
    // every other kind of response is passed through untouched.
    // NOTE(review): readAllWithin is defined outside this block — presumably it enforces
    // that the reader is fully consumed by the deadline; confirm against its definition.
    def watchRows(response: QueryResponse): QueryResponse = response match {
      case Response.ResultSet(_, rows, _) =>
        readAllWithin(rows, deadline)
        response
      case _ =>
        response
    }

    val eventualResponses = service(Request.Query(sql))
      .flatMap {
        case Response.SimpleQueryResponse(responses) =>
          // The outer stream of responses is watched as well as each inner row stream.
          readAllWithin(responses, deadline)
          Future.value(responses.map(watchRows(_)))
        case other =>
          Future.exception(new IllegalStateException(s"invalid response $other"))
      }
      // Bound the wait for the initial reply by the same budget the deadline was derived from.
      .raiseWithin(budget)

    Reader.fromFuture(eventualResponses).flatten
  }

  // Runs `sql` through an unnamed prepared statement with no parameters.
  override def query(sql: String): Future[QueryResponse] = {
    // this uses an unnamed prepared statement to guarantee that the sql string only has one statement
    val unnamedStatement = prepare(Name.Unnamed, sql)
    unnamedStatement.query(Seq.empty)
  }

  // Prepares `sql` under a name derived from the MurmurHash3 string hash of the statement text.
  override def prepare(sql: String): PreparedStatement = {
    val statementName = Name.Named(MurmurHash3.stringHash(sql).toString)
    prepare(statementName, sql)
  }

  // Returns a PreparedStatement that, on every `query` call, acquires a service from the
  // factory, issues Prepare for (`sql`, `name`) on it and then executes the resulting statement.
  override def prepare(name: Name, sql: String): PreparedStatement = new PreparedStatement {
    // NOTE: this assumes that caching is done down the stack so that named statements
    // aren't re-prepared on the same connection.
    // The rationale is that it allows releasing the connection earlier at the expense
    // of re-preparing statements on each connection and potentially more than once
    // (but not every time).
    override def query(parameters: Seq[Parameter[_]]): Future[QueryResponse] = {
      val startedAt = Time.now
      val budget = timeoutFn()
      val deadline = startedAt + budget

      val outcome = factory().flatMap { conn =>
        // Both requests are issued up front, before either reply is awaited.
        val connectionInfo =
          conn(Request.ConnectionParameters).flatMap(Expect.ConnectionParameters)
        val parsedStatement =
          conn(Request.Prepare(sql, name)).flatMap(Expect.ParseComplete)

        val executed = connectionInfo.join(parsedStatement).flatMap {
          case (info, parsed) =>
            // zip pairs declared parameter types with supplied values positionally;
            // surplus entries on either side are dropped.
            val encodedValues = parsed.statement.parameterTypes
              .zip(parameters)
              .map {
                case (oid, parameter) =>
                  parameter.encode(PgType.pgTypeByOid(oid), info.parsedParameters.clientEncoding)
              }
            conn(Request.ExecutePortal(parsed.statement, encodedValues))
        }

        val checked = executed.flatMap {
          case resultSet @ Response.ResultSet(_, rows, _) =>
            // Same deadline handling for the row stream as in multiQuery.
            readAllWithin(rows, deadline)
            Future.value(resultSet)
          case response: QueryResponse =>
            Future.value(response)
          case other =>
            Future.exception(new IllegalStateException(s"invalid response $other"))
        }

        // Close the acquired service once the response future completes, on success or failure.
        checked.ensure {
          conn.close()
        }
      }

      outcome.raiseWithin(budget)
    }
  }

  // Closing the client closes the underlying factory.
  override def close(deadline: Time): Future[Unit] = factory.close(deadline)
}