"""
Database connections and engines.
|pic1| |pic2| |pic3| |pic4|
.. |pic1| image:: ../images_source/db_etl_tools/oracle1.png
:width: 20%
.. |pic2| image:: ../images_source/db_etl_tools/postgres1.png
:width: 20%
.. |pic3| image:: ../images_source/db_etl_tools/teradata.png
:width: 20%
.. |pic4| image:: ../images_source/db_etl_tools/redshift1.png
:width: 20%
"""
from datetime import datetime
import os
import re
import time
import cx_Oracle
import numpy as np
import pandas as pd
import psycopg2
from colorama import Fore
from sqlalchemy import text
import psycopg2.extras
from fusetools.text_tools import Export
class Generic:
    """
    Generic functions for SQL queries and ETL.
    """

    @classmethod
    def make_groupby(cls, sql, dim_fact_delim):
        """
        Creates a dynamically generated GROUP BY clause for a given SQL statement.

        The dimension columns are the ``", "``-separated expressions found
        between the first ``SELECT`` and ``dim_fact_delim``. Each one is
        grouped by its alias (the text after ``" as "``); an expression
        without an alias is grouped by the expression itself.

        :param sql: SQL statement provided.
        :param dim_fact_delim: Delimiter separating the dimension columns from the remaining columns.
        :return: A complete SQL statement with dynamically generated GROUP BY clause
            (newlines replaced by spaces, double quotes removed).
        """
        select_body = sql.replace("\n", "").split("SELECT")[1]
        dim_clause = select_body.split(dim_fact_delim)[0]
        dim_segs = []
        for segment in dim_clause.split(", "):
            if segment.strip() == '':
                continue
            parts = segment.split(" as ")
            # Prefer the alias; fall back to the raw expression instead of
            # failing with IndexError when the column has no alias.
            dim_segs.append((parts[1] if len(parts) > 1 else parts[0]).strip())
        sql_all = sql + " GROUP BY " + ', '.join(dim_segs)
        return sql_all.replace("\n", " ").replace('"', "")

    @classmethod
    def make_db_schema(cls, df):
        """
        Infers a target Pandas dtype for every column of a DataFrame.

        Blank / whitespace-only cells and nulls are ignored. A column whose
        first 10 characters split into three ``-`` separated parts is treated
        as ``datetime64[ns]``; a column castable to float is ``Int64`` when
        every value is whole and ``float`` otherwise; anything else is
        ``object``.

        :param df: A Pandas DataFrame with column types to be converted.
        :return: A Pandas DataFrame with columns ``col``, ``dtype_new``,
            ``index``, ``dtype_old`` and ``dtype_final`` (``dtype_new`` unless
            it is ``object``, in which case the existing dtype is kept).
        """
        cols = []
        dtypes = []
        for col in df.columns:
            cols.append(col)
            col_series = df[col].replace(r'^\s*$', np.nan, regex=True).dropna()

            # Date heuristic: "YYYY-MM-DD..." splits into exactly 3 parts.
            try:
                date_len = max(col_series.astype("str").str[:10].str.split("-").apply(len))
            except Exception:
                # e.g. empty column after dropping nulls
                date_len = 0
            if date_len == 3:
                dtypes.append("datetime64[ns]")
                continue

            # Numeric heuristic: castable to float -> Int64 if all whole numbers.
            try:
                is_whole = col_series.astype("float").apply(float.is_integer).all()
            except Exception:
                dtypes.append("object")
                continue
            dtypes.append("Int64" if is_whole else "float")

        schema_df = pd.DataFrame({"col": cols, "dtype_new": dtypes})
        old_schema_df = pd.DataFrame(df.dtypes, columns=["dtype_old"]).reset_index()
        schema_df2 = pd.merge(schema_df, old_schema_df, how="inner", left_on="col", right_on="index")
        schema_df2['dtype_final'] = np.where(
            schema_df2['dtype_new'] != "object",
            schema_df2['dtype_new'],
            schema_df2['dtype_old']
        )
        return schema_df2

    @classmethod
    def db_apply_schema(cls, df, schema_df):
        """
        Converts Pandas DataFrame columns based on schema DataFrame provided.

        :param df: A Pandas DataFrame with column types to be converted.
        :param schema_df: A Pandas DataFrame with a ``col`` and a ``dtype_final``
            column, as produced by :meth:`make_db_schema`.
        :return: A new Pandas DataFrame with columns converted to the schema.
        """
        # Normalise blanks to nulls first so the casts below do not choke on them.
        df_ret = df.replace(r'^\s*$', np.nan, regex=True)
        df_ret = df_ret.replace('', np.nan, regex=True)
        df_ret = df_ret.replace({np.nan: None})
        for _, row in schema_df.iterrows():
            col = row['col']
            dtype_final = row['dtype_final']
            if dtype_final == "Int64":
                df_ret[col] = df_ret[col].replace({np.nan: None})
                # Go through float so numeric strings such as "1.0" are accepted.
                df_ret[col] = df_ret[col].astype(float).astype("Int64")
            elif dtype_final == "datetime64[ns]":
                df_ret[col] = pd.to_datetime(df_ret[col], errors="coerce")
            else:
                df_ret[col] = df_ret[col].replace({np.nan: None})
                df_ret[col] = df_ret[col].astype(dtype_final)
        return df_ret

    @classmethod
    def make_db_cols(cls, df):
        """
        Renames the columns of a Pandas DataFrame to database-friendly names.

        ``#`` becomes ``num``, ``%`` becomes ``pct``, every run of other
        non-alphanumeric characters becomes a single ``_``, names are cut to
        200 characters, lower-cased and stripped of leading/trailing ``_``.
        Non-string column labels are converted with ``str`` first.

        :param df: A Pandas DataFrame with columns to be transformed
            (its columns are renamed in place).
        :return: The same Pandas DataFrame with the converted column names.
        """
        columns = []
        for col in df.columns:
            name = str(col).replace('#', 'num').replace('%', 'pct')
            name = re.sub('[^a-zA-Z0-9]+', '_', name)
            columns.append(name[:200].lower().strip("_"))
        df.columns = columns
        return df

    @classmethod
    def run_query(cls, engine, sql):
        """
        Executes a SQL query.

        :param engine: A database engine object exposing ``execute(sql)``.
        :param sql: A SQL statement to be executed.
        :return: Time for execution of SQL query, in minutes.
        """
        rptg_tstart = datetime.now()
        engine.execute(sql)
        tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
        print(Fore.RED + f"Runtime: {tdelta}")
        return tdelta
