in scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala [259:334]
/**
 * Creates a [[BigQuery]] instance backed by a new [[Client]] for the given project.
 *
 * @param project
 *   project ID handed to the [[Client]], which rejects `null` or empty values
 * @param credentials
 *   by-name credentials, forwarded as-is to the [[Client]]; this method does not evaluate them
 */
def apply(project: String, credentials: => Credentials): BigQuery = {
  val client = new Client(project, credentials)
  new BigQuery(client)
}
/**
 * Holds the BigQuery API clients used for a single project.
 *
 * @param project
 *   project ID; must be a non-null, non-empty string
 * @param _credentials
 *   by-name credentials. They are not evaluated at construction time; each call to
 *   [[credentials]] evaluates the expression again.
 */
final private[client] class Client(val project: String, _credentials: => Credentials) {
  require(
    project != null && project.nonEmpty,
    "Invalid projectId. It should be a non-empty string"
  )

  /** Evaluates and returns the by-name credentials given to the constructor. */
  def credentials: Credentials = _credentials

  /**
   * Builds a request with `fn` against the underlying `Bigquery` client and executes it.
   *
   * When the request fails with a `GoogleJsonResponseException` whose status code is
   * `STATUS_CODE_FORBIDDEN` and `BigQueryConfig.isDebugAuthEnabled` is set, the exception is
   * rethrown as a new `GoogleJsonResponseException` whose message additionally describes the
   * credential in use. The original exception is attached as the cause so its stack trace is
   * not lost. Any other failure is rethrown unchanged.
   *
   * @param fn
   *   builds the request from the underlying `Bigquery` client
   * @tparam T
   *   response type of the request
   * @return
   *   the response of the executed request
   */
  def execute[T](fn: Bigquery => AbstractGoogleClientRequest[T]): T = {
    // Human-readable description of the credential, used only in the debug-auth error message.
    def getAuthenticatedUser: String = {
      import com.google.auth.oauth2.{
        ImpersonatedCredentials,
        ServiceAccountCredentials,
        UserCredentials
      }

      // The quota project may be null or empty; report it as "unknown" in both cases.
      def quotaProject(id: String): String =
        Option(id).filterNot(_.isEmpty).getOrElse("unknown")

      credentials match {
        case sa: ServiceAccountCredentials =>
          s"service account ${sa.getAccount}"
        case uc: UserCredentials =>
          s"user ${uc.getClientId} in project ${quotaProject(uc.getQuotaProjectId)}"
        case ic: ImpersonatedCredentials =>
          s"impersonated account ${ic.getAccount} in project ${quotaProject(ic.getQuotaProjectId)}"
        case other: Credentials =>
          s"${other.getAuthenticationType} with credential type ${other.getClass.getName}"
      }
    }

    Try(fn(underlying).execute()) match {
      case Success(response) => response
      case Failure(e: GoogleJsonResponseException)
          if e.getStatusCode == HttpStatusCodes.STATUS_CODE_FORBIDDEN && BigQueryConfig.isDebugAuthEnabled =>
        val builder =
          new HttpResponseException.Builder(e.getStatusCode, e.getStatusMessage, e.getHeaders)
            .setContent(e.getContent)
            .setMessage(s"""
                           |${e.getMessage}
                           |
                           |[${BigQuery.getClass.getName}${BigQuerySysProps.DebugAuth.flag}] Active credential was $getAuthenticatedUser
                           |""".stripMargin)
        // initCause returns the same exception instance, so the thrown object is still the new
        // GoogleJsonResponseException, now carrying the original failure as its cause.
        throw new GoogleJsonResponseException(builder, e.getDetails).initCause(e)
      case Failure(e) =>
        throw e
    }
  }

  /**
   * Lazily built `Bigquery` API client. Requests are initialized with the credentials and with
   * the connect/read timeouts from `BigQueryConfig` when those are defined.
   */
  private lazy val underlying: Bigquery = {
    val requestInitializer = new ChainingHttpRequestInitializer(
      new HttpCredentialsAdapter(credentials),
      (request: HttpRequest) => {
        BigQueryConfig.connectTimeoutMs.foreach(request.setConnectTimeout)
        BigQueryConfig.readTimeoutMs.foreach(request.setReadTimeout)
      }
    )
    new Bigquery.Builder(new NetHttpTransport, GsonFactory.getDefaultInstance, requestInitializer)
      .setApplicationName("scio")
      .build()
  }

  /**
   * Lazily built `BigQueryReadClient`, configured with the credentials and a fixed "scio"
   * `user-agent` header on the gRPC transport.
   */
  lazy val storage: BigQueryReadClient = {
    val settings = BigQueryReadSettings
      .newBuilder()
      .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
      .setTransportChannelProvider(
        BigQueryReadSettings
          .defaultGrpcTransportProviderBuilder()
          .setHeaderProvider(FixedHeaderProvider.create("user-agent", "scio"))
          .build()
      )
      .build()
    BigQueryReadClient.create(settings)
  }
}