Skip to content

Module utils.brocolib_utils.fast_dbt.cli

View Source
# import os

# import json

# import argparse

# from ruamel.yaml import YAML

# from dbt_parser import update_exposures

# from ruamel.yaml import YAML

# from dbt_parser import update_exposures

# from brocolib_utils.settings import DBT_MODELS_PATH

# from brocolib_utils.fast_dbt.generator import (init_dbt_sources,

#     generate_loaded_tables_specs, dict_to_yaml)

# from brocolib_utils.datalake import get_sources

# from collections import OrderedDict

# DATALAKE_BUCKET = os.environ.get('DATALAKE_BUCKET')

# GCP_PROJECT = os.environ.get('BACK_PROJECT_ID')

# DEFAULT_GCS_PARTITIONNING_KEYS = json.loads(os.environ.get('DEFAULT_GCS_PARTITIONNING_KEYS'))

# def main():

#     # Create the parser

#     fast_dbt_parser = argparse.ArgumentParser(description='fast_dbt Python CLI ')

#     command_parser = fast_dbt_parser.add_subparsers(dest='command')

#     ## Exposures

#     exposure_parser = command_parser.add_parser('exposure', help='Manage exposures')

#     exposure_subcommand_parser = exposure_parser.add_subparsers(dest='exposure_subcommand')

#     update_permissions_parser = exposure_subcommand_parser.add_parser('update_permissions', help='Parse dbt exposure .yml file and update data studio permissions')

#     update_permissions_parser.add_argument(

#         'file-path',

#         dest='exposure_file_path',

#         help='path to the dbt exposures .yml file'

#     )

#     args = fast_dbt_parser.parse_args()

#     if args.command == "exposure":

#         if args.exposure_subcommand = "update_permissions"

#             yaml_parser = YAML()

#             with open(args.exposure_file_path, 'r') as f:

#                 data = yaml_parser.load(f)

#             update_exposures(data)

import sys

import typer

from collections import OrderedDict

from pprint import pprint

from typing import Optional

# from ruamel import yaml

from rich.prompt import Prompt

from rich import print as rich_print

from ruamel.yaml import YAML

from brocolib_utils.fast_dbt import new_generator

app = typer.Typer()

@app.callback()

def fast_dbt():

    """

    Domain Data Management Sheets Parser & Generator

    """

@app.command()

def generate_source_yaml(

    source_name: Optional[str] = typer.Argument(None, help="dbt source name"),

    datalake_bucket: Optional[str] = typer.Argument(None, help="datalake bucket name where the data is located")

):

    """

    Filling Source Tables & Source Columns worksheets in the DDM Google Sheets

    """

    source_name = source_name or Prompt.ask("Name of the source :file_cabinet: ")

    if not source_name:

        typer.echo('You must provide a source_name. Exiting.')

        raise typer.Exit(code=1)

    typer.echo(f"Generating YAML for source : {source_name} ..")

    dc_source = new_generator.generate_source_yaml_asdict(

        source_name=source_name,

        datalake_bucket=datalake_bucket

    )

    # new_generator.yaml_to_stdout(dc_source)

    str_sources = new_generator.object_to_yaml_str(dc_source)

    rich_print(str_sources)

@app.command()

def generate_staging_sql(

    source_name: Optional[str] = typer.Argument(None, help="dbt source name"),

    table_name: Optional[str] = typer.Argument(None, help="name of the table")

):

    """

    Generates SQL code for a staging model

    """

    source_name = source_name or Prompt.ask("Name of the source :file_cabinet: ")

    if not source_name:

        typer.echo('You must provide a source_name. Exiting.')

        raise typer.Exit(code=1)

    table_name = table_name or Prompt.ask("Name of the table :card_file_box: ")

    if not table_name:

        typer.echo('You must provide a table_name. Exiting.')

        raise typer.Exit(code=1)

    query = new_generator.generate_staging_model_sql(

        source_name=source_name,

        table=table_name

    )

    rich_print(query)

@app.command()

