def write[T]()

in finagle-postgresql/src/it/scala/com/twitter/finagle/postgresql/PreparedStatementSpec.scala [25:150]


  /**
   * Encodes `value` as a [[WireValue]] for the given postgres type.
   *
   * Delegates to the implicit `ValueWrites[T]` instance, always passing UTF-8 as the charset.
   *
   * @param tpe   the postgres type handed to the writer
   * @param value the Scala value to encode
   */
  def write[T](tpe: PgType, value: T)(implicit twrites: ValueWrites[T]): WireValue = {
    val charset = StandardCharsets.UTF_8
    twrites.writes(tpe, value, charset)
  }

  "Prepared Statement" should {

    /**
     * Sends a `Request.Prepare` for statement `s` under `name` and expects a
     * `Response.ParseComplete` whose prepared statement carries that same name.
     */
    def prepareSpec(name: Name, s: String) = withService() { client =>
      val parsed = client(Request.Prepare(s, name))
      parsed.map { reply =>
        reply must beLike {
          case Response.ParseComplete(statement) => statement.name must_== name
        }
      }
    }

    /**
     * Prepares statement `s` under `name`, then closes it again on the same service.
     *
     * Expects `Response.ParseComplete` for the prepare and `Response.Ready` for the close.
     * The service is closed once the whole exchange has finished, whether it succeeded or not.
     */
    def closingSpec(name: Name, s: String) = withClient() { client =>
      client().flatMap { svc =>
        val exchange = for {
          // The close request is only issued after the prepare step's expectation has been evaluated.
          _ <- svc(Request.Prepare(s, name)).map { parsed =>
            parsed must beLike {
              case Response.ParseComplete(_) => ok
            }
          }
          closed <- svc(Request.CloseStatement(name))
        } yield closed must beLike {
          case Response.Ready => ok
        }

        exchange.respond { _ =>
          val _ = svc.close()
        }
      }
    }

    /**
     * Prepares `s` (without an explicit statement name) on a freshly acquired service, executes
     * the resulting prepared statement and hands both the service and the response to `f`.
     *
     * Any response to the prepare request other than `Response.ParseComplete` fails the example.
     * The acquired service is closed once the whole exchange has finished, whether it succeeded
     * or not, mirroring what `closingSpec` does.
     *
     * @param s          the SQL text to prepare
     * @param parameters wire-encoded parameter values passed to `Request.ExecutePortal`
     * @param maxResults forwarded to `Request.ExecutePortal` as `maxResults`
     * @param f          builds the final match result from the service and the execution response;
     *                   it may issue further requests on the service (e.g. to resume a portal)
     */
    def executeSpec(
      s: String,
      parameters: Seq[WireValue] = Seq.empty,
      maxResults: Int = 0
    )(f: (Service[Request, Response], Response) => Future[MatchResult[_]]) = withClient() { client =>
      client()
        .flatMap { svc =>
          svc(Request.Prepare(s))
            .flatMap {
              case Response.ParseComplete(prepared) =>
                svc(Request.ExecutePortal(prepared, parameters, maxResults = maxResults))
                  .flatMap { response =>
                    f(svc, response)
                  }
              case _ => Future(ko)
            }
            // Release the service only after `f` has completed, since `f` may still be using it.
            .respond { _ =>
              val _ = svc.close()
            }
        }
    }

    /**
     * Declares the standard set of examples for one statement: preparing it (unnamed and named),
     * closing it (unnamed and named) and executing it.
     *
     * @param name       human readable label interpolated into every example description
     * @param query      the SQL text; passed by name and only evaluated inside the example bodies.
     *                   It also serves as the statement name for the "named" examples.
     * @param parameters wire-encoded parameter values used by the execution example
     * @param f          checks the response obtained by the execution example
     */
    def fullSpec(
      name: String,
      query: => String,
      parameters: Seq[WireValue] = Seq.empty
    )(f: Response => Future[MatchResult[_]]) = {
      // A `def` rather than a `val`, so that `query` is not forced outside of an example body.
      def statementName = Name.Named(query)

      val preparing = List(
        s"support preparing unnamed prepared $name" in {
          prepareSpec(Name.Unnamed, query)
        },
        s"support preparing named prepared $name" in {
          prepareSpec(statementName, query)
        }
      )

      val closing = List(
        s"support closing unnamed prepared $name" in {
          closingSpec(Name.Unnamed, query)
        },
        s"support closing named prepared $name" in {
          closingSpec(statementName, query)
        }
      )

      val executing = s"support executing $name" in {
        executeSpec(query, parameters)((_, response) => f(response))
      }

      fragments((preparing ++ closing) :+ executing)
    }

    // A bare `select 1` must come back as a result set holding exactly one row.
    fullSpec("select statements with no arguments", "select 1") {
      case Response.ResultSet(_, rows, _) =>
        Reader
          .toAsyncStream(rows)
          .toSeq()
          .map { fetched =>
            fetched must haveSize(1)
          }
      case _ => Future(ko)
    }

    // One bound boolean parameter: expects a single row made of two columns.
    fullSpec(
      "select statements with one argument",
      "SELECT 1, $1::bool",
      List(write(PgType.Bool, true))
    ) {
      case resultSet: Response.ResultSet =>
        resultSet.toSeq.map { fetched =>
          fetched must haveSize(1)
          fetched.head must haveSize(2)
        }
      case _ => Future(ko)
    }

    // DDL: executing a CREATE TABLE must answer with the matching command tag.
    // NOTE(review): the `test` table is not dropped here; re-running against a database that
    // persists between runs would presumably fail on the execution example. Confirm.
    fullSpec("DDL statements", "CREATE TABLE test(col1 bigint)") {
      case Response.Command(tag) => Future(tag must_== CommandTag.Other("CREATE TABLE"))
      case _ => Future(ko)
    }

    // Hack: provides a temporary table for the DML spec below. The value is lazy and `fullSpec`
    // takes its query by name, so the table is only set up once an example actually needs it.
    lazy val tableName = withTmpTable()(identity)
    fullSpec(
      "DML with one argument",
      s"INSERT INTO $tableName(int4_col) VALUES($$1)",
      List(write(PgType.Int4, 56))
    ) {
      case Response.Command(commandTag) =>
        Future(commandTag must_== CommandTag.AffectedRows(CommandTag.Insert, 1))
      case _ => Future(ko)
    }

    // Executes a query with a row limit, then resumes the unnamed portal with a different limit
    // and checks that each batch holds exactly the requested number of rows.
    // TODO: investigate CRDB failure
    "support portal suspension" in backend(Postgres) {
      val firstBatchSize = 17
      val secondBatchSize = 143
      executeSpec(InfiniteResultSetQuery, maxResults = firstBatchSize) {
        case (client, first: Response.ResultSet) =>
          first.toSeq
            .flatMap { firstBatch =>
              firstBatch must haveSize(firstBatchSize)
              client(Request.ResumePortal(Name.Unnamed, maxResults = secondBatchSize))
                .flatMap {
                  case second: Response.ResultSet =>
                    second.toSeq.map { secondBatch =>
                      secondBatch must haveSize(secondBatchSize)
                    }
                  case _ => Future.value(ko)
                }
            }
        case _ => Future.value(ko)
      }
    }
  }