def apply()

in scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala [259:334]


  def apply(project: String, credentials: => Credentials): BigQuery =
    new BigQuery(new Client(project, credentials))

  final private[client] class Client(val project: String, _credentials: => Credentials) {
    require(
      project != null && project.nonEmpty,
      "Invalid projectId. It should be a non-empty string"
    )

    def credentials: Credentials = _credentials

    def execute[T](fn: Bigquery => AbstractGoogleClientRequest[T]): T = {
      def getAuthenticatedUser: String = {
        import com.google.auth.oauth2.{
          ImpersonatedCredentials,
          ServiceAccountCredentials,
          UserCredentials
        }

        _credentials match {
          case sa: ServiceAccountCredentials => s"service account ${sa.getAccount}"
          case uc: UserCredentials =>
            s"user ${uc.getClientId} in project ${Option(uc.getQuotaProjectId).filterNot(_.isEmpty).getOrElse("unknown")}"
          case ic: ImpersonatedCredentials =>
            s"impersonated account ${ic.getAccount} in project ${Option(ic.getQuotaProjectId).filterNot(_.isEmpty).getOrElse("unknown")}"
          case other: Credentials =>
            s"${other.getAuthenticationType} with credential type ${other.getClass.getName}"
        }
      }

      Try(fn(underlying).execute()) match {
        case Success(response) => response
        case Failure(e: GoogleJsonResponseException)
            if e.getStatusCode == HttpStatusCodes.STATUS_CODE_FORBIDDEN && BigQueryConfig.isDebugAuthEnabled =>
          throw new GoogleJsonResponseException(
            new HttpResponseException.Builder(e.getStatusCode, e.getStatusMessage, e.getHeaders)
              .setContent(e.getContent)
              .setMessage(s"""
                   |${e.getMessage}
                   |
                   |[${BigQuery.getClass.getName}${BigQuerySysProps.DebugAuth.flag}] Active credential was $getAuthenticatedUser
                   |""".stripMargin),
            e.getDetails
          )
        case Failure(e) =>
          throw e
      }
    }

    private lazy val underlying: Bigquery = {
      val requestInitializer = new ChainingHttpRequestInitializer(
        new HttpCredentialsAdapter(credentials),
        (request: HttpRequest) => {
          BigQueryConfig.connectTimeoutMs.foreach(request.setConnectTimeout)
          BigQueryConfig.readTimeoutMs.foreach(request.setReadTimeout)
        }
      )
      new Bigquery.Builder(new NetHttpTransport, GsonFactory.getDefaultInstance, requestInitializer)
        .setApplicationName("scio")
        .build()
    }

    lazy val storage: BigQueryReadClient = {
      val settings = BigQueryReadSettings
        .newBuilder()
        .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
        .setTransportChannelProvider(
          BigQueryReadSettings
            .defaultGrpcTransportProviderBuilder()
            .setHeaderProvider(FixedHeaderProvider.create("user-agent", "scio"))
            .build()
        )
        .build()
      BigQueryReadClient.create(settings)
    }
  }