def generate_staging_yaml(

    source_name: Optional[str] = typer.Argument(None, help="dbt source name"),

    tables: str = typer.Argument(None, help="name of the table")

):

    """

    Generate YAML for staging models

    """

    source_name = source_name or Prompt.ask("Name of the source :file_cabinet: ")

    if not source_name:

        typer.echo('You must provide a source_name. Exiting.')

        raise typer.Exit(code=1)

    tables = tables or Prompt.ask("Coma-separated list of tables (table1,table2) :card_file_box: ")

    if not tables:

        typer.echo('You must provide tables. Exiting.')

        raise typer.Exit(code=1)

    tables = tables.split(',')

    dc_yaml = new_generator.generate_staging_model_yaml(

        source_name=source_name,

        tables=tables

    )

    # new_generator.yaml_to_stdout(dc_yaml)

    str_staging = new_generator.object_to_yaml_str(dc_yaml)

    rich_print(str_staging)

@app.command()

def generate_metrics_yaml(

    metrics: str = typer.Argument(None, help="Coma-separated list of metrics (metric_1,metric_2) ")

):

    """Generate YAML for metrics"""

    metrics = metrics or Prompt.ask("Coma-separated list of metrics (metric_1,metric_2) :straight_ruler: ")

    if not metrics:

        typer.echo('You must provide metrics. Exiting.')

        raise typer.Exit(code=1)

    metrics = metrics.split(',')

    dc_metrics = new_generator.generate_metrics(

        metric_list = metrics

    )

    # new_generator.yaml_to_stdout(dc_metrics)

    str_metrics = new_generator.object_to_yaml_str(dc_metrics)

    rich_print(str_metrics)

@app.command()

def generate_exposures_yaml(

    exposures: str = typer.Argument(None, help="Coma-separated list of exposures (exposure_1,exposure_2) ")

):

    """Generate YAML for exposures"""

    exposures = exposures or Prompt.ask("Coma-separated list of exposures (exposure_1,exposure_2) :chart_increasing: ")

    if not exposures:

        typer.echo('You must provide exposures. Exiting.')

        raise typer.Exit(code=1)

    exposures = exposures.split(',')

    dc_exposures = new_generator.generate_exposures(

        exposure_list = exposures

    )

    # new_generator.yaml_to_stdout(dc_exposures)

    str_exposures = new_generator.object_to_yaml_str(dc_exposures)

    rich_print(str_exposures)

Variables

app

Functions

fast_dbt

def fast_dbt(

)

Domain Data Management Sheets Parser & Generator

View Source
@app.callback()

def fast_dbt():

    """

    Domain Data Management Sheets Parser & Generator

    """

generate_exposures_yaml

def generate_exposures_yaml(
    exposures: str = <typer.models.ArgumentInfo object at 0x7f31158f6af0>
)

Generate YAML for exposures

View Source
@app.command()

def generate_exposures_yaml(

    exposures: str = typer.Argument(None, help="Coma-separated list of exposures (exposure_1,exposure_2) ")

):

    """Generate YAML for exposures"""

    exposures = exposures or Prompt.ask("Coma-separated list of exposures (exposure_1,exposure_2) :chart_increasing: ")

    if not exposures:

        typer.echo('You must provide exposures. Exiting.')

        raise typer.Exit(code=1)

    exposures = exposures.split(',')

    dc_exposures = new_generator.generate_exposures(

        exposure_list = exposures

    )

    # new_generator.yaml_to_stdout(dc_exposures)

    str_exposures = new_generator.object_to_yaml_str(dc_exposures)

    rich_print(str_exposures)

generate_metrics_yaml

def generate_metrics_yaml(
    metrics: str = <typer.models.ArgumentInfo object at 0x7f31158f6a90>
)

Generate YAML for metrics

View Source
@app.command()

