Skip to content

Module transform.brocolib_transform.dbt_utils

View Source
import os

import shlex

import subprocess

def run_subprocess(ls_commands: list, working_dir: str, logger=None):
    """Execute a command from a given directory and report its outcome.

    The command is started with a copy of the current environment, and its
    stdout and stderr are captured as UTF-8 text.

    Args:
        ls_commands (list): the command and its arguments, one string per item
        working_dir (str): directory the command is executed from
        logger (logging.Logger): (optional) for goblet `app.log`; receives the
            captured stdout at info level on success and the failure message
            at error level otherwise

    Returns:
        tuple: `(message, succeeded)`. On a zero exit code `message` is the
        captured stdout and `succeeded` is True. On a non-zero exit code
        `message` is stdout, a newline, stderr and the suffix " failed". If
        an exception is raised while running the command, `message` is the
        text of that exception. `succeeded` is False in both failure cases.
    """
    try:
        child = subprocess.Popen(
            ls_commands,
            cwd=working_dir,
            env=os.environ.copy(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            encoding='utf-8',
        )
        captured_out, captured_err = child.communicate()
        if child.returncode != 0:
            failure = f"{captured_out}\n{captured_err} failed"
            if logger:
                logger.error(failure)
            return failure, False
    except Exception as exc:
        # Any error (e.g. the executable or directory does not exist) is
        # reported to the caller as a failed run instead of propagating.
        if logger:
            logger.error(str(exc))
        return str(exc), False
    else:
        if logger:
            logger.info(captured_out)
        return captured_out, True

def run_dbt_model(sources: list, project_dir: str, logger=None):
    """Run `dbt run` and select child of sources provided as arg

    using `--select source:stg.SOURCE+`, one selector per source.

    See [Using Unions Set Operators for Node Selection](https://docs.getdbt.com/reference/node-selection/set-operators#unions)

    Args:
        sources (list): all sources that are parents of the branches that will be run
        project_dir (str): path to the dbt project
        logger (logging.Logger): (optional) for goblet `app.log`; when omitted
            nothing is logged

    Returns:
        None. The outcome of the run is only reported through `logger`.
    """
    ls_commands = ["dbt", "run", "--select"]
    ls_commands += [f"source:stg.{source}+" for source in sources]

    # `logger` defaults to None, so every log call has to be guarded;
    # calling it unconditionally raised AttributeError for the default.
    if logger:
        logger.info(f'START dbt run for {str(sources)}')

    _, dbt_run_ok = run_subprocess(ls_commands, project_dir, logger)

    if logger:
        if dbt_run_ok:
            logger.info(f'END dbt run successful for {str(sources)}')
        else:
            logger.error(f'END dbt run failed for {str(sources)}')

def stage_table(sources: list, project_dir: str, logger=None):
    """Stage tables from datalake to staging layer in dwh.

    For each source, runs `dbt run-operation stage_external_sources` with
    `--args "select: stg.SOURCE"` and `--vars "ext_full_refresh: true"`.
    A failure on one source does not stop the remaining sources from being
    staged.

    Args:
        sources (list): list of tables to stage
        project_dir (str): path to the dbt project
        logger (logging.Logger): (optional) for goblet `app.log`; when omitted
            nothing is logged

    Returns:
        bool: True if every source was staged successfully (also when
        `sources` is empty), False if at least one staging command failed.
    """
    staging_successful = True

    for source in sources:
        # Build the argument list directly instead of shlex-splitting an
        # interpolated string, so a source name containing quotes or
        # backslashes cannot break or alter the command line.
        ls_commands = [
            "dbt",
            "run-operation",
            "stage_external_sources",
            "--args", f"select: stg.{source}",
            "--vars", "ext_full_refresh: true",
        ]

        # `logger` defaults to None, so every log call has to be guarded.
        if logger:
            logger.info(f'START staging {source}')

        _, sources_are_staged = run_subprocess(ls_commands, project_dir, logger)

        if sources_are_staged:
            if logger:
                logger.info(f'END staging successful for {source}')
        else:
            staging_successful = False
            if logger:
                logger.error(f'END staging failed for {source}')

    return staging_successful

Functions

run_dbt_model

def run_dbt_model(
    sources: list,
    project_dir: str,
    logger=None
)

Run `dbt run` and select the children of the sources provided as arguments,

using `--select source:stg.SOURCE+`. See Using Unions Set Operators for Node Selection.

Parameters:

Name Type Description Default
sources list all sources that are parent of the branches that will be run None
project_dir str path to the dbt project None
logger logging.logger (optional) for goblet app.log None
View Source
def run_dbt_model(sources: list, project_dir: str, logger=None):
    """Run `dbt run` and select child of sources provided as arg

    using `--select source:stg.SOURCE+`, one selector per source.

    See [Using Unions Set Operators for Node Selection](https://docs.getdbt.com/reference/node-selection/set-operators#unions)

    Args:
        sources (list): all sources that are parents of the branches that will be run
        project_dir (str): path to the dbt project
        logger (logging.Logger): (optional) for goblet `app.log`; when omitted
            nothing is logged

    Returns:
        None. The outcome of the run is only reported through `logger`.
    """
    ls_commands = ["dbt", "run", "--select"]
    ls_commands += [f"source:stg.{source}+" for source in sources]

    # `logger` defaults to None, so every log call has to be guarded;
    # calling it unconditionally raised AttributeError for the default.
    if logger:
        logger.info(f'START dbt run for {str(sources)}')

    _, dbt_run_ok = run_subprocess(ls_commands, project_dir, logger)

    if logger:
        if dbt_run_ok:
            logger.info(f'END dbt run successful for {str(sources)}')
        else:
            logger.error(f'END dbt run failed for {str(sources)}')

run_subprocess

def run_subprocess(
    ls_commands: list,
    working_dir: str,
    logger=None
)

Run command provided as arg in path provided as arg

Parameters:

Name Type Description Default
ls_commands list list of strings representing the command to run None
working_dir str path to change directory to before execution None
logger logging.logger (optional) for goblet app.log None
View Source
def run_subprocess(ls_commands: list, working_dir: str, logger=None):
    """Execute a command from a given directory and report its outcome.

    The command is started with a copy of the current environment, and its
    stdout and stderr are captured as UTF-8 text.

    Args:
        ls_commands (list): the command and its arguments, one string per item
        working_dir (str): directory the command is executed from
        logger (logging.Logger): (optional) for goblet `app.log`; receives the
            captured stdout at info level on success and the failure message
            at error level otherwise

    Returns:
        tuple: `(message, succeeded)`. On a zero exit code `message` is the
        captured stdout and `succeeded` is True. On a non-zero exit code
        `message` is stdout, a newline, stderr and the suffix " failed". If
        an exception is raised while running the command, `message` is the
        text of that exception. `succeeded` is False in both failure cases.
    """
    try:
        child = subprocess.Popen(
            ls_commands,
            cwd=working_dir,
            env=os.environ.copy(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            encoding='utf-8',
        )
        captured_out, captured_err = child.communicate()
        if child.returncode != 0:
            failure = f"{captured_out}\n{captured_err} failed"
            if logger:
                logger.error(failure)
            return failure, False
    except Exception as exc:
        # Any error (e.g. the executable or directory does not exist) is
        # reported to the caller as a failed run instead of propagating.
        if logger:
            logger.error(str(exc))
        return str(exc), False
    else:
        if logger:
            logger.info(captured_out)
        return captured_out, True

stage_table

def stage_table(
    sources: list,
    project_dir: str,
    logger=None
)

Stage tables from the data lake to the staging layer in the data warehouse (DWH).

Parameters:

Name Type Description Default
sources list list of tables to stage None
project_dir str path to the dbt project None
logger logging.logger (optional) for goblet app.log None
View Source
def stage_table(sources: list, project_dir: str, logger=None):
    """Stage tables from datalake to staging layer in dwh.

    For each source, runs `dbt run-operation stage_external_sources` with
    `--args "select: stg.SOURCE"` and `--vars "ext_full_refresh: true"`.
    A failure on one source does not stop the remaining sources from being
    staged.

    Args:
        sources (list): list of tables to stage
        project_dir (str): path to the dbt project
        logger (logging.Logger): (optional) for goblet `app.log`; when omitted
            nothing is logged

    Returns:
        bool: True if every source was staged successfully (also when
        `sources` is empty), False if at least one staging command failed.
    """
    staging_successful = True

    for source in sources:
        # Build the argument list directly instead of shlex-splitting an
        # interpolated string, so a source name containing quotes or
        # backslashes cannot break or alter the command line.
        ls_commands = [
            "dbt",
            "run-operation",
            "stage_external_sources",
            "--args", f"select: stg.{source}",
            "--vars", "ext_full_refresh: true",
        ]

        # `logger` defaults to None, so every log call has to be guarded.
        if logger:
            logger.info(f'START staging {source}')

        _, sources_are_staged = run_subprocess(ls_commands, project_dir, logger)

        if sources_are_staged:
            if logger:
                logger.info(f'END staging successful for {source}')
        else:
            staging_successful = False
            if logger:
                logger.error(f'END staging failed for {source}')

    return staging_successful