syndicate/connection/__init__.py (202 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functools import lru_cache
from botocore.config import Config
from syndicate.connection.api_gateway_connection import ApiGatewayConnection, \
ApiGatewayV2Connection
from syndicate.connection.application_autoscaling_connection import (
ApplicationAutoscaling)
from syndicate.connection.appsync_connection import AppSyncConnection
from syndicate.connection.batch_connection import BatchConnection
from syndicate.connection.cloud_watch_connection import (EventConnection,
LogsConnection,
MetricConnection)
from syndicate.connection.cloudfront_connection import CloudFrontConnection
from syndicate.connection.cognito_identity_connection import (
CognitoIdentityConnection)
from syndicate.connection.cognito_identity_provider_connection import \
CognitoIdentityProviderConnection
from syndicate.connection.documentdb_connection import DocumentDBConnection
from syndicate.connection.dynamo_connection import DynamoConnection
from syndicate.connection.ec2_connection import EC2Connection
from syndicate.connection.elastic_beanstalk_connection import (
BeanstalkConnection)
from syndicate.connection.firehose_connection import FirehoseConnection
from syndicate.connection.eventbridge_scheduler_connection import EventBridgeSchedulerConnection
from syndicate.connection.iam_connection import IAMConnection
from syndicate.connection.kinesis_connection import KinesisConnection
from syndicate.connection.kms_connection import KMSConnection
from syndicate.connection.lambda_connection import LambdaConnection
from syndicate.connection.s3_connection import S3Connection
from syndicate.connection.sns_connection import SNSConnection
from syndicate.connection.sqs_connection import SqsConnection
from syndicate.connection.step_functions_connection import SFConnection
from syndicate.connection.resource_groups_tagging_api_connection import \
ResourceGroupsTaggingAPIConnection
from syndicate.connection.dax_connection import DaxConnection
class ConnectionProvider(object):
def __init__(self, credentials):
self.credentials = credentials.copy()
self.client_config = Config(
retries={
'max_attempts': 10,
'mode': 'standard'
}
)
@lru_cache(maxsize=None)
def api_gateway(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return ApiGatewayConnection(**credentials)
def api_gateway_v2(self, region=None):
creds = self.credentials
if region:
creds = {**creds, 'region': region}
return ApiGatewayV2Connection(**creds)
@lru_cache(maxsize=None)
def appsync(self, region=None):
creds = self.credentials
if region:
creds = {**creds, 'region': region}
return AppSyncConnection(**creds)
@lru_cache(maxsize=None)
def lambda_conn(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return LambdaConnection(**credentials)
@lru_cache(maxsize=None)
def cw_events(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return EventConnection(**credentials)
@lru_cache(maxsize=None)
def dynamodb(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return DynamoConnection(**credentials)
@lru_cache(maxsize=None)
def cognito_identity(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return CognitoIdentityConnection(**credentials)
@lru_cache(maxsize=None)
def cognito_identity_provider(self, region=None):
params = self.credentials.copy()
if region:
params['region'] = region
params['client_config'] = self.client_config
return CognitoIdentityProviderConnection(**params)
@lru_cache(maxsize=None)
def iam(self):
return IAMConnection(**self.credentials)
@lru_cache(maxsize=None)
def s3(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return S3Connection(**credentials)
@lru_cache(maxsize=None)
def sns(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return SNSConnection(**credentials)
@lru_cache(maxsize=None)
def cw_logs(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return LogsConnection(**credentials)
@lru_cache(maxsize=None)
def cw_metric(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return MetricConnection(**credentials)
@lru_cache(maxsize=None)
def ec2(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return EC2Connection(**credentials)
@lru_cache(maxsize=None)
def cloud_front(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return CloudFrontConnection(**credentials)
@lru_cache(maxsize=None)
def beanstalk(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return BeanstalkConnection(**credentials)
@lru_cache(maxsize=None)
def step_functions(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return SFConnection(**credentials)
@lru_cache(maxsize=None)
def kinesis(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return KinesisConnection(**credentials)
@lru_cache(maxsize=None)
def firehose(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return FirehoseConnection(**credentials)
@lru_cache(maxsize=None)
def eventbridge_scheduler(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return EventBridgeSchedulerConnection(**credentials)
@lru_cache(maxsize=None)
def application_autoscaling(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return ApplicationAutoscaling(**credentials)
@lru_cache(maxsize=None)
def sqs(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return SqsConnection(**credentials)
@lru_cache(maxsize=None)
def kms(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return KMSConnection(**credentials)
@lru_cache(maxsize=None)
def batch(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return BatchConnection(**credentials)
@lru_cache(maxsize=None)
def documentdb(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return DocumentDBConnection(**credentials)
@lru_cache(maxsize=None)
def groups_tagging_api(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return ResourceGroupsTaggingAPIConnection(**credentials)
@lru_cache(maxsize=None)
def dax(self, region=None):
credentials = self.credentials.copy()
if region:
credentials['region'] = region
return DaxConnection(**credentials)