class Oracle:
    """
    Generic functions for Oracle SQL queries and ETL.

    .. image:: ../images_source/db_etl_tools/oracle1.png
    """

    # Month number -> three-letter abbreviation used in "DD-MON-YY" dates.
    _MONTH_ABBREVS = {
        1: 'JAN', 2: 'FEB', 3: 'MAR', 4: 'APR', 5: 'MAY', 6: 'JUN',
        7: 'JUL', 8: 'AUG', 9: 'SEP', 10: 'OCT', 11: 'NOV', 12: 'DEC'}

    # Pandas dtype name -> Oracle column type; anything else falls back to VARCHAR.
    _ORACLE_TYPES = {
        'object': 'VARCHAR',
        'int64': 'NUMBER',
        'float64': 'VARCHAR',
        'datetime64[ns]': 'VARCHAR'}

    @classmethod
    def make_tbl(cls, df, tbl_name):
        """
        Provides a CREATE TABLE SQL statement for a given Pandas DataFrame.

        ``int64`` columns become ``NUMBER``; every other column becomes
        ``VARCHAR(n)`` where ``n`` is the longest ``str(value)`` in the column
        (at least 1).

        :param df: A Pandas DataFrame to be added as an Oracle table.
        :param tbl_name: Oracle table name to be created.
        :return: CREATE TABLE SQL statement.
        """
        col_defs = []
        for col in df.columns:
            sql_type = cls._ORACLE_TYPES.get(str(df[col].dtype), 'VARCHAR')
            if sql_type == 'NUMBER':
                col_defs.append(f"{col} {sql_type}")
                continue
            max_len = df[col].map(lambda x: len(str(x))).max()
            # An empty column yields NaN and an all-empty-string column yields 0;
            # neither is a usable VARCHAR length.
            max_len = 1 if pd.isna(max_len) else max(int(max_len), 1)
            col_defs.append(f"{col} {sql_type}({max_len})")
        return "CREATE TABLE " + tbl_name + " ( " + ", ".join(col_defs) + " )"

    @classmethod
    def insert_tbl(cls, df, tbl_name):
        """
        Builds an INSERT INTO statement with positional bind variables
        (``:1``, ``:2``, ...) for a given Pandas DataFrame.

        :param df: A Pandas DataFrame with values to be inserted.
        :param tbl_name: An Oracle table for Pandas DataFrame to be inserted into.
        :return: SQL for INSERT INTO statement.
        """
        columns = ', '.join(str(c) for c in df.columns)
        binds = ', '.join(f":{pos}" for pos in range(1, len(df.columns) + 1))
        return 'INSERT INTO ' + tbl_name + '(' + columns + ') VALUES (' + binds + ')'

    @classmethod
    def insert_exec(cls, sql, conn, df):
        """
        Executes a provided INSERT statement once per row of the DataFrame and commits.

        :param sql: A provided SQL query using positional bind variables.
        :param conn: A database connection (left open for the caller).
        :param df: A Pandas DataFrame whose rows are bound to the statement.
        :return: Nothing.
        """
        cursor = cx_Oracle.Cursor(conn)
        try:
            cursor.prepare(sql)
            cursor.executemany(None, df.values.tolist())
            conn.commit()
        finally:
            # Release the cursor even when the insert fails.
            cursor.close()

    @classmethod
    def _prepare_and_create(cls, df, tbl_name, eng, subcols):
        """Stringify the frame, then drop (best effort) and re-create the table; return the prepared frame."""
        if subcols:
            df = df[subcols]
        # Work on a copy: the caller's DataFrame must not be modified.
        df = df.fillna(' ').astype(str)
        sql = cls.make_tbl(df, tbl_name)
        print(sql)
        try:
            eng.execute("drop table " + str(tbl_name))
        except Exception as e:
            # The table usually just does not exist yet; report and carry on.
            print(str(e))
        eng.execute(sql)
        return df

    @staticmethod
    def _split_chunks(df, chunks):
        """Split a DataFrame row-wise into at most ``chunks`` non-empty, near-equal pieces."""
        positions = np.array_split(np.arange(len(df)), chunks)
        return [df.iloc[pos] for pos in positions if len(pos)]

    @classmethod
    def make_tbl_complete_force(cls, df, tbl_name, eng, conn, attempt_n,
                                subcols=False, chunks=False, chunks_delay=False):
        """
        Executes a series of SQL statements to CREATE and INSERT into a table from a Pandas DataFrame,
        retrying each chunk's INSERT on failure.

        Nothing is done for an empty DataFrame. Nulls are replaced by a single
        space and all values are loaded as strings. Retries only apply when
        ``chunks`` is given; a chunk that still fails after ``attempt_n``
        attempts is reported and skipped.

        :param df: Pandas DataFrame to create a table from.
        :param tbl_name: Name of table to be created.
        :param eng: Oracle database engine object.
        :param conn: Oracle database connection object.
        :param attempt_n: Number of times to attempt to run INSERT statement.
        :param subcols: A list of columns of the Pandas DataFrame to apply operations on.
        :param chunks: Number of chunks to split Pandas DataFrame into.
        :param chunks_delay: Delay in seconds between chunk INSERT statements (default 2).
        :return: Nothing; the SQL statements executed are printed.
        """
        if len(df) == 0:
            return
        df = cls._prepare_and_create(df, tbl_name, eng, subcols)

        if not chunks:
            sql = cls.insert_tbl(df, tbl_name)
            print(sql)
            cls.insert_exec(sql, conn, df)
            return

        for sub in cls._split_chunks(df, chunks):
            sql = cls.insert_tbl(sub, tbl_name)
            print(sql)
            attempts = attempt_n
            while attempts > 0:
                try:
                    cls.insert_exec(sql, conn, sub)
                    # Stop retrying once the chunk is in; without this the
                    # same chunk would be inserted over and over.
                    break
                except Exception:
                    attempts -= 1
                    print(Fore.RED + f"Failed upload attempt...{attempts} remaining.")
                    time.sleep(1)
            else:
                print(Fore.RED + "Upload attempts exhausted; chunk skipped.")
            time.sleep(chunks_delay if chunks_delay else 2)

    @classmethod
    def make_tbl_complete(cls, df, tbl_name, eng, conn, subcols=False, chunks=False, chunks_delay=False):
        """
        Executes a series of SQL statements to CREATE and INSERT into a table from a Pandas DataFrame.

        Nothing is done for an empty DataFrame. Nulls are replaced by a single
        space and all values are loaded as strings.

        :param df: Pandas DataFrame to create a table from.
        :param tbl_name: Name of table to be created.
        :param eng: Oracle database engine object.
        :param conn: Oracle database connection object.
        :param subcols: A list of columns of the Pandas DataFrame to apply operations on.
        :param chunks: Number of chunks to split Pandas DataFrame into.
        :param chunks_delay: Delay in seconds between chunk INSERT statements (default 2).
        :return: Nothing; the SQL statements executed are printed.
        """
        if len(df) == 0:
            return
        df = cls._prepare_and_create(df, tbl_name, eng, subcols)

        if not chunks:
            sql = cls.insert_tbl(df, tbl_name)
            print(sql)
            cls.insert_exec(sql, conn, df)
            return

        for sub in cls._split_chunks(df, chunks):
            sql = cls.insert_tbl(sub, tbl_name)
            print(sql)
            cls.insert_exec(sql, conn, sub)
            time.sleep(chunks_delay if chunks_delay else 2)

    @classmethod
    def get_oracle_date(cls, date):
        """
        Converts a date to an Oracle date of format "DD-MON-YY".

        :param date: A date string containing ``-`` (e.g. YYYY-MM-DD) or ``/``
            (e.g. M/D/YYYY), or a datetime-like object. Any time component is
            ignored. A string with neither separator is returned unchanged.
        :return: An Oracle database date string, e.g. ``05-JAN-20``.
        """
        if isinstance(date, str) and "-" not in date and "/" not in date:
            return date
        stamp = pd.to_datetime(date)
        return f"{stamp.day:02d}-{cls._MONTH_ABBREVS[stamp.month]}-{stamp.year % 100:02d}"

    @classmethod
    def get_orcl_date(cls, dat):
        """
        Converts a date column to Oracle dates of format "DD-MON-YY".

        :param dat: A Pandas Series of dates (datetime dtype or parseable by ``pd.to_datetime``).
        :return: A Pandas Series of Oracle date strings aligned with ``dat``;
            missing dates stay missing.
        """
        stamps = pd.to_datetime(dat)
        month_names = stamps.dt.month.map(cls._MONTH_ABBREVS)
        return stamps.dt.strftime("%d") + "-" + month_names + "-" + stamps.dt.strftime("%y")

    @classmethod
    def orcl_tbl_varchar_convert(cls, tbl_name, convert_cols, engine, length=300):
        """
        Converts a set of columns to VARCHAR for a given Oracle table.

        :param tbl_name: Oracle table name.
        :param convert_cols: List of columns to convert.
        :param engine: Oracle database engine.
        :param length: VARCHAR length to apply (default 300).
        :return: Nothing; the ALTER TABLE statement for each column is printed.
        """
        for col in convert_cols:
            sql = f'''
            alter table {tbl_name}
            modify {col} varchar({length})
            '''
            print(sql)
            engine.execute(text(sql).execution_options(autocommit=True))
            # Brief pause between consecutive DDL statements.
            time.sleep(1)
[docs]class Postgres:
"""
Generic functions for Postgres SQL queries and ETL.
.. image:: ../images_source/db_etl_tools/postgres1.png
"""
@classmethod
def run_query_pg(cls, conn, sql):
    """
    Executes a SQL statement with a Postgres database connection and commits it.

    :param conn: Postgres database connection object.
    :param sql: SQL Statement to execute.
    :return: Elapsed time to execute query, in minutes.
    """
    rptg_tstart = datetime.now()
    cur = conn.cursor()
    try:
        cur.execute(sql)
        conn.commit()
    finally:
        # Always release the cursor, including when the statement fails.
        cur.close()
    tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
    print(Fore.RED + f"Runtime: {tdelta}")
    return tdelta
