Module utils.brocolib_utils.fast_dbt.new_generator
View Source
import sys
import re
from io import StringIO
from typing import Union
from collections import OrderedDict
from ruamel.yaml.scalarstring import DoubleQuotedScalarString
from ruamel.yaml import YAML
import pandas as pd
from brocolib_utils.ddm import sheet_parser, sources_parser, metrics_parser, exposures_parser
from brocolib_utils.ddm import ddm_settings
from brocolib_utils.utils import datalake
from brocolib_utils import settings
# region Generic
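# Note: the quadruple braces in the template below collapse to literal "{{ ... }}"
# after str.format(), i.e. a Jinja source() call in the rendered model.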
RAW_SOURCE_SQL = """with source as (
select * from {{{{ source('{source_name}', '{table_name}') }}}}
),
"""
RAW_PREPARED_SOURCE_SQL = """
prepared_source as (
select
{columns_cast}
from source
)
select * from prepared_source
"""
COL_CAST_INTERLINES = """,
"""
COL_CAST_FIRST_LINE = ""
REGEX_METRICS_FILTER = r"(?P<field>[\D_]*)\s(?P<operator>is|=|>|<|>=|<=|!=|<>)\s(?P<value>'?.*'?)"
REGEX_METRICS_WINDOW = r"(?P<count>[\d]*)\s(?P<period>day|week|month|year|all_time)s?"
def yaml_to_stdout(dc:dict):
yaml = YAML()
yaml.Representer.add_representer(OrderedDict, yaml.Representer.represent_dict)
yaml.dump(dc, sys.stdout)
def object_to_yaml_str(obj:Union[dict, OrderedDict], options=None) -> str:
yaml = YAML()
yaml.Representer.add_representer(OrderedDict, yaml.Representer.represent_dict)
    if options is None:
        options = {}
string_stream = StringIO()
yaml.dump(obj, string_stream, **options)
output_str = string_stream.getvalue()
string_stream.close()
return output_str
# endregion
# region Sources
def generate_source_yaml_asdict(
source_name:str,
datalake_bucket:str = None
):
dc_source_tables = datalake.get_source(
source_name=source_name,
datalake_bucket=datalake_bucket
)
all_sources_df, spreadsheet = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCES
)
source_description = all_sources_df.query(f"source_name=='{source_name}'")["description"].iloc[0] or None
all_tables_df, spreadsheet = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCE_TABLES
)
all_columns_df, _ = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCE_COLUMNS,
worksheet=spreadsheet
)
init_dbt_sources_dict = init_dbt_sources(
database=settings.DATALAKE_PROJECT,
source_name=source_name,
source_description=source_description
)
dbt_sources_dict = generate_loaded_tables_specs(
loaded_sources=dc_source_tables,
init_dbt_sources_dict=init_dbt_sources_dict,
all_tables=all_tables_df,
all_columns=all_columns_df
)
return dbt_sources_dict
def init_dbt_sources(
database:str,
source_name:str,
version: int = 2,
source_description:str = None
):
    # Top-level spec: a version number and a list of source entries.
    dc_dbt_sources = {}
    dc_dbt_sources["version"] = version
    dc_dbt_sources["sources"] = []
    # Single source entry describing the dataset and how it is loaded.
    dc_source = {}
dc_source["name"] = source_name
dc_source["description"] = DoubleQuotedScalarString(source_description)
dc_source["database"] = database
dc_source["loader"] = "gcloud storage"
dc_source["tables"] = []
dc_dbt_sources["sources"].append(dict(dc_source))
return dc_dbt_sources
def generate_loaded_tables_specs(
loaded_sources:dict,
init_dbt_sources_dict:dict,
all_tables:pd.DataFrame,
all_columns:pd.DataFrame
):
for table, path in loaded_sources.items():
table_description = all_tables.query(f"table_name=='{table}'")["description"].iloc[0]
dc_table = {}
dc_table["name"] = table
dc_table["description"] = DoubleQuotedScalarString(table_description)
dc_table["external"] = {}
dc_table["external"]["location"] = DoubleQuotedScalarString(f"{path}*")
dc_table["external"]["options"] = {}
dc_table["external"]["options"]["format"] = "parquet"
dc_table["external"]["options"]["hive_partition_uri_prefix"] = DoubleQuotedScalarString(path)
# dc_table["external"]["partitions"] = [{"name":"year","data_type":"integer"},
# {"name":"month","data_type":"integer"}]
df_table_columns = all_columns.query(f"table_name=='{table}'")
dc_table["columns"] = []
for col in df_table_columns.itertuples():
dc_table["columns"].append(
{
"name":col.column_name,
"data_type":col.data_type,
"description":DoubleQuotedScalarString(col.description)
}
)
init_dbt_sources_dict["sources"][0]["tables"].append(dc_table)
return init_dbt_sources_dict
# endregion
# region Staging
def init_dbt_staging(version: int = 2):
dc = OrderedDict()
# return {"models":[]}
# return {"version":2, "models":[]}
dc["version"] = version
dc["models"] = []
return dc
def generate_staging_model_sql(source_name:str, table:str):
dc_columns = sources_parser.get_all_columns_of_tables(
tables=[table]
)
source_sql = RAW_SOURCE_SQL.format(source_name=source_name, table_name=table)
columns_cast = ""
for x, col in enumerate(dc_columns[table]):
        endline_comma = COL_CAST_FIRST_LINE if x == 0 else COL_CAST_INTERLINES
        columns_cast += endline_comma + f"cast({col['column_name']} as {col['data_type']}) as {col['column_functional_name']}"
prepared_source_sql = RAW_PREPARED_SOURCE_SQL.format(columns_cast=columns_cast)
staging_sql = source_sql + prepared_source_sql
return staging_sql
def generate_staging_model_yaml(source_name:str, tables:list) -> dict:
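    # NOTE: the source_name argument is currently unused below.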
dc_columns = sources_parser.get_all_columns_of_tables(
tables=tables
)
all_source_tables_df, spreadsheet = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCE_TABLES
)
staging_dc = init_dbt_staging()
for table, columns in dc_columns.items():
table_description = all_source_tables_df.query(f"table_name=='{table}'")["description"].iloc[0] or None
dc_table = OrderedDict()
dc_table["name"] = table
dc_table["description"] = DoubleQuotedScalarString(table_description)
dc_table["columns"] = []
for col in columns:
dc_col = OrderedDict()
dc_col["name"] = col["column_functional_name"]
dc_col["description"] = DoubleQuotedScalarString(col["description"])
dc_table["columns"].append(dc_col)
staging_dc["models"].append(dc_table)
return staging_dc
# endregion
# region Metrics
def init_dbt_metrics(version: int = 2):
return OrderedDict(
version = version,
metrics = []
)
def process_filters(metric_filters):
ls_filters = []
for filt in metric_filters.split(',\n'):
        match = re.match(REGEX_METRICS_FILTER, filt.strip(), flags=re.IGNORECASE)
        groups = match.groupdict()
        ls_filters.append(
            OrderedDict(
                field = groups["field"],
                operator = groups["operator"],
                value = groups["value"]
            )
        )
return ls_filters
def process_window(raw_window):
    match = re.match(REGEX_METRICS_WINDOW, raw_window, flags=re.IGNORECASE)
    groups = match.groupdict()
    return OrderedDict(
        count = groups["count"],
        period = groups["period"]
)
def generate_metrics(metric_list:list):
df_metrics = metrics_parser.get_metrics(metric_list)
metrics_dc = init_dbt_metrics()
for metric in df_metrics.itertuples():
metric_orderdict = OrderedDict(
name = metric.metric_name,
label = metric.label,
model = metric.model,
description = DoubleQuotedScalarString(metric.description),
calculation_method = metric.calculation_method,
expression = metric.expression,
timestamp = metric.timestamp,
time_grains = [tg.strip() for tg in metric.time_grains.split(',')],
dimensions = [d.strip() for d in metric.dimensions.split(',')],
filters = process_filters(metric.filters),
window = process_window(metric.window)
)
metric_orderdict["config"] = OrderedDict(
treat_null_values_as_zero = metric.treat_null_values_as_zero,
enabled = metric.enabled
)
metrics_dc["metrics"].append(metric_orderdict)
return metrics_dc
# endregion
# region Exposures
def init_dbt_exposures(version: int = 2):
return OrderedDict(
version = version,
exposures = []
)
def generate_exposures(exposure_list:list):
df_exposures = exposures_parser.get_exposures(exposure_list)
exposures_dc = init_dbt_exposures()
for exposure in df_exposures.itertuples():
exposure_orderdict = OrderedDict(
name = exposure.exposure_name,
label = DoubleQuotedScalarString(exposure.exposure_label),
maturity = exposure.maturity,
url = exposure.url,
description = DoubleQuotedScalarString(exposure.description),
depends_on = [do.strip() for do in exposure.depends_on.split(',')]
)
exposure_orderdict["owner"] = OrderedDict(
name = exposure.owner_name,
email = exposure.owner_email
)
exposures_dc["exposures"].append(exposure_orderdict)
return exposures_dc
# endregion
Variables
COL_CAST_FIRST_LINE
COL_CAST_INTERLINES
RAW_PREPARED_SOURCE_SQL
RAW_SOURCE_SQL
REGEX_METRICS_FILTER
REGEX_METRICS_WINDOW
Functions
generate_exposures
def generate_exposures(
exposure_list: list
)
View Source
def generate_exposures(exposure_list:list):
df_exposures = exposures_parser.get_exposures(exposure_list)
exposures_dc = init_dbt_exposures()
for exposure in df_exposures.itertuples():
exposure_orderdict = OrderedDict(
name = exposure.exposure_name,
label = DoubleQuotedScalarString(exposure.exposure_label),
maturity = exposure.maturity,
url = exposure.url,
description = DoubleQuotedScalarString(exposure.description),
depends_on = [do.strip() for do in exposure.depends_on.split(',')]
)
exposure_orderdict["owner"] = OrderedDict(
name = exposure.owner_name,
email = exposure.owner_email
)
exposures_dc["exposures"].append(exposure_orderdict)
return exposures_dc
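A minimal usage sketch, assuming the module is importable as brocolib_utils.fast_dbt.new_generator and that the exposure name below (a placeholder) is declared in the DDM exposures sheet:

from brocolib_utils.fast_dbt import new_generator

# "weekly_kpi_dashboard" is a hypothetical exposure name.
exposures_spec = new_generator.generate_exposures(["weekly_kpi_dashboard"])
new_generator.yaml_to_stdout(exposures_spec)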
generate_loaded_tables_specs
def generate_loaded_tables_specs(
loaded_sources: dict,
init_dbt_sources_dict: dict,
all_tables: pandas.core.frame.DataFrame,
all_columns: pandas.core.frame.DataFrame
)
View Source
def generate_loaded_tables_specs(
loaded_sources:dict,
init_dbt_sources_dict:dict,
all_tables:pd.DataFrame,
all_columns:pd.DataFrame
):
for table, path in loaded_sources.items():
table_description = all_tables.query(f"table_name=='{table}'")["description"].iloc[0]
dc_table = {}
dc_table["name"] = table
dc_table["description"] = DoubleQuotedScalarString(table_description)
dc_table["external"] = {}
dc_table["external"]["location"] = DoubleQuotedScalarString(f"{path}*")
dc_table["external"]["options"] = {}
dc_table["external"]["options"]["format"] = "parquet"
dc_table["external"]["options"]["hive_partition_uri_prefix"] = DoubleQuotedScalarString(path)
# dc_table["external"]["partitions"] = [{"name":"year","data_type":"integer"},
# {"name":"month","data_type":"integer"}]
df_table_columns = all_columns.query(f"table_name=='{table}'")
dc_table["columns"] = []
for col in df_table_columns.itertuples():
dc_table["columns"].append(
{
"name":col.column_name,
"data_type":col.data_type,
"description":DoubleQuotedScalarString(col.description)
}
)
init_dbt_sources_dict["sources"][0]["tables"].append(dc_table)
return init_dbt_sources_dict
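A self-contained sketch with hand-built DataFrames standing in for the DDM sheets; the table, column, and gs:// path values are placeholders:

import pandas as pd
from brocolib_utils.fast_dbt import new_generator

base = new_generator.init_dbt_sources(database="my-gcp-project", source_name="crm")
all_tables = pd.DataFrame([{"table_name": "customers", "description": "CRM customer records"}])
all_columns = pd.DataFrame([
    {"table_name": "customers", "column_name": "id", "data_type": "string", "description": "Primary key"},
])
spec = new_generator.generate_loaded_tables_specs(
    loaded_sources={"customers": "gs://my-bucket/crm/customers/"},  # table -> hive-partitioned prefix
    init_dbt_sources_dict=base,
    all_tables=all_tables,
    all_columns=all_columns,
)
# spec["sources"][0]["tables"][0]["external"]["location"] is now "gs://my-bucket/crm/customers/*"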
generate_metrics
def generate_metrics(
metric_list: list
)
View Source
def generate_metrics(metric_list:list):
df_metrics = metrics_parser.get_metrics(metric_list)
metrics_dc = init_dbt_metrics()
for metric in df_metrics.itertuples():
metric_orderdict = OrderedDict(
name = metric.metric_name,
label = metric.label,
model = metric.model,
description = DoubleQuotedScalarString(metric.description),
calculation_method = metric.calculation_method,
expression = metric.expression,
timestamp = metric.timestamp,
time_grains = [tg.strip() for tg in metric.time_grains.split(',')],
dimensions = [d.strip() for d in metric.dimensions.split(',')],
filters = process_filters(metric.filters),
window = process_window(metric.window)
)
metric_orderdict["config"] = OrderedDict(
treat_null_values_as_zero = metric.treat_null_values_as_zero,
enabled = metric.enabled
)
metrics_dc["metrics"].append(metric_orderdict)
return metrics_dc
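A usage sketch; the metric name is a placeholder and must exist in the DDM metrics sheet with non-empty filters and window cells, since process_filters and process_window expect matching strings:

from brocolib_utils.fast_dbt import new_generator

# "weekly_active_users" is a hypothetical metric name.
metrics_spec = new_generator.generate_metrics(["weekly_active_users"])
print(new_generator.object_to_yaml_str(metrics_spec))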
generate_source_yaml_asdict
def generate_source_yaml_asdict(
source_name: str,
datalake_bucket: str = None
)
View Source
def generate_source_yaml_asdict(
source_name:str,
datalake_bucket:str = None
):
dc_source_tables = datalake.get_source(
source_name=source_name,
datalake_bucket=datalake_bucket
)
all_sources_df, spreadsheet = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCES
)
source_description = all_sources_df.query(f"source_name=='{source_name}'")["description"].iloc[0] or None
all_tables_df, spreadsheet = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCE_TABLES
)
all_columns_df, _ = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCE_COLUMNS,
worksheet=spreadsheet
)
init_dbt_sources_dict = init_dbt_sources(
database=settings.DATALAKE_PROJECT,
source_name=source_name,
source_description=source_description
)
dbt_sources_dict = generate_loaded_tables_specs(
loaded_sources=dc_source_tables,
init_dbt_sources_dict=init_dbt_sources_dict,
all_tables=all_tables_df,
all_columns=all_columns_df
)
return dbt_sources_dict
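An end-to-end sketch: build the sources spec for one source and print it as YAML. The source name and bucket are placeholders; the call reads both the datalake bucket and the DDM spreadsheet:

from brocolib_utils.fast_dbt import new_generator

sources_spec = new_generator.generate_source_yaml_asdict(
    source_name="google_analytics",
    datalake_bucket="my-datalake-bucket",
)
new_generator.yaml_to_stdout(sources_spec)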
generate_staging_model_sql
def generate_staging_model_sql(
source_name: str,
table: str
)
View Source
def generate_staging_model_sql(source_name:str, table:str):
dc_columns = sources_parser.get_all_columns_of_tables(
tables=[table]
)
source_sql = RAW_SOURCE_SQL.format(source_name=source_name, table_name=table)
columns_cast = ""
for x, col in enumerate(dc_columns[table]):
        endline_comma = COL_CAST_FIRST_LINE if x == 0 else COL_CAST_INTERLINES
        columns_cast += endline_comma + f"cast({col['column_name']} as {col['data_type']}) as {col['column_functional_name']}"
prepared_source_sql = RAW_PREPARED_SOURCE_SQL.format(columns_cast=columns_cast)
staging_sql = source_sql + prepared_source_sql
return staging_sql
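A sketch that renders the staging SQL for one table and writes it to a dbt model file; names and the output path are placeholders:

from brocolib_utils.fast_dbt import new_generator

sql = new_generator.generate_staging_model_sql(source_name="crm", table="customers")
with open("models/staging/stg_customers.sql", "w") as f:
    f.write(sql)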
generate_staging_model_yaml
def generate_staging_model_yaml(
source_name: str,
tables: list
) -> dict
View Source
def generate_staging_model_yaml(source_name:str, tables:list) -> dict:
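    # NOTE: the source_name argument is currently unused below.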
dc_columns = sources_parser.get_all_columns_of_tables(
tables=tables
)
all_source_tables_df, spreadsheet = sheet_parser.ddm_sheet_to_df(
sheet_name=ddm_settings.DDM_SHEET_NAMES.SOURCE_TABLES
)
staging_dc = init_dbt_staging()
for table, columns in dc_columns.items():
table_description = all_source_tables_df.query(f"table_name=='{table}'")["description"].iloc[0] or None
dc_table = OrderedDict()
dc_table["name"] = table
dc_table["description"] = DoubleQuotedScalarString(table_description)
dc_table["columns"] = []
for col in columns:
dc_col = OrderedDict()
dc_col["name"] = col["column_functional_name"]
dc_col["description"] = DoubleQuotedScalarString(col["description"])
dc_table["columns"].append(dc_col)
staging_dc["models"].append(dc_table)
return staging_dc
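A sketch with placeholder table names (note that source_name is not used by the current implementation):

from brocolib_utils.fast_dbt import new_generator

staging_spec = new_generator.generate_staging_model_yaml(
    source_name="crm",
    tables=["customers", "orders"],
)
print(new_generator.object_to_yaml_str(staging_spec))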
init_dbt_exposures
def init_dbt_exposures(
version: int = 2
)
View Source
def init_dbt_exposures(version: int = 2):
return OrderedDict(
version = version,
exposures = []
)
init_dbt_metrics
def init_dbt_metrics(
version: int = 2
)
View Source
def init_dbt_metrics(version: int = 2):
return OrderedDict(
version = version,
metrics = []
)
init_dbt_sources
def init_dbt_sources(
database: str,
source_name: str,
version: int = 2,
source_description: str = None
)
View Source
def init_dbt_sources(
database:str,
source_name:str,
version: int = 2,
source_description:str = None
):
    # Top-level spec: a version number and a list of source entries.
    dc_dbt_sources = {}
    dc_dbt_sources["version"] = version
    dc_dbt_sources["sources"] = []
    # Single source entry describing the dataset and how it is loaded.
    dc_source = {}
dc_source["name"] = source_name
dc_source["description"] = DoubleQuotedScalarString(source_description)
dc_source["database"] = database
dc_source["loader"] = "gcloud storage"
dc_source["tables"] = []
dc_dbt_sources["sources"].append(dict(dc_source))
return dc_dbt_sources
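This builds only the skeleton; tables are appended later by generate_loaded_tables_specs. A quick sketch with placeholder values:

from brocolib_utils.fast_dbt import new_generator

base = new_generator.init_dbt_sources(
    database="my-gcp-project",
    source_name="crm",
    source_description="Tables exported from the CRM",
)
# base["sources"][0] now holds the name, description, database, loader and an empty tables list.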
init_dbt_staging
def init_dbt_staging(
version: int = 2
)
View Source
def init_dbt_staging(version: int = 2):
dc = OrderedDict()
# return {"models":[]}
# return {"version":2, "models":[]}
dc["version"] = version
dc["models"] = []
return dc
object_to_yaml_str
def object_to_yaml_str(
obj: Union[dict, collections.OrderedDict],
options=None
) -> str
View Source
def object_to_yaml_str(obj:Union[dict, OrderedDict], options=None) -> str:
yaml = YAML()
yaml.Representer.add_representer(OrderedDict, yaml.Representer.represent_dict)
    if options is None:
        options = {}
string_stream = StringIO()
yaml.dump(obj, string_stream, **options)
output_str = string_stream.getvalue()
string_stream.close()
return output_str
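A quick sketch of the round trip to a YAML string:

from collections import OrderedDict
from brocolib_utils.fast_dbt import new_generator

print(new_generator.object_to_yaml_str(OrderedDict(version=2, models=[])))
# version: 2
# models: []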
process_filters
def process_filters(
metric_filters
)
View Source
def process_filters(metric_filters):
ls_filters = []
for filt in metric_filters.split(',\n'):
        match = re.match(REGEX_METRICS_FILTER, filt.strip(), flags=re.IGNORECASE)
        groups = match.groupdict()
        ls_filters.append(
            OrderedDict(
                field = groups["field"],
                operator = groups["operator"],
                value = groups["value"]
            )
        )
return ls_filters
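A sketch of the expected input: one "field operator value" expression per line, separated by ",\n":

from brocolib_utils.fast_dbt import new_generator

new_generator.process_filters("country = 'FR',\nis_active is true")
# -> [{'field': 'country', 'operator': '=', 'value': "'FR'"},
#     {'field': 'is_active', 'operator': 'is', 'value': 'true'}] (as OrderedDicts)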
process_window
def process_window(
raw_window
)
View Source
def process_window(raw_window):
    match = re.match(REGEX_METRICS_WINDOW, raw_window, flags=re.IGNORECASE)
    groups = match.groupdict()
    return OrderedDict(
        count = groups["count"],
        period = groups["period"]
)
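A sketch; the window string is "<count> <period>", with an optional trailing "s":

from brocolib_utils.fast_dbt import new_generator

new_generator.process_window("7 days")
# -> OrderedDict with count='7' (kept as a string) and period='day'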
yaml_to_stdout
def yaml_to_stdout(
dc: dict
)
View Source
def yaml_to_stdout(dc:dict):
yaml = YAML()
yaml.Representer.add_representer(OrderedDict, yaml.Representer.represent_dict)
yaml.dump(dc, sys.stdout)
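A quick sketch; any plain dict (or OrderedDict) works:

from brocolib_utils.fast_dbt import new_generator

new_generator.yaml_to_stdout({"version": 2, "sources": []})
# prints:
# version: 2
# sources: []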