python_sample.py (97 lines of code) (raw):

import boto3 import botocore import os import pandas as pd from io import BytesIO, StringIO import argparse import gzip import json from datetime import datetime import pytz def get_s3_client(): # keys can be set as environment variables or hardcoded here access_key = os.getenv('AWS_ACCESS_KEY_ID') secret_key = os.getenv('AWS_SECRET_ACCESS_KEY') region = os.getenv('AWS_REGION') s3_client = boto3.client( 's3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region ) return s3_client def get_last_timestamp(filename): filename = os.path.basename(filename).rsplit('.', 2)[0] date, time, _ = filename.split('_') year, month, day = map(int, date.split('-')) hour, minute, second = map(int, time.split('-')) dt = datetime(year, month, day, hour, minute, second) return int(dt.replace(tzinfo=pytz.utc).timestamp()) def get_s3_keys(bucket, prefix, start_time): client = get_s3_client() keys = [] kwargs = {'Bucket': bucket, 'Prefix': prefix} while True: resp = client.list_objects_v2(**kwargs) for obj in resp['Contents']: key = obj['Key'] # not necessary, but speeds up execution if get_last_timestamp(key) < start_time: continue # we make use of the key format to avoid downloading too many objects https://ember.deltixlab.com/docs/dw/s3/#batches--objects keys.append(key) try: kwargs['ContinuationToken'] = resp['NextContinuationToken'] except KeyError: break return keys def retrieve_objects(bucket, keys, start_time, end_time): start_time = pd.to_datetime(start_time, unit='s') end_time = pd.to_datetime(end_time, unit='s') client = get_s3_client() # clearing content of the output file with open('output.csv', 'w') as f_output: pass with open('output.csv', 'a') as f_output: for key in keys: try: response = client.get_object(Bucket=bucket, Key=key) data = StringIO(gzip.open(BytesIO(response['Body'].read()), 'rt').read()) df = pd.read_json(data, lines=True) if df['Timestamp'].iloc[-1] < start_time: continue elif df['Timestamp'].iloc[0] > end_time: break # filtering out TRADES only df = df[df['Type'] == "OrderTradeReportEvent"] df = df[(df['Timestamp'] >= start_time) & (df['Timestamp'] <= end_time)] # additional processing may be applied here using pandas, e.g. drop columns, change date format # for example df['Timestamp'] = pd.to_datetime(df['Timestamp']) # or df['Timestamp'] = df['Timestamp'].dt.strftime('%d-%m-%Y') if not df.empty: df.to_csv(f_output, header=f_output.tell() == 0, index=False) except botocore.exceptions.ClientError as e: print(f"Failed to read data from {key}: {e}") except KeyError as e: print(f"Failed to extract timestamp from line {line} in {key}: {e}") except ValueError: print(f"Failed to parse JSON from {key}") def main(): parser = argparse.ArgumentParser() parser.add_argument("--start_time", type=int, help="the start time in Unix time format in seconds") parser.add_argument("--end_time", type=int, help="the end time in Unix time format in seconds") parser.add_argument("--bucket", type=str, help="the name of the S3 bucket") parser.add_argument("--prefix", type=str, help="the prefix of the files in the S3 bucket") args = parser.parse_args() if not args.bucket: print("Please provide the name of the S3 bucket") return if not args.prefix: print("Please provide the prefix of the files") return # bucket name and prefix are obtained from command line argument bucket = args.bucket prefix = args.prefix if args.start_time and args.end_time: start_time = args.start_time end_time = args.end_time if start_time >= end_time: print('Error: End time must be greater than start time') return # search for keys matching the specified prefix keys = get_s3_keys(bucket, prefix, start_time) retrieve_objects(bucket, keys, start_time, end_time) else: print('Please provide both --start_time and --end_time as Unix timestamps') if __name__ == "__main__": main()