[docs] @classmethod
def insert_val_pg(cls, col_list, val_list, tbl_name):
"""
Creates SQL to run an INSERT operation of a given Postgres table.
:param col_list: List of columns to INSERT or UPDATE.
:param val_list: List of values to INSERT or UPDATE.
:param tbl_name: Name of Postgres table.
:return: SQL to run an INSERT statement.
"""
sql = f'''
INSERT INTO {tbl_name}
(
{str(col_list).replace("[", "").replace("]", "").replace("'", "")}
) values (
{str(val_list).replace("[", "").replace("]", "")}
)
'''
return sql
[docs] @classmethod
def upsert_val_pg(cls, col_list, val_list, tbl_name, constraint_col):
"""
Creates SQL to run an UPSERT (INSERT new records or UPDATE existing records) operation of a given Postgres table.
:param col_list: List of columns to INSERT or UPDATE.
:param val_list: List of values to INSERT or UPDATE.
:param constraint_col: Column/value logic to check against for INSERT or UPDATE.
:param tbl_name: Name of Postgres table.
:return: SQL to run an UPSERT statement.
"""
update = ""
for idx, col in zip(col_list, val_list):
if type(col) in [str]:
update = update + idx + f"='{col}',"
else:
update = update + idx + f"={col},"
update = update[:update.rfind(",")]
sql = f'''
INSERT INTO {tbl_name}
({str(col_list).replace("[", "").replace("]", "").replace("'", "")})
VALUES
({str(val_list).replace("[", "").replace("]", "")})
ON CONFLICT ({constraint_col})
DO
UPDATE SET
{update}
'''
return sql
[docs] @classmethod
def upsert_tbl_pg(cls, src_tbl, tgt_tbl, src_join_cols, src_insert_cols,
src_update_cols=False, update_compare_cols=False):
"""
Creates SQL to run an UPSERT (INSERT new records or UPDATE existing records) operation of a given Postgres table.
:param src_tbl: Postgres source table that contains data to be merged from.
:param tgt_tbl: Postgres target table to receive UPSERT operation.
:param src_join_cols: Columns to use to join source and target tables.
:param src_insert_cols: Columns to be inserted from source table.
:param src_update_cols: Columns to be updated from source table.
:param update_compare_cols: Columns to use to compare values across source and target tables.
:return: A SQL Insert statement and a SQL Update statement.
"""
src_join_cols_ = (
str([f"t.{c} = s.{c} AND "
for c in src_join_cols])
.replace("[", "")
.replace("]", "")
.replace("'", "")
.replace(",", "")
)
src_join_cols_ = src_join_cols_[:src_join_cols_.rfind("AND")]
src_join_cols_f = (
str([f"t.{c} IS NULL AND "
for c in src_join_cols])
.replace("[", "")
.replace("]", "")
.replace("'", "")
.replace(",", "")
)
src_join_cols_f = src_join_cols_f[:src_join_cols_f.rfind("AND")]
src_insert_cols_ = (
str([f"s.{c}"
for c in src_insert_cols])
.replace("[", "")
.replace("]", "")
.replace("'", "")
)
if src_update_cols:
src_update_cols_ = (
str([f"{c} = s.{c},"
for c in src_update_cols])
.replace("[", "")
.replace("]", "")
.replace("', '", "")
.replace("'", "")
)
src_update_cols_ = src_update_cols_[:src_update_cols_.rfind(",")]
# update join statement
src_join_cols2_ = src_join_cols_.replace("t.", f"{tgt_tbl}.")
if update_compare_cols:
update_compare_cols_ = (
str([f"s.{c} != {tgt_tbl}.{c},"
for c in update_compare_cols])
.replace("[", "")
.replace("]", "")
.replace("', '", "")
.replace("'", "")
)
update_compare_cols_ = update_compare_cols_[:update_compare_cols_.rfind(",")]
src_join_cols2_ = src_join_cols2_ + " AND " + update_compare_cols_
# src_join_cols2_ = src_join_cols2_.replace("t.", f"{tgt_tbl}.")
# https://dwgeek.com/amazon-redshift-merge-statement-alternative-and-example.html/
sql_update = f'''
/* Update records*/
UPDATE {tgt_tbl}
SET {src_update_cols_}
FROM {src_tbl} s
WHERE {src_join_cols2_}
'''.replace("\n", " ")
else:
sql_update = ""
sql_insert = f'''
/* Insert records*/
INSERT INTO {tgt_tbl}
SELECT {src_insert_cols_}
FROM {src_tbl} s
LEFT JOIN {tgt_tbl} t
ON {src_join_cols_}
WHERE {src_join_cols_f}
'''.replace("\n", " ")
return sql_update, sql_insert
@classmethod
def make_df_tbl_pg(cls, tbl_name, df):
    """
    Creates SQL to run a CREATE TABLE statement based on a Pandas DataFrame.

    Column names are first normalised with ``Generic.make_db_cols`` (which
    renames the columns of ``df`` in place). String-like columns become
    ``VARCHAR(n)`` sized to the longest value; integer columns become
    ``INTEGER`` unless a value falls outside the 32-bit range, in which case
    they are stored as ``VARCHAR(n)``.

    :param tbl_name: Postgres table name.
    :param df: Pandas DataFrame.
    :return: CREATE TABLE SQL statement.
    """
    # fix columns
    df = Generic.make_db_cols(df)

    # data type mapping (pandas --> postgres); unknown dtypes fall back to VARCHAR
    type_map = {
        'object': 'VARCHAR',
        'int64': 'INTEGER',
        'Int64': 'INTEGER',
        'int32': 'INTEGER',
        'bool': 'VARCHAR',
        'float64': 'FLOAT',
        'datetime64[ns]': 'TIMESTAMP',
        'datetime64[ns, UTC]': 'TIMESTAMP'}
    int32_min, int32_max = -2147483648, 2147483647

    col_defs = []
    for col in df.columns:
        series = df[col]
        sql_type = type_map.get(str(series.dtype), 'VARCHAR')

        if sql_type == 'INTEGER':
            # Values outside the 32-bit INTEGER range are stored as text.
            col_min, col_max = series.min(), series.max()
            if (not pd.isna(col_max) and int(col_max) > int32_max) or \
                    (not pd.isna(col_min) and int(col_min) < int32_min):
                sql_type = 'VARCHAR'

        if sql_type == 'VARCHAR':
            # size the column to the longest stringified value (at least 1)
            max_len = series.map(lambda x: len(str(x))).max()
            max_len = 1 if pd.isna(max_len) else max(int(max_len), 1)
            col_defs.append(f"{col} VARCHAR({max_len})")
        else:
            col_defs.append(f"{col} {sql_type}")

    return "CREATE TABLE " + tbl_name + " ( " + ", ".join(col_defs) + " )"
@classmethod
def insert_df_pg(cls, cursor, conn, df, tbl_name, return_statement=None):
    """
    Executes an INSERT INTO statement for a given Pandas DataFrame into a Postgres table.

    Numeric values are rounded to 3 decimals and missing values are sent as
    NULL. The insert is committed before returning.

    :param cursor: Postgres database cursor object.
    :param conn: Postgres database connection object.
    :param df: Pandas DataFrame to insert into a Postgres table.
    :param tbl_name: Postgres table name.
    :param return_statement: Optional SQL suffix appended to the INSERT
        (e.g. a ``RETURNING`` clause); when given, ``cursor.fetchone()`` is
        returned.
        NOTE(review): rows are sent with ``execute_batch``, so presumably only
        the result of the last batch is available to fetch -- confirm.
    :return: ``cursor.fetchone()`` when ``return_statement`` is given,
        otherwise the elapsed time to execute the query, in minutes.
    """
    # Round while the columns are still numeric, then switch to object dtype
    # so missing values can be sent as None (NULL) and numpy scalars are
    # unboxed to plain Python values for the driver.
    df_load = df.round(3).astype(object)
    df_load = df_load.where(df_load.notna(), None)
    df_columns = list(df_load)
    # create (col1,col2,...)
    columns = ",".join(df_columns)
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
    insert_stmt = "INSERT INTO {} ({}) {}".format(tbl_name, columns, values)
    if return_statement:
        insert_stmt = insert_stmt + return_statement
    rptg_tstart = datetime.now()
    psycopg2.extras.execute_batch(cursor, insert_stmt, df_load.values.tolist())
    conn.commit()
    tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
    print(Fore.RED + f"Runtime: {tdelta}")
    if return_statement:
        return cursor.fetchone()
    return tdelta
