AWS Data Wrangler (awswrangler), now called AWS SDK for pandas, is a Python library that makes it easier to integrate AWS services for ETL (Extract, Transform, Load) work.
I will use Google Colab and AWS SDK for pandas to create:
- a database in the AWS Glue Data Catalog
- tables in that database
Prerequisites
- An S3 bucket that already contains data (this post uses charity data)
- The permissions needed to use AWS services such as AWS Glue
- Access to Google Colab
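For the permissions prerequisite, one way to hand credentials to a Colab session is through the standard environment variables that boto3 (and therefore awswrangler) reads by default. This is a minimal sketch, assuming you have an access key pair for a user allowed to use Glue and S3. The helper name `set_aws_credentials`, the placeholder key values and the `us-east-1` region are illustrative only.

```python
import os


def set_aws_credentials(access_key_id, secret_access_key, region):
    """Export credentials through the environment variables boto3 looks up by default."""
    os.environ["AWS_ACCESS_KEY_ID"] = access_key_id
    os.environ["AWS_SECRET_ACCESS_KEY"] = secret_access_key
    os.environ["AWS_DEFAULT_REGION"] = region


# Placeholder values (assumptions): replace them with your own, ideally read
# with getpass.getpass() so the secret is never saved in the notebook.
set_aws_credentials("YOUR_ACCESS_KEY_ID", "YOUR_SECRET_ACCESS_KEY", "us-east-1")
```

Run this cell before the first awswrangler call so that the library picks the values up when it creates its default session.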
Steps
Let’s get started.
- install awswrangler (pip install awswrangler; in a Colab cell, !pip install awswrangler)
- create a database in the AWS Glue Data Catalog – wr.catalog.create_database('database_name')
- create a table – wr.s3.store_parquet_metadata
import awswrangler as wr

# Create the database before calling store_parquet_metadata; otherwise CreateTable fails with:
# EntityNotFoundException: An error occurred (EntityNotFoundException) when calling the CreateTable operation: Database ellodb not found.
wr.catalog.create_database('database_name')

# Create the table from data that already exists in S3.
# Only metadata is stored in the catalog, so this is very fast.
columns_types, partitions_types, partitions_values = wr.s3.store_parquet_metadata(
    path='s3://{bucket_name}/data/charity/',  # replace {bucket_name} with your bucket
    database='database_name',
    table='charity',
    dataset=True
)
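Re-running the notebook fails at the database step once the database already exists. A hedged sketch of an idempotent version follows, assuming an awswrangler release in which `wr.catalog.create_database` accepts `exist_ok`. The wrapper name `ensure_database` and its `catalog` parameter are hypothetical; the parameter exists only so the function can be exercised without AWS access.

```python
def ensure_database(name, catalog=None):
    """Create a Glue database, tolerating the case where it already exists."""
    if catalog is None:
        # Imported lazily so the function can be defined without awswrangler installed.
        import awswrangler as wr
        catalog = wr.catalog
    # exist_ok=True asks awswrangler not to raise when the database is already
    # there (assumption: supported by your installed version).
    catalog.create_database(name=name, exist_ok=True)
    return name
```

In the notebook, calling `ensure_database('database_name')` would take the place of the bare `create_database` call.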
Verify your table
Go to the AWS Console, open AWS Glue, and check the table metadata under Data Catalog.
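The same check can be done from the notebook. The sketch below assumes `wr.catalog.does_table_exist` and `wr.catalog.table` behave as described in the awswrangler documentation, where the latter returns a DataFrame describing the table's columns. The function name `describe_table` and the injectable `catalog` argument are hypothetical conveniences.

```python
def describe_table(database, table, catalog=None):
    """Return the catalog's column metadata for a table, or raise if it is missing."""
    if catalog is None:
        # Imported lazily so the function can be defined without awswrangler installed.
        import awswrangler as wr
        catalog = wr.catalog
    if not catalog.does_table_exist(database=database, table=table):
        raise LookupError(f"{database}.{table} is not registered in the Data Catalog")
    # With awswrangler this is a DataFrame with one row per column.
    return catalog.table(database=database, table=table)
```

For the table created earlier, that would be `describe_table('database_name', 'charity')`.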
Read data from S3 bucket
df = wr.s3.read_parquet("s3://{bucket_name}/data/charity/")
Create a table using Pandas dataframe
You can also create a table with the wr.s3.to_parquet method, which writes the DataFrame to S3 and registers the table in one call.
# create table using Pandas dataframe
# https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.s3.to_parquet.html
wr.s3.to_parquet(
    df=df_date,
    path="s3://{bucket_name}/data/charity/",
    dataset=True,  # required when database and table are given
    database="database_name",
    table="table_name",
    partition_cols=["column_name"]
)
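To see what `partition_cols` does to the layout in S3, here is a small illustration of the Hive-style `column=value/` prefixes that a partitioned dataset is written under. It only builds strings and does not touch AWS. The function `partition_prefixes`, the bucket name `my-bucket` and the values `a` and `b` are made up for the example.

```python
def partition_prefixes(base_path, column, values):
    """List the Hive-style prefixes a dataset partitioned on `column` would use."""
    base = base_path.rstrip("/")
    # One prefix per distinct value, e.g. .../column_name=a/
    return [f"{base}/{column}={value}/" for value in sorted(set(values))]


# Hypothetical bucket and partition values, for illustration only.
prefixes = partition_prefixes("s3://my-bucket/data/charity/", "column_name", ["b", "a", "b"])
for prefix in prefixes:
    print(prefix)
```

Each distinct value of the partition column gets its own prefix, which is what lets query engines skip partitions they do not need.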
Read data from a database
You can also use pandas to read directly from a relational database instead of going through the Data Catalog.
import pandas as pd
from sqlalchemy import create_engine, text
# postgres
user = 'user'
password = 'password'
database = 'database_name'
url = f'postgresql://{user}:{password}@xxxxx.aws.neon.tech/{database}'
pgengine = create_engine(url)
pgconn = pgengine.connect()
# get data using query
query = text('SELECT version();')
df = pd.read_sql_query(query, pgconn)
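One thing to watch with the connection URL: a password containing characters such as `@` or `/` breaks the f-string form unless it is URL-encoded first. A minimal sketch using `urllib.parse.quote_plus` from the standard library; the helper name `build_postgres_url` and the sample password are assumptions for illustration, and the host is the same elided placeholder used above.

```python
from urllib.parse import quote_plus


def build_postgres_url(user, password, host, database):
    """Build a PostgreSQL URL with the user and password percent-encoded."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}/{database}"


# Sample password chosen to contain characters that need escaping.
url = build_postgres_url("user", "p@ss/word", "xxxxx.aws.neon.tech", "database_name")
print(url)
```

The resulting string can be passed to `create_engine` exactly as before.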