def generate_metrics_yaml(

    metrics: str = typer.Argument(None, help="Coma-separated list of metrics (metric_1,metric_2) ")

):

    """Generate YAML for metrics"""

    metrics = metrics or Prompt.ask("Coma-separated list of metrics (metric_1,metric_2) :straight_ruler: ")

    if not metrics:

        typer.echo('You must provide metrics. Exiting.')

        raise typer.Exit(code=1)

    metrics = metrics.split(',')

    dc_metrics = new_generator.generate_metrics(

        metric_list = metrics

    )

    # new_generator.yaml_to_stdout(dc_metrics)

    str_metrics = new_generator.object_to_yaml_str(dc_metrics)

    rich_print(str_metrics)

generate_source_yaml

def generate_source_yaml(
    source_name: Union[str, NoneType] = <typer.models.ArgumentInfo object at 0x7f31158f6850>,
    datalake_bucket: Union[str, NoneType] = <typer.models.ArgumentInfo object at 0x7f31158f6880>
)

Filling Source Tables & Source Columns worksheets in the DDM Google Sheets

View Source
@app.command()

def generate_source_yaml(

    source_name: Optional[str] = typer.Argument(None, help="dbt source name"),

    datalake_bucket: Optional[str] = typer.Argument(None, help="datalake bucket name where the data is located")

):

    """

    Filling Source Tables & Source Columns worksheets in the DDM Google Sheets

    """

    source_name = source_name or Prompt.ask("Name of the source :file_cabinet: ")

    if not source_name:

        typer.echo('You must provide a source_name. Exiting.')

        raise typer.Exit(code=1)

    typer.echo(f"Generating YAML for source : {source_name} ..")

    dc_source = new_generator.generate_source_yaml_asdict(

        source_name=source_name,

        datalake_bucket=datalake_bucket

    )

    # new_generator.yaml_to_stdout(dc_source)

    str_sources = new_generator.object_to_yaml_str(dc_source)

    rich_print(str_sources)

generate_staging_sql

def generate_staging_sql(
    source_name: Union[str, NoneType] = <typer.models.ArgumentInfo object at 0x7f31158f68e0>,
    table_name: Union[str, NoneType] = <typer.models.ArgumentInfo object at 0x7f31158f6970>
)

Generates SQL code for a staging model

View Source
@app.command()

def generate_staging_sql(

    source_name: Optional[str] = typer.Argument(None, help="dbt source name"),

    table_name: Optional[str] = typer.Argument(None, help="name of the table")

):

    """

    Generates SQL code for a staging model

    """

    source_name = source_name or Prompt.ask("Name of the source :file_cabinet: ")

    if not source_name:

        typer.echo('You must provide a source_name. Exiting.')

        raise typer.Exit(code=1)

    table_name = table_name or Prompt.ask("Name of the table :card_file_box: ")

    if not table_name:

        typer.echo('You must provide a table_name. Exiting.')

        raise typer.Exit(code=1)

    query = new_generator.generate_staging_model_sql(

        source_name=source_name,

        table=table_name

    )

    rich_print(query)

generate_staging_yaml

def generate_staging_yaml(
    source_name: Union[str, NoneType] = <typer.models.ArgumentInfo object at 0x7f31158f69a0>,
    tables: str = <typer.models.ArgumentInfo object at 0x7f31158f6910>
)

Generate YAML for staging models

View Source
@app.command()

def generate_staging_yaml(

    source_name: Optional[str] = typer.Argument(None, help="dbt source name"),

    tables: str = typer.Argument(None, help="name of the table")

):

    """

    Generate YAML for staging models

    """

    source_name = source_name or Prompt.ask("Name of the source :file_cabinet: ")

    if not source_name:

        typer.echo('You must provide a source_name. Exiting.')

        raise typer.Exit(code=1)

    tables = tables or Prompt.ask("Coma-separated list of tables (table1,table2) :card_file_box: ")

    if not tables:

        typer.echo('You must provide tables. Exiting.')

        raise typer.Exit(code=1)

    tables = tables.split(',')

    dc_yaml = new_generator.generate_staging_model_yaml(

        source_name=source_name,

        tables=tables

    )

    # new_generator.yaml_to_stdout(dc_yaml)

    str_staging = new_generator.object_to_yaml_str(dc_yaml)

    rich_print(str_staging)