@classmethod
def make_tbl_complete_pg(cls, df, tbl_name, conn, cursor, batch_size=False):
    """
    Executes a series of SQL statements to CREATE and INSERT into a table from a Pandas DataFrame.

    The table is dropped first (a failure to drop is tolerated), re-created
    from the DataFrame, and then loaded.

    :param df: Pandas DataFrame to create a table from.
    :param tbl_name: Name of table to be created.
    :param conn: Postgres database connection object.
    :param cursor: Postgres database cursor object.
    :param batch_size: Records to load per batch.
    :return: Elapsed time of the INSERT step, in minutes.
    """
    # 1 drop the table
    print(f"dropping table: {tbl_name}")
    try:
        cls.run_query_pg(sql=f"drop table {tbl_name}", conn=conn)
    except Exception:
        print(f"table doesn't exist: {tbl_name}")
        # A failed statement leaves the transaction aborted; reset it so the
        # CREATE TABLE below can run.
        conn.rollback()
    # create the table
    print(f"creating table: {tbl_name}")
    # NOTE(review): make_tbl_pg / insert_pg are not defined in the visible
    # part of this class -- presumably defined further down; confirm.
    sql = cls.make_tbl_pg(df=df, tbl_name=tbl_name)
    print(sql)
    cls.run_query_pg(sql=sql, conn=conn)
    print(f"inserting DF values into table: {tbl_name}")
    rptg_tstart = datetime.now()
    cls.insert_pg(df=df, tbl=tbl_name, cursor=cursor, conn=conn, batch_size=batch_size)
    tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
    print(Fore.RED + f"Runtime: {tdelta}")
    return tdelta
@classmethod
def sequential_load_pg(cls,
                       override,
                       tgt_tbl,
                       conn,
                       dt_start,
                       dt_end,
                       saved_day_id_range_placeholder,
                       dt1_interval,
                       dt2_interval,
                       sql_loop_fn,
                       sql_loop_fn_type,
                       filter_day_id_field1=False,
                       sql_loop_fn_dt_placeholder1=False,
                       filter_day_id_field2=False,
                       filter_id_type2=False,
                       sql_loop_fn_dt_placeholder2=False,
                       filter_day_id_field3=False,
                       filter_id_type3=False,
                       sql_loop_fn_dt_placeholder3=False,
                       loop_src1=False,
                       loop_src2=False,
                       loop_src3=False,
                       log_dir=False):
    """
    Loads a target table window by window, resuming after the latest date
    range already recorded in the table's ``dt_range`` column.

    Windows are first built from ``dt1_interval`` (the "monthly" pass). If a
    window's statement fails, the error is printed and the load restarts from
    the latest saved date using windows built from ``dt2_interval`` (the
    "weekly" pass), where errors are raised to the caller.

    :param override: When truthy, drop ``tgt_tbl`` first and create it from the first window's query.
    :param tgt_tbl: Target table name.
    :param conn: Postgres database connection object.
    :param dt_start: Start of the overall date range (passed to ``pd.date_range``).
    :param dt_end: End of the overall date range (passed to ``pd.date_range``).
    :param saved_day_id_range_placeholder: Text in the SQL template replaced by the
        ``'<start> to <end>' as dt_range,`` column.
    :param dt1_interval: ``pd.date_range`` frequency for the first pass; resulting dates are
        shifted back by one ``MonthBegin``.
    :param dt2_interval: ``pd.date_range`` frequency for the fallback pass.
    :param sql_loop_fn: Either a callable returning SQL (called with ``src``, ``src2``,
        ``src3``, ``start``, ``end``) or a SQL template string.
    :param sql_loop_fn_type: ``"fn"`` when ``sql_loop_fn`` is a callable; anything else
        treats it as a template string.
    :param filter_day_id_field1: Date field used in the first date filter.
    :param sql_loop_fn_dt_placeholder1: Template text replaced by the first date filter.
    :param filter_day_id_field2: Date field used in the second date filter.
    :param filter_id_type2: ``"range"`` or ``"<"``; kind of the second filter (first pass only).
    :param sql_loop_fn_dt_placeholder2: Template text replaced by the second date filter.
    :param filter_day_id_field3: Date field used in the third date filter.
    :param filter_id_type3: ``"range"`` or ``"<"``; kind of the third filter (first pass only).
    :param sql_loop_fn_dt_placeholder3: Template text replaced by the third date filter.
    :param loop_src1: Passed to ``sql_loop_fn`` as ``src``.
    :param loop_src2: Passed to ``sql_loop_fn`` as ``src2``.
    :param loop_src3: Passed to ``sql_loop_fn`` as ``src3``.
    :param log_dir: Path prefix for dumping each statement to ``<log_dir><tgt_tbl>_<idx>.sql``;
        nothing is dumped when not given.
    :return: Nothing.
    """
    fallback_date = "1999-12-31"  # arbitrarily old date
    # Tracks whether a statement already succeeded, i.e. the table exists.
    state = {"table_created": False}

    def _windows(freq, shift_to_month_begin):
        # Build consecutive [start_date, end_date) windows up to today.
        dates = pd.date_range(dt_start, dt_end, freq=freq)
        if shift_to_month_begin:
            dates = dates - pd.offsets.MonthBegin(1)
        stamps = [str(x)[:10] for x in dates.to_list()]
        windows = pd.DataFrame({"start_date": stamps, "end_date": stamps})
        windows['end_date'] = windows['end_date'].shift(-1)
        return windows[pd.to_datetime(windows['start_date']) <= datetime.now()].dropna()

    def _latest_saved():
        # Latest end date recorded in the target's dt_range column.
        try:
            sql = f'''select max(date(trim(substring(dt_range,regexp_instr(dt_range,'to ')+3,10)))) as day_idnt FROM {tgt_tbl}'''
            saved = pd.read_sql_query(sql=sql, con=conn)
        except Exception:
            conn.commit()
            return pd.DataFrame({"day_idnt": [fallback_date]})
        if saved.empty or pd.isna(saved['day_idnt'].iloc[0]):
            # Table exists but holds no rows yet: start from the beginning.
            return pd.DataFrame({"day_idnt": [fallback_date]})
        return saved

    def _resume_from(windows, saved_dates):
        # Keep windows starting on/after the first day of the latest saved month.
        latest = str(saved_dates['day_idnt'].astype(str).values[0]).split("-")
        month_start = datetime(
            year=int(latest[0]), month=int(latest[1]), day=int(latest[2])
        ).replace(day=1).strftime("%Y-%m-%d")
        keep = pd.to_datetime(windows['start_date']) >= pd.to_datetime(month_start)
        return windows[keep].reset_index(drop=True)

    def _date_filter(field, start, end, kind):
        # SQL fragment for one date filter, or None for an unrecognised kind.
        if kind == "range":
            return f" AND date({field}) >= '{start}' AND date({field}) < '{end}'"
        if kind == "<":
            return f" AND date({field}) < '{end}'"
        return None

    def _build_sql(start, end, honor_filter_types):
        # Produce the SELECT for one window.
        if sql_loop_fn_type == "fn":
            return sql_loop_fn(src=loop_src1,
                               src2=loop_src2,
                               src3=loop_src3,
                               start=start,
                               end=end)
        # date range column for logging
        sql = sql_loop_fn.replace(
            saved_day_id_range_placeholder,
            f" '{start} to {end}' as dt_range,"
        )
        # date filters
        sql = sql.replace(
            sql_loop_fn_dt_placeholder1,
            _date_filter(filter_day_id_field1, start, end, "range")
        )
        # NOTE(review): the optional filters are applied to template strings
        # only; the original nesting could not be recovered -- confirm.
        optional_filters = (
            (sql_loop_fn_dt_placeholder2, filter_day_id_field2, filter_id_type2),
            (sql_loop_fn_dt_placeholder3, filter_day_id_field3, filter_id_type3),
        )
        for placeholder, field, kind in optional_filters:
            if not placeholder:
                continue
            # NOTE(review): the fallback pass has always applied a range filter
            # regardless of the configured type; kept as-is -- confirm intent.
            clause = _date_filter(field, start, end, kind if honor_filter_types else "range")
            if clause is not None:
                sql = sql.replace(placeholder, clause)
        return sql

    def _run(windows, saved_dates, honor_filter_types, stop_on_error):
        # Execute every window; return False if stopped by an error.
        for idx, row in windows.iterrows():
            start, end = row['start_date'], row['end_date']
            print(f'''{start} to {end}''')
            if idx == 0 and saved_dates['day_idnt'][0] != pd.to_datetime(start):
                print(Fore.RED + f'''latest saved data date in table is {str(saved_dates['day_idnt'][0])} ...''')
                # bump up start range:
                start = str(pd.to_datetime(str(saved_dates['day_idnt'][0])) + pd.DateOffset(1))[:10]
                print(Fore.RED + f'''revising start date to: {start} to {end}''')
            sql = _build_sql(start, end, honor_filter_types)
            # Create the table only once: after a partial first pass the
            # fallback pass must insert into the table that already exists.
            if override and not state["table_created"]:
                sql_prefix = f"CREATE TABLE {tgt_tbl} AS "
            else:
                sql_prefix = f"INSERT INTO {tgt_tbl} "
            statement = sql_prefix + sql
            if log_dir:
                Export.dump_sql(obj=statement,
                                dir=log_dir + f"{tgt_tbl}_{idx}.sql")
            if stop_on_error:
                try:
                    cls.run_query_pg(conn=conn, sql=statement)
                except Exception as e:
                    print(str(e))
                    conn.commit()
                    return False
            else:
                cls.run_query_pg(conn=conn, sql=statement)
            state["table_created"] = True
        return True

    # define the month start/end dates and the weekly start/end dates to loop through
    rptg_dates = _windows(dt1_interval, True)
    rptg_dates_wk = _windows(dt2_interval, False)

    # dropping table if override = True
    if override:
        print(f'''table override True: Dropping table: {tgt_tbl} ''')
        try:
            cls.run_query_pg(conn=conn, sql=f'''drop table {tgt_tbl}''')
        except Exception:
            conn.commit()

    # getting max day id value
    saved_dates = _latest_saved()
    rptg_dates = _resume_from(rptg_dates, saved_dates)
    print("Starting load from:")
    print(rptg_dates.head(1))
    if _run(rptg_dates, saved_dates, honor_filter_types=True, stop_on_error=True):
        return

    # if the insert failed on a monthly level, cycle down to weekly level
    print("Insert failed on monthly level...cycling down to weekly")
    saved_dates = _latest_saved()
    rptg_dates_wk = _resume_from(rptg_dates_wk, saved_dates)
    _run(rptg_dates_wk, saved_dates, honor_filter_types=False, stop_on_error=False)
@classmethod
def sequential_load_pg_wk(cls,
                          rptg_dates,
                          override,
                          tgt_tbl,
                          conn,
                          rptg_wk,
                          rptg_wk_start,
                          rptg_wk_end,
                          sql_loop_fn,
                          # filter dates set 1
                          filter_dt_field1=False,
                          filter_dt_type1=False,
                          filter_dt_placeholder1=False,
                          # filter dates set 2
                          filter_dt_field2=False,
                          filter_dt_type2=False,
                          filter_dt_placeholder2=False,
                          # filter dates set 3
                          filter_dt_field3=False,
                          filter_dt_type3=False,
                          filter_dt_placeholder3=False,
                          log_dir=False
                          ):
    """
    Loads a Postgres table one reporting window at a time from a SQL template.

    For each row of ``rptg_dates`` the placeholders in ``sql_loop_fn`` are
    replaced with that row's values and the statement is executed, as
    ``CREATE TABLE ... AS`` for the first window when ``override`` is set and
    as ``INSERT INTO`` otherwise. The loop stops at the first failing window.

    :param rptg_dates: DataFrame with ``rptg_wk``, ``start_date`` and ``end_date`` columns.
    :param override: If truthy, drop ``tgt_tbl`` first and re-create it from the first window.
    :param tgt_tbl: Target table name.
    :param conn: Postgres database connection object.
    :param rptg_wk: Placeholder text replaced with the ``rptg_wk`` select column.
    :param rptg_wk_start: Placeholder text replaced with the ``rptg_wk_start`` select column.
    :param rptg_wk_end: Placeholder text replaced with the ``rptg_wk_end`` select column.
    :param sql_loop_fn: SQL template string containing the placeholders.
    :param filter_dt_field1: Date field filtered by the first placeholder.
    :param filter_dt_type1: Accepted for symmetry; the first filter is always a range.
    :param filter_dt_placeholder1: Placeholder replaced with ``> start AND <= end`` on field 1.
    :param filter_dt_field2: Date field filtered by the second placeholder.
    :param filter_dt_type2: ``"range"`` or ``"<="``; any other value leaves the placeholder untouched.
    :param filter_dt_placeholder2: Optional second placeholder.
    :param filter_dt_field3: Date field filtered by the third placeholder.
    :param filter_dt_type3: ``"range"`` or ``"<="``; any other value leaves the placeholder untouched.
    :param filter_dt_placeholder3: Optional third placeholder.
    :param log_dir: Path prefix for the per-window ``.sql`` dumps; nothing is dumped when falsy.
    :return: None
    """
    # dropping table if override = True
    if override:
        print(f'''table override True: Dropping table: {tgt_tbl} ''')
        try:
            cls.run_query_pg(conn=conn, sql=f'''drop table {tgt_tbl}''')
        except Exception:
            # the drop fails when the table is absent; end the failed transaction
            conn.commit()

    optional_filters = (
        (filter_dt_placeholder2, filter_dt_field2, filter_dt_type2),
        (filter_dt_placeholder3, filter_dt_field3, filter_dt_type3),
    )
    # the table only has to be created once, by the first statement after a drop
    needs_create = bool(override)

    for idx, row in rptg_dates.iterrows():
        start, end = row['start_date'], row['end_date']
        print(f'''{start} to {end}''')

        # reporting-week columns for logging
        sql = sql_loop_fn.replace(rptg_wk, f" '{row['rptg_wk']}' as rptg_wk,")
        sql = sql.replace(rptg_wk_start, f" '{start}' as rptg_wk_start,")
        sql = sql.replace(rptg_wk_end, f" '{end}' as rptg_wk_end,")

        # primary date filter (skipped when no placeholder was supplied,
        # because str.replace() rejects the False default)
        if filter_dt_placeholder1:
            sql = sql.replace(
                filter_dt_placeholder1,
                f" AND date({filter_dt_field1}) > '{start}' "
                f" AND date({filter_dt_field1}) <= '{end}'"
            )

        # optional secondary filters
        for placeholder, field, filter_type in optional_filters:
            if not placeholder:
                continue
            if filter_type == "range":
                sql = sql.replace(
                    placeholder,
                    f" AND date({field}) > '{start}' "
                    f" AND date({field}) <= '{end}'"
                )
            elif filter_type == "<=":
                sql = sql.replace(placeholder, f" AND date({field}) <= '{end}'")

        if needs_create:
            sql_prefix = f"CREATE TABLE {tgt_tbl} AS "
        else:
            sql_prefix = f"INSERT INTO {tgt_tbl} "

        if log_dir:
            Export.dump_sql(obj=sql_prefix + sql,
                            dir=log_dir + f"{tgt_tbl}_{idx}.sql")
        try:
            cls.run_query_pg(conn=conn, sql=sql_prefix + sql)
        except Exception as e:
            print(str(e))
            conn.commit()
            break
        needs_create = False
class Redshift:
    """
    Generic functions for Redshift SQL queries and ETL.

    .. image:: ../images_source/db_etl_tools/redshift1.png
    """

    # largest value that fits a 4-byte INTEGER column
    _INT32_MAX = 2147483647

    @classmethod
    def run_query_rs(cls, conn, sql):
        """
        Executes a SQL statement with a Redshift database connection and commits it.

        :param conn: Redshift database connection object.
        :param sql: SQL statement to execute.
        :return: Elapsed time to execute query, in minutes.
        """
        rptg_tstart = datetime.now()
        cur = conn.cursor()
        try:
            cur.execute(sql)
            conn.commit()
        finally:
            # release the cursor whether or not the statement succeeded
            cur.close()
        tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
        print(Fore.RED + f"Runtime: {tdelta}")
        return tdelta

    @classmethod
    def insert_val_rs(cls, col_list, val_list, tbl_name):
        """
        Creates SQL to run an INSERT operation of a given Redshift table.

        Values are rendered with ``repr()`` (``None`` becomes ``NULL``), so
        strings come out single-quoted. NOTE: a string that itself contains a
        single quote is not escaped; use a parameterized query for such data.

        :param col_list: List of columns to INSERT.
        :param val_list: List of values to INSERT.
        :param tbl_name: Name of Redshift table.
        :return: SQL to run an INSERT statement.
        """
        columns = ", ".join(str(col) for col in col_list)
        # render element by element so brackets inside a value are preserved
        values = ", ".join("NULL" if val is None else repr(val) for val in val_list)
        sql = f'''
        INSERT INTO {tbl_name}
        (
        {columns}
        ) values (
        {values}
        )
        '''
        return sql

    @classmethod
    def upsert_tbl_rs(cls, src_tbl, tgt_tbl, src_join_cols, src_insert_cols,
                      src_update_cols=False, update_compare_cols=False):
        """
        Creates SQL to run an UPSERT (INSERT new records or UPDATE existing records) operation of a given Redshift table.

        :param src_tbl: Redshift source table that contains data to be merged from.
        :param tgt_tbl: Redshift target table to receive UPSERT operation.
        :param src_join_cols: Columns to use to join source and target tables.
        :param src_insert_cols: Columns to be inserted from source table.
        :param src_update_cols: Columns to be updated from source table; when falsy no UPDATE is produced.
        :param update_compare_cols: Columns compared across source and target; a row is updated
            when at least one of them differs.
        :return: A tuple ``(sql_update, sql_insert)``; ``sql_update`` is ``""`` when
            ``src_update_cols`` is falsy.
        """
        join_on = " AND ".join(f"t.{c} = s.{c}" for c in src_join_cols)
        # anti-join: keep only source rows with no match in the target
        not_in_target = " AND ".join(f"t.{c} IS NULL" for c in src_join_cols)
        insert_cols = ", ".join(f"s.{c}" for c in src_insert_cols)

        if src_update_cols:
            set_clause = ", ".join(f"{c} = s.{c}" for c in src_update_cols)
            # UPDATE ... FROM cannot alias the target, so qualify with the table name
            update_where = " AND ".join(f"{tgt_tbl}.{c} = s.{c}" for c in src_join_cols)
            if update_compare_cols:
                changed = " OR ".join(f"s.{c} != {tgt_tbl}.{c}" for c in update_compare_cols)
                update_where = f"{update_where} AND ({changed})"
            # https://dwgeek.com/amazon-redshift-merge-statement-alternative-and-example.html/
            sql_update = f'''
            /* Update records*/
            UPDATE {tgt_tbl}
            SET {set_clause}
            FROM {src_tbl} s
            WHERE {update_where}
            '''.replace("\n", " ")
        else:
            sql_update = ""

        sql_insert = f'''
        /* Insert records*/
        INSERT INTO {tgt_tbl}
        SELECT {insert_cols}
        FROM {src_tbl} s
        LEFT JOIN {tgt_tbl} t
        ON {join_on}
        WHERE {not_in_target}
        '''.replace("\n", " ")
        return sql_update, sql_insert

    @classmethod
    def make_df_tbl_rs(cls, tbl_name, df):
        """
        Creates SQL to run a CREATE TABLE statement based on a Pandas DataFrame.

        Column names are first passed through ``Generic.make_db_cols``. Each
        column's pandas dtype is mapped to a SQL type; dtypes without a
        mapping are emitted unchanged. VARCHAR columns are sized to the
        longest ``str()`` rendering of their values, and integer columns whose
        maximum exceeds the 4-byte INTEGER range are declared VARCHAR.

        :param tbl_name: Redshift table name.
        :param df: Pandas DataFrame.
        :return: CREATE TABLE SQL statement.
        """
        # fix columns
        df = Generic.make_db_cols(df)
        # data type mapping (pandas --> redshift)
        type_map = {'object': 'VARCHAR',
                    'int64': 'INTEGER',
                    'Int64': 'INTEGER',
                    'int32': 'INTEGER',
                    'bool': 'VARCHAR',
                    'float64': 'FLOAT',
                    'datetime64[ns]': 'TIMESTAMP',
                    'datetime64[ns, UTC]': 'TIMESTAMP'}
        col_defs = []
        for col in df:
            series = df[col]
            # longest rendered value decides the VARCHAR width
            max_len = series.map(lambda x: len(str(x))).max()
            dtype_name = str(series.dtype)
            sql_type = type_map.get(dtype_name, dtype_name)
            if sql_type == 'INTEGER':
                col_max = series.max()
                if pd.notna(col_max) and int(col_max) > cls._INT32_MAX:
                    sql_type = 'VARCHAR'
            if sql_type == 'VARCHAR':
                col_defs.append(f"{col} VARCHAR({max_len})")
            else:
                col_defs.append(f"{col} {sql_type}")
        sql = "CREATE TABLE " + tbl_name + " ( " + ", ".join(col_defs) + " )"
        return sql.replace("'", "")

    @classmethod
    def insert_df_rs(cls, cursor, conn, df, tbl_name, batch_size=False):
        """
        Executes an INSERT INTO statement for a given Pandas DataFrame into a Redshift table.

        Numeric columns are rounded to 3 decimals and missing values are sent
        as SQL NULL.

        :param cursor: Redshift database cursor object.
        :param conn: Redshift database connection object.
        :param df: Pandas DataFrame to insert into a Redshift table.
        :param tbl_name: Redshift table name.
        :param batch_size: Rows per round trip (``page_size`` of ``execute_batch``);
            the psycopg2 default is used when falsy.
        :return: Elapsed time to execute query, in minutes.
        """
        # round while the columns are still numeric, then swap NaN/NaT for None
        df_load = df.round(3)
        df_load = df_load.astype(object).where(df_load.notna(), None)
        df_columns = [str(col) for col in df_load.columns]
        columns = ",".join(df_columns)
        values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
        insert_stmt = "INSERT INTO {} ({}) {}".format(tbl_name, columns, values)
        rows = list(df_load.itertuples(index=False, name=None))
        rptg_tstart = datetime.now()
        if batch_size:
            psycopg2.extras.execute_batch(cursor, insert_stmt, rows, page_size=batch_size)
        else:
            psycopg2.extras.execute_batch(cursor, insert_stmt, rows)
        conn.commit()
        tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
        print(Fore.RED + f"Runtime: {tdelta}")
        return tdelta

    @classmethod
    def make_tbl_complete_rs(cls, df, tbl_name, conn, cursor, batch_size=False):
        """
        Executes a series of SQL statements to CREATE and INSERT into a table from a Pandas DataFrame.

        :param df: Pandas DataFrame to create a table from.
        :param tbl_name: Name of table to be created.
        :param conn: Redshift database connection object.
        :param cursor: Redshift database cursor object.
        :param batch_size: Records to load per batch.
        :return: Elapsed time of the insert step, in minutes.
        """
        # 1 drop the table
        print(f"dropping table: {tbl_name}")
        try:
            cls.run_query_rs(sql=f"drop table {tbl_name}", conn=conn)
        except Exception:
            print(f"table doesn't exist: {tbl_name}")
            # clear the failed transaction so the CREATE below can run
            conn.rollback()
        # create the table
        print(f"creating table: {tbl_name}")
        sql = cls.make_df_tbl_rs(df=df, tbl_name=tbl_name)
        print(sql)
        cls.run_query_rs(sql=sql, conn=conn)
        print(f"inserting DF values into table: {tbl_name}")
        rptg_tstart = datetime.now()
        cls.insert_df_rs(cursor=cursor, conn=conn, df=df, tbl_name=tbl_name,
                         batch_size=batch_size)
        tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
        print(Fore.RED + f"Runtime: {tdelta}")
        return tdelta

    @staticmethod
    def _date_windows_rs(boundaries):
        """Pairs consecutive boundary dates into start_date/end_date rows that start no later than now."""
        labels = [str(x)[:10] for x in boundaries.to_list()]
        windows = pd.DataFrame({"start_date": labels, "end_date": labels})
        windows['end_date'] = windows['end_date'].shift(-1)
        started = pd.to_datetime(windows['start_date']) <= datetime.now()
        # the last boundary has no end date and is dropped
        return windows[started].dropna()

    @staticmethod
    def _latest_loaded_rs(conn, tgt_tbl):
        """Returns (latest end date found in tgt_tbl.dt_range, first day of that month as a string)."""
        try:
            sql = f'''select max(date(trim(substring(dt_range,regexp_instr(dt_range,'to ')+3,10)))) as day_idnt FROM {tgt_tbl}'''
            saved_dates = pd.read_sql_query(sql=sql, con=conn)
        except Exception:
            conn.commit()
            saved_dates = pd.DataFrame({"day_idnt": ["1999-12-31"]})  # arbitrarily old date
        latest = pd.to_datetime(str(saved_dates['day_idnt'].astype(str).values[0]), errors="coerce")
        if pd.isna(latest):
            # table exists but holds no rows yet
            latest = pd.Timestamp("1999-12-31")
        return latest, latest.replace(day=1).strftime("%Y-%m-%d")

    @staticmethod
    def _date_clause_rs(field, filter_type, start, end):
        """Builds the date predicate for one placeholder: upper bound only for "<", a half-open range otherwise."""
        if filter_type == "<":
            return f" AND date({field}) < '{end}'"
        return f" AND date({field}) >= '{start}' AND date({field}) < '{end}'"

    @classmethod
    def sequential_load_rs(cls,
                           override,
                           tgt_tbl,
                           conn,
                           dt_start,
                           dt_end,
                           saved_day_id_range_placeholder,
                           dt1_interval,
                           dt2_interval,
                           sql_loop_fn,
                           sql_loop_fn_type,
                           filter_day_id_field1=False,
                           sql_loop_fn_dt_placeholder1=False,
                           filter_day_id_field2=False,
                           filter_id_type2=False,
                           sql_loop_fn_dt_placeholder2=False,
                           filter_day_id_field3=False,
                           filter_id_type3=False,
                           sql_loop_fn_dt_placeholder3=False,
                           loop_src1=False,
                           loop_src2=False,
                           loop_src3=False,
                           log_dir=False):
        """
        Loads a Redshift table window by window, resuming after the latest date already stored.

        Windows are first built at the ``dt1_interval`` frequency (shifted back
        to month starts). If a window fails to load, the remaining range is
        retried with the ``dt2_interval`` windows; a failure there is raised.
        The resume point is read from the ``dt_range`` column of ``tgt_tbl``.

        :param override: If truthy, drop ``tgt_tbl`` and re-create it from the first loaded window.
        :param tgt_tbl: Target table name.
        :param conn: Redshift database connection object.
        :param dt_start: Start of the overall date range.
        :param dt_end: End of the overall date range.
        :param saved_day_id_range_placeholder: Placeholder replaced with the ``dt_range`` select column.
        :param dt1_interval: ``pd.date_range`` frequency for the first (monthly) pass.
        :param dt2_interval: ``pd.date_range`` frequency for the fallback (weekly) pass.
        :param sql_loop_fn: SQL template string, or a callable when ``sql_loop_fn_type == "fn"``.
        :param sql_loop_fn_type: ``"fn"`` to call ``sql_loop_fn(src, src2, src3, start, end)``;
            anything else treats it as a template string.
        :param filter_day_id_field1: Date field for the first filter (always a range).
        :param sql_loop_fn_dt_placeholder1: Placeholder for the first filter.
        :param filter_day_id_field2: Date field for the second filter.
        :param filter_id_type2: ``"<"`` for an upper bound only; otherwise a range.
        :param sql_loop_fn_dt_placeholder2: Optional placeholder for the second filter.
        :param filter_day_id_field3: Date field for the third filter.
        :param filter_id_type3: ``"<"`` for an upper bound only; otherwise a range.
        :param sql_loop_fn_dt_placeholder3: Optional placeholder for the third filter.
        :param loop_src1: Passed to a callable ``sql_loop_fn`` as ``src``.
        :param loop_src2: Passed to a callable ``sql_loop_fn`` as ``src2``.
        :param loop_src3: Passed to a callable ``sql_loop_fn`` as ``src3``.
        :param log_dir: Path prefix for the per-window ``.sql`` dumps; nothing is dumped when falsy.
        :return: None
        """
        # month start/end dates and weekly start/end dates to loop through
        monthly = cls._date_windows_rs(
            pd.date_range(dt_start, dt_end, freq=dt1_interval) - pd.offsets.MonthBegin(1))
        weekly = cls._date_windows_rs(pd.date_range(dt_start, dt_end, freq=dt2_interval))

        # dropping table if override = True
        if override:
            print(f'''table override True: Dropping table: {tgt_tbl} ''')
            try:
                cls.run_query_rs(conn=conn, sql=f'''drop table {tgt_tbl}''')
            except Exception:
                conn.commit()

        filters = (
            (sql_loop_fn_dt_placeholder1, filter_day_id_field1, "range"),
            (sql_loop_fn_dt_placeholder2, filter_day_id_field2, filter_id_type2),
            (sql_loop_fn_dt_placeholder3, filter_day_id_field3, filter_id_type3),
        )

        def render(start, end):
            # if its a function, pass in params; otherwise replace placeholders
            if sql_loop_fn_type == "fn":
                return sql_loop_fn(src=loop_src1, src2=loop_src2, src3=loop_src3,
                                   start=start, end=end)
            sql = sql_loop_fn.replace(
                saved_day_id_range_placeholder,
                f" '{start} to {end}' as dt_range,")
            for placeholder, field, filter_type in filters:
                if placeholder:
                    sql = sql.replace(
                        placeholder,
                        cls._date_clause_rs(field, filter_type, start, end))
            return sql

        # CREATE is needed exactly once after a drop, in whichever pass loads first
        state = {"needs_create": bool(override)}

        def load(windows, latest, tolerate_failure):
            for idx, row in windows.iterrows():
                start, end = row['start_date'], row['end_date']
                print(f'''{start} to {end}''')
                if idx == 0 and latest != pd.to_datetime(start):
                    print(Fore.RED + f'''latest saved data date in table is {latest.strftime("%Y-%m-%d")} ...''')
                    # bump up start range
                    # NOTE(review): dt_range end dates are exclusive, so resuming at
                    # latest + 1 day may skip the day equal to `latest` -- confirm intent
                    start = str(latest + pd.DateOffset(1))[:10]
                    print(Fore.RED + f'''revising start date to: {start} to {end}''')
                sql = render(start, end)
                if state["needs_create"]:
                    sql_prefix = f"CREATE TABLE {tgt_tbl} AS "
                else:
                    sql_prefix = f"INSERT INTO {tgt_tbl} "
                if log_dir:
                    Export.dump_sql(obj=sql_prefix + sql,
                                    dir=log_dir + f"{tgt_tbl}_{idx}.sql")
                try:
                    cls.run_query_rs(conn=conn, sql=sql_prefix + sql)
                except Exception as e:
                    if not tolerate_failure:
                        raise
                    print(str(e))
                    conn.commit()
                    return False
                state["needs_create"] = False
            return True

        def from_month(windows, month_start):
            keep = pd.to_datetime(windows['start_date']) >= pd.to_datetime(month_start)
            return windows[keep].reset_index(drop=True)

        # getting max day id value
        latest, month_start = cls._latest_loaded_rs(conn, tgt_tbl)
        monthly = from_month(monthly, month_start)
        print("Starting load from:")
        print(monthly.head(1))
        if load(monthly, latest, tolerate_failure=True):
            return

        # if the insert failed on a monthly level, cycle down to weekly level
        print("Insert failed on monthly level...cycling down to weekly")
        latest, month_start = cls._latest_loaded_rs(conn, tgt_tbl)
        load(from_month(weekly, month_start), latest, tolerate_failure=False)

    @classmethod
    def sequential_load_rs_wk(cls,
                              rptg_dates,
                              override,
                              tgt_tbl,
                              conn,
                              rptg_wk,
                              rptg_wk_start,
                              rptg_wk_end,
                              sql_loop_fn,
                              # filter dates set 1
                              filter_dt_field1=False,
                              filter_dt_type1=False,
                              filter_dt_placeholder1=False,
                              # filter dates set 2
                              filter_dt_field2=False,
                              filter_dt_type2=False,
                              filter_dt_placeholder2=False,
                              # filter dates set 3
                              filter_dt_field3=False,
                              filter_dt_type3=False,
                              filter_dt_placeholder3=False,
                              log_dir=False
                              ):
        """
        Loads a Redshift table one reporting window at a time from a SQL template.

        For each row of ``rptg_dates`` the placeholders in ``sql_loop_fn`` are
        replaced with that row's values and the statement is executed, as
        ``CREATE TABLE ... AS`` for the first window when ``override`` is set
        and as ``INSERT INTO`` otherwise. The loop stops at the first failing window.

        :param rptg_dates: DataFrame with ``rptg_wk``, ``start_date`` and ``end_date`` columns.
        :param override: If truthy, drop ``tgt_tbl`` first and re-create it from the first window.
        :param tgt_tbl: Target table name.
        :param conn: Redshift database connection object.
        :param rptg_wk: Placeholder text replaced with the ``rptg_wk`` select column.
        :param rptg_wk_start: Placeholder text replaced with the ``rptg_wk_start`` select column.
        :param rptg_wk_end: Placeholder text replaced with the ``rptg_wk_end`` select column.
        :param sql_loop_fn: SQL template string containing the placeholders.
        :param filter_dt_field1: Date field filtered by the first placeholder.
        :param filter_dt_type1: Accepted for symmetry; the first filter is always a range.
        :param filter_dt_placeholder1: Placeholder replaced with ``> start AND <= end`` on field 1.
        :param filter_dt_field2: Date field filtered by the second placeholder.
        :param filter_dt_type2: ``"range"`` or ``"<="``; any other value leaves the placeholder untouched.
        :param filter_dt_placeholder2: Optional second placeholder.
        :param filter_dt_field3: Date field filtered by the third placeholder.
        :param filter_dt_type3: ``"range"`` or ``"<="``; any other value leaves the placeholder untouched.
        :param filter_dt_placeholder3: Optional third placeholder.
        :param log_dir: Path prefix for the per-window ``.sql`` dumps; nothing is dumped when falsy.
        :return: None
        """
        # dropping table if override = True
        if override:
            print(f'''table override True: Dropping table: {tgt_tbl} ''')
            try:
                cls.run_query_rs(conn=conn, sql=f'''drop table {tgt_tbl}''')
            except Exception:
                conn.commit()

        optional_filters = (
            (filter_dt_placeholder2, filter_dt_field2, filter_dt_type2),
            (filter_dt_placeholder3, filter_dt_field3, filter_dt_type3),
        )
        needs_create = bool(override)

        for idx, row in rptg_dates.iterrows():
            start, end = row['start_date'], row['end_date']
            print(f'''{start} to {end}''')

            # reporting-week columns for logging
            sql = sql_loop_fn.replace(rptg_wk, f" '{row['rptg_wk']}' as rptg_wk,")
            sql = sql.replace(rptg_wk_start, f" '{start}' as rptg_wk_start,")
            sql = sql.replace(rptg_wk_end, f" '{end}' as rptg_wk_end,")

            # primary date filter (skipped when no placeholder was supplied)
            if filter_dt_placeholder1:
                sql = sql.replace(
                    filter_dt_placeholder1,
                    f" AND date({filter_dt_field1}) > '{start}' "
                    f" AND date({filter_dt_field1}) <= '{end}'"
                )

            # optional secondary filters
            for placeholder, field, filter_type in optional_filters:
                if not placeholder:
                    continue
                if filter_type == "range":
                    sql = sql.replace(
                        placeholder,
                        f" AND date({field}) > '{start}' "
                        f" AND date({field}) <= '{end}'"
                    )
                elif filter_type == "<=":
                    sql = sql.replace(placeholder, f" AND date({field}) <= '{end}'")

            if needs_create:
                sql_prefix = f"CREATE TABLE {tgt_tbl} AS "
            else:
                sql_prefix = f"INSERT INTO {tgt_tbl} "

            if log_dir:
                Export.dump_sql(obj=sql_prefix + sql,
                                dir=log_dir + f"{tgt_tbl}_{idx}.sql")
            try:
                cls.run_query_rs(conn=conn, sql=sql_prefix + sql)
            except Exception as e:
                print(str(e))
                conn.commit()
                break
            needs_create = False
[docs]class Teradata:
"""
Generic functions for Teradata SQL queries and ETL.
.. image:: ../images_source/db_etl_tools/teradata.png
"""
[docs] @classmethod
def insert_td(cls, tbl, df, conn, batch_size=False, date_cols=False):
"""
Executes an INSERT INTO statement for a given Pandas DataFrame.
:param tbl: Teradata table name.
:param df: Pandas DataFrame.
:param conn: Teradata connection object.
:param batch_size: Records to load per batch.
:param date_cols: A list of date columns to convert to Pandas datetime.
:return: Printed SQL statements for each step.
"""
print(f"batch size: {batch_size}")
if type(df) != type(pd.DataFrame()):
print("Detected something other than a DataFrame\n Please use a pandas DataFrame")
raise TypeError('Unsupported object type!')
if date_cols:
# Convert columns to a date object for loading
# TD is picky, and wants 'YYYY-MM-DD' dates
print(' ...Attempting to convert elligible columns to date')
for idx, column in enumerate(date_cols):
df[column] = pd.to_datetime(df[column], errors='ignore')
date_columns = list(df.select_dtypes(include=[np.datetime64]).columns)
print(f" {len(date_columns)} date column(s) found")
for column in date_columns:
df[column] = df[column].dt.strftime('%Y-%m-%d')
sql_vars = ('?, ' * (len(df.columns) - 1)) + '?'
sql = f"insert into {tbl} values({sql_vars})"
data = df
print(" ...Beginning bulk insert operation")
if not batch_size:
batch_size = 10000
try:
print(f"{len(range(0, int(np.floor(df.shape[0] / batch_size) + 1)))} batches found")
for i in range(0, int(np.floor(df.shape[0] / batch_size) + 1)):
data_sample = [tuple(x) for x in data.iloc[batch_size * i:batch_size * (i + 1), :].values]
conn.executemany(sql, data_sample, batch=True)
print(sql)
print(f" ...Completed batch {i} of {len(range(0, int(np.floor(df.shape[0] / batch_size) + 1)))}")
except Exception as e:
print(data.head())
raise e
print(' ...Successfully loaded Data into Teradata')
return None
@classmethod
def run_query_td(cls, conn, sql):
    """
    Executes a SQL statement with a Teradata database connection.

    :param conn: Teradata database connection object.
    :param sql: SQL statement to execute.
    :return: Elapsed time to execute query, in minutes.
    """
    rptg_tstart = datetime.now()
    conn.execute(sql)
    tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
    print(Fore.RED + f"Runtime: {tdelta}")
    # hand the measured runtime back, as the docstring has always promised
    return tdelta
@classmethod
def make_tbl_td(cls, df, tbl_name):
    """
    Creates SQL to run a CREATE TABLE statement based on a Pandas DataFrame.

    Column names are first passed through ``cls.make_db_cols``. Each column's
    pandas dtype is mapped to a SQL type; dtypes without a mapping are emitted
    unchanged. VARCHAR columns are sized to the longest ``str()`` rendering of
    their values, and integer columns whose maximum exceeds the 4-byte INTEGER
    range are declared VARCHAR.

    :param df: Pandas DataFrame.
    :param tbl_name: Teradata table name.
    :return: CREATE TABLE SQL statement.
    """
    # fix columns
    # NOTE(review): make_db_cols is not defined in the visible part of this
    # class; a sibling class calls Generic.make_db_cols -- confirm it resolves here
    df = cls.make_db_cols(df)
    # data type mapping (pandas --> teradata)
    type_map = {'object': 'VARCHAR',
                'int64': 'INTEGER',
                'Int64': 'INTEGER',
                'int32': 'INTEGER',
                'bool': 'VARCHAR',
                'float64': 'FLOAT',
                'datetime64[ns]': 'DATE'}
    int32_max = 2147483647
    col_defs = []
    for col in df:
        series = df[col]
        # longest rendered value decides the VARCHAR width
        max_len = series.map(lambda x: len(str(x))).max()
        dtype_name = str(series.dtype)
        sql_type = type_map.get(dtype_name, dtype_name)
        # integers too large for INTEGER are stored as text
        if sql_type == 'INTEGER':
            col_max = series.max()
            if pd.notna(col_max) and int(col_max) > int32_max:
                sql_type = 'VARCHAR'
        if sql_type == 'VARCHAR':
            col_defs.append(f"{col} VARCHAR({max_len})")
        else:
            col_defs.append(f"{col} {sql_type}")
    sql = "CREATE TABLE " + tbl_name + " ( " + ", ".join(col_defs) + " )"
    return sql.replace("'", "")
@classmethod
def make_tbl_complete_td(cls, df, tbl_name, conn, batch_size=False):
    """
    Executes a series of SQL statements to CREATE and INSERT into a table from a Pandas DataFrame.

    :param df: Pandas DataFrame to create a table from.
    :param tbl_name: Name of table to be created.
    :param conn: Teradata database connection object.
    :param batch_size: Records to load per batch.
    :return: Elapsed time of the insert step, in minutes.
    """
    # 1 drop the table
    print(f"dropping table: {tbl_name}")
    try:
        cls.run_query_td(sql=f"drop table {tbl_name}", conn=conn)
    except Exception:
        # best effort: the drop is expected to fail when the table is absent
        print(f"table doesn't exist: {tbl_name}")
    # create the table
    print(f"creating table: {tbl_name}")
    sql = cls.make_tbl_td(df=df, tbl_name=tbl_name)
    print(sql)
    cls.run_query_td(sql=sql, conn=conn)
    print(f"inserting DF values into table: {tbl_name}")
    rptg_tstart = datetime.now()
    cls.insert_td(df=df, tbl=tbl_name, conn=conn, batch_size=batch_size)
    tdelta = (datetime.now() - rptg_tstart).total_seconds() / 60
    print(Fore.RED + f"Runtime: {tdelta}")
    return tdelta