Module utils.brocolib_utils.catalog_gen.dbt_catalog
View Source
import json
import os
import re
from google.cloud import storage
import shlex
import subprocess
import requests as rq
from brocolib_utils import settings
# Runtime configuration taken from the environment; each value is None when
# the corresponding variable is not set.
DBT_DOCS_BUCKET = os.getenv('DBT_DOCS_BUCKET')  # bucket that receives the docs page
GCP_PROJECT = os.getenv('FRONT_PROJECT_ID')  # project passed to the storage client
DBT_DOCS_READ_GROUP = os.getenv('DBT_DOCS_READ_GROUP')  # group granted read on the uploaded blob
DBT_PROJECT_DIR = os.getenv('DBT_PATH')  # working directory for the dbt commands
# French display metadata applied by translate_catalog, keyed by catalog stat id.
MAPPING_DICT = {
    "num_rows": {
        "id": "num_rows",
        "label": "# Lignes",
        "description": "Nombre approximatif de lignes dans la table"
    },
    "num_bytes": {
        "id": "num_bytes",
        "label": "Taille",
        "description": "Taille de la table (en bytes)"
    }
}


def translate_catalog(catalog_data):
    """Overwrite the label/description of known catalog stats with MAPPING_DICT values.

    For every entry under ``catalog_data["nodes"]`` and ``catalog_data["sources"]``,
    each stat whose id is a key of MAPPING_DICT and whose value is present and
    non-empty gets its ``label`` and ``description`` replaced. Other stats and
    fields (e.g. the stat value) are left untouched.

    Args:
        catalog_data (dict): parsed ``catalog.json`` content; modified in place.

    Returns:
        dict: the same ``catalog_data`` object.
    """
    for element_type in ('nodes', 'sources'):
        # Tolerate catalogs lacking a section and nodes lacking stats instead
        # of failing with KeyError.
        for node in catalog_data.get(element_type, {}).values():
            stats = node.get('stats', {})
            for stat_id, translation in MAPPING_DICT.items():
                stat = stats.get(stat_id)
                if stat:
                    stat['label'] = translation['label']
                    stat['description'] = translation['description']
    return catalog_data
def retrieve_data_catalog_index(timeout=60):
    """Download the released data-catalog ``index.html`` template.

    Args:
        timeout (float): seconds to wait on the connection / between received
            bytes before giving up, so a stalled server cannot hang the job.

    Returns:
        str: the response body decoded as UTF-8.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status, instead
            of silently returning the error page as if it were the template.
    """
    response = rq.get(settings.DATA_CATALOG_RELEASE_INDEX_URL, timeout=timeout)
    response.raise_for_status()
    return response.content.decode()
def _remove_ignored_packages(json_manifest, ignore_projects):
    """Delete manifest entries whose key belongs to one of ``ignore_projects`` (in place).

    dbt unique_ids have the form ``<resource_type>.<package_name>.<...>`` (e.g.
    ``macro.dbt_utils.star``), so only the second dot-separated segment is compared.
    This avoids dropping user entries that merely contain ``.dbt.`` further
    right (e.g. a source named ``dbt``). Each key is checked once, so no key is
    deleted twice.
    """
    ignored_key = re.compile(
        r'^[^.]*\.(?:' + '|'.join(re.escape(p) for p in ignore_projects) + r')\.'
    )
    for element_type in ('nodes', 'sources', 'macros', 'parent_map', 'child_map'):
        section = json_manifest.get(element_type, {})
        # Snapshot the keys so entries can be deleted while iterating.
        for key in list(section):
            if ignored_key.match(key):
                del section[key]


def _to_inline_json(obj):
    r"""Serialize ``obj`` to JSON that can be embedded in inline HTML script code.

    json.dumps only emits ``<`` inside string literals, and ``\/`` is a valid
    JSON/JavaScript escape for ``/``. Rewriting ``</`` as ``<\/`` therefore keeps
    the data identical while preventing a value such as ``</script>`` from
    terminating the enclosing script element early.
    """
    return json.dumps(obj).replace('</', '<\\/')


def get_dbt_populated_index(target_folder):
    """Build a standalone dbt docs page with manifest and catalog inlined.

    Downloads the released index.html template, loads ``manifest.json`` and
    ``catalog.json`` from ``target_folder``, drops entries of third-party dbt
    packages from the manifest, and relabels catalog stats via translate_catalog.
    It then replaces the template snippet that fetches the two JSON files with
    the inlined data.

    Args:
        target_folder (str): directory containing ``manifest.json`` and ``catalog.json``.

    Returns:
        str: the populated HTML document.

    Raises:
        ValueError: if the template no longer contains the snippet to replace
            (the page would otherwise be produced without any data).
    """
    print('Populating index.html ...')
    manifest_path = os.path.join(target_folder, 'manifest.json')
    catalog_path = os.path.join(target_folder, 'catalog.json')
    # Snippet of the template's JavaScript that loads the JSON files at runtime.
    search_str = 'o=[i("manifest","manifest.json"+t),i("catalog","catalog.json"+t)]'
    content_index = retrieve_data_catalog_index()
    if search_str not in content_index:
        raise ValueError(
            'index.html template does not contain the manifest/catalog loader snippet; '
            'cannot inline dbt artifacts'
        )
    with open(manifest_path, 'r', encoding='utf-8') as f:
        json_manifest = json.load(f)
    ignore_projects = (
        'dbt', 'dbt_bigquery', 'dbt_external_tables', 'dbt_utils',
        'codegen',
    )
    _remove_ignored_packages(json_manifest, ignore_projects)
    with open(catalog_path, 'r', encoding='utf-8') as f:
        json_catalog = json.load(f)
    json_catalog = translate_catalog(json_catalog)
    # Write manifest & catalog jsons in index.html
    new_str = (
        "o=[{label: 'manifest', data: " + _to_inline_json(json_manifest)
        + "},{label: 'catalog', data: " + _to_inline_json(json_catalog) + "}]"
    )
    new_content = content_index.replace(search_str, new_str)
    # NOTE: to select the "Database" tab by default and hide the "Project" tab,
    # replace '{e.nav_selected="project"}' with '{e.nav_selected="database"}' and
    # '<div class="switch ">' with '<div class="switch " hidden>' in new_content.
    print('Successfully populated index.html')
    return new_content
def upload_populated_index(
    content,
    file_name='index.html'
):
    """Upload ``content`` as an HTML object to DBT_DOCS_BUCKET and grant group read access.

    Args:
        content (str): HTML document to upload.
        file_name (str): object name inside the bucket.
    """
    print(f'Loading {file_name} to GCS ...')
    storage_client = storage.Client(project=GCP_PROJECT)
    bucket = storage_client.get_bucket(DBT_DOCS_BUCKET)
    blob = bucket.blob(file_name)
    blob.upload_from_string(content, content_type='text/html')
    print(f'Successfully loaded {file_name} to GCS')
    # Manage ACL: load the object's current ACL, add read for the docs group,
    # then persist it. A single save is enough; a second blob.acl.save(acl=acl)
    # would only resend the same ACL.
    acl = blob.acl
    acl.reload()
    acl.group(DBT_DOCS_READ_GROUP).grant_read()
    acl.save()
    print(f'Added ACL to {file_name}')
def run_subprocess(ls_commands, working_dir):
    """Run a command (without a shell) from a given directory and capture its output.

    Args:
        ls_commands (list): the program and its arguments, e.g. ``["dbt", "deps"]``.
        working_dir (str): directory to run the command from; ``None`` keeps the
            current working directory.

    Returns:
        tuple: ``(msg, ok)``, where ``ok`` is True when the process exited with
        status 0. On success ``msg`` is the captured stdout. On failure it is
        stdout and stderr joined by a newline and suffixed with ``" failed"``.
        ``msg`` is also printed.

    Raises:
        OSError: if the program cannot be started (e.g. not found on PATH).
    """
    process = subprocess.run(
        ls_commands,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=working_dir,
        env=os.environ.copy(),
        encoding='utf-8',
        # Undecodable bytes in tool output must not crash the caller.
        errors='replace',
    )
    out, err = process.stdout, process.stderr
    if process.returncode != 0:
        msg = f"{out}\n{err} failed"
        print(msg)
        return msg, False
    print(out)
    return out, True
def generate_dbt_docs():
    """Run ``dbt docs generate`` in DBT_PROJECT_DIR.

    Returns:
        bool: True if the command exited with status 0, False otherwise, so
        callers can stop before using missing or stale artifacts.
    """
    ls_commands = ["dbt", "docs", "generate"]
    print('Starting dbt docs generate ...')
    _, dbt_run_ok = run_subprocess(ls_commands, DBT_PROJECT_DIR)
    if dbt_run_ok:
        print('Successfully run dbt docs generate')
    else:
        print('Failed run dbt docs generate')
    return dbt_run_ok
def run_dbt_debug():
    """Run ``dbt debug`` in DBT_PROJECT_DIR.

    Returns:
        bool: True if the command exited with status 0, False otherwise.
    """
    ls_commands = ["dbt", "debug"]
    print('Starting dbt debug ...')
    _, dbt_run_ok = run_subprocess(ls_commands, DBT_PROJECT_DIR)
    if dbt_run_ok:
        print('Successfully run dbt debug')
    else:
        print('Failed while trying to run dbt debug')
    return dbt_run_ok
def run_dbt_deps():
    """Run ``dbt deps`` in DBT_PROJECT_DIR.

    Returns:
        bool: True if the command exited with status 0, False otherwise.
    """
    ls_commands = ["dbt", "deps"]
    print('Starting dbt deps ...')
    _, dbt_run_ok = run_subprocess(ls_commands, DBT_PROJECT_DIR)
    if dbt_run_ok:
        print('Successfully run dbt deps')
    else:
        print('Failed while trying to run dbt deps')
    return dbt_run_ok
Variables
DBT_DOCS_BUCKET
DBT_DOCS_READ_GROUP
DBT_PROJECT_DIR
GCP_PROJECT
MAPPING_DICT
Functions
generate_dbt_docs
def generate_dbt_docs(
)
Run dbt docs generate
View Source
def generate_dbt_docs():
    """Run ``dbt docs generate`` in DBT_PROJECT_DIR.

    Returns:
        bool: True if the command exited with status 0, False otherwise, so
        callers can stop before using missing or stale artifacts.
    """
    ls_commands = ["dbt", "docs", "generate"]
    print('Starting dbt docs generate ...')
    _, dbt_run_ok = run_subprocess(ls_commands, DBT_PROJECT_DIR)
    if dbt_run_ok:
        print('Successfully run dbt docs generate')
    else:
        print('Failed run dbt docs generate')
    return dbt_run_ok
get_dbt_populated_index
def get_dbt_populated_index(
target_folder
)
View Source
def _remove_ignored_packages(json_manifest, ignore_projects):
    """Delete manifest entries whose key belongs to one of ``ignore_projects`` (in place).

    dbt unique_ids have the form ``<resource_type>.<package_name>.<...>`` (e.g.
    ``macro.dbt_utils.star``), so only the second dot-separated segment is compared.
    This avoids dropping user entries that merely contain ``.dbt.`` further
    right (e.g. a source named ``dbt``). Each key is checked once, so no key is
    deleted twice.
    """
    ignored_key = re.compile(
        r'^[^.]*\.(?:' + '|'.join(re.escape(p) for p in ignore_projects) + r')\.'
    )
    for element_type in ('nodes', 'sources', 'macros', 'parent_map', 'child_map'):
        section = json_manifest.get(element_type, {})
        # Snapshot the keys so entries can be deleted while iterating.
        for key in list(section):
            if ignored_key.match(key):
                del section[key]


def _to_inline_json(obj):
    r"""Serialize ``obj`` to JSON that can be embedded in inline HTML script code.

    json.dumps only emits ``<`` inside string literals, and ``\/`` is a valid
    JSON/JavaScript escape for ``/``. Rewriting ``</`` as ``<\/`` therefore keeps
    the data identical while preventing a value such as ``</script>`` from
    terminating the enclosing script element early.
    """
    return json.dumps(obj).replace('</', '<\\/')


def get_dbt_populated_index(target_folder):
    """Build a standalone dbt docs page with manifest and catalog inlined.

    Downloads the released index.html template, loads ``manifest.json`` and
    ``catalog.json`` from ``target_folder``, drops entries of third-party dbt
    packages from the manifest, and relabels catalog stats via translate_catalog.
    It then replaces the template snippet that fetches the two JSON files with
    the inlined data.

    Args:
        target_folder (str): directory containing ``manifest.json`` and ``catalog.json``.

    Returns:
        str: the populated HTML document.

    Raises:
        ValueError: if the template no longer contains the snippet to replace
            (the page would otherwise be produced without any data).
    """
    print('Populating index.html ...')
    manifest_path = os.path.join(target_folder, 'manifest.json')
    catalog_path = os.path.join(target_folder, 'catalog.json')
    # Snippet of the template's JavaScript that loads the JSON files at runtime.
    search_str = 'o=[i("manifest","manifest.json"+t),i("catalog","catalog.json"+t)]'
    content_index = retrieve_data_catalog_index()
    if search_str not in content_index:
        raise ValueError(
            'index.html template does not contain the manifest/catalog loader snippet; '
            'cannot inline dbt artifacts'
        )
    with open(manifest_path, 'r', encoding='utf-8') as f:
        json_manifest = json.load(f)
    ignore_projects = (
        'dbt', 'dbt_bigquery', 'dbt_external_tables', 'dbt_utils',
        'codegen',
    )
    _remove_ignored_packages(json_manifest, ignore_projects)
    with open(catalog_path, 'r', encoding='utf-8') as f:
        json_catalog = json.load(f)
    json_catalog = translate_catalog(json_catalog)
    # Write manifest & catalog jsons in index.html
    new_str = (
        "o=[{label: 'manifest', data: " + _to_inline_json(json_manifest)
        + "},{label: 'catalog', data: " + _to_inline_json(json_catalog) + "}]"
    )
    new_content = content_index.replace(search_str, new_str)
    # NOTE: to select the "Database" tab by default and hide the "Project" tab,
    # replace '{e.nav_selected="project"}' with '{e.nav_selected="database"}' and
    # '<div class="switch ">' with '<div class="switch " hidden>' in new_content.
    print('Successfully populated index.html')
    return new_content
retrieve_data_catalog_index
def retrieve_data_catalog_index(
)
View Source
def retrieve_data_catalog_index(timeout=60):
    """Download the released data-catalog ``index.html`` template.

    Args:
        timeout (float): seconds to wait on the connection / between received
            bytes before giving up, so a stalled server cannot hang the job.

    Returns:
        str: the response body decoded as UTF-8.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status, instead
            of silently returning the error page as if it were the template.
    """
    response = rq.get(settings.DATA_CATALOG_RELEASE_INDEX_URL, timeout=timeout)
    response.raise_for_status()
    return response.content.decode()
run_dbt_debug
def run_dbt_debug(
)
Run dbt debug
View Source
def run_dbt_debug():
    """Run ``dbt debug`` in DBT_PROJECT_DIR.

    Returns:
        bool: True if the command exited with status 0, False otherwise.
    """
    ls_commands = ["dbt", "debug"]
    print('Starting dbt debug ...')
    _, dbt_run_ok = run_subprocess(ls_commands, DBT_PROJECT_DIR)
    if dbt_run_ok:
        print('Successfully run dbt debug')
    else:
        print('Failed while trying to run dbt debug')
    return dbt_run_ok
run_dbt_deps
def run_dbt_deps(
)
Run dbt deps
View Source
def run_dbt_deps():
    """Run ``dbt deps`` in DBT_PROJECT_DIR.

    Returns:
        bool: True if the command exited with status 0, False otherwise.
    """
    ls_commands = ["dbt", "deps"]
    print('Starting dbt deps ...')
    _, dbt_run_ok = run_subprocess(ls_commands, DBT_PROJECT_DIR)
    if dbt_run_ok:
        print('Successfully run dbt deps')
    else:
        print('Failed while trying to run dbt deps')
    return dbt_run_ok
run_subprocess
def run_subprocess(
ls_commands,
working_dir
)
Run the command given as an argument from the working directory given as an argument.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ls_commands | list | list of string representing the bash command to run | None |
working_dir | str | path when you want to change directory to before execution | None |
logger | logging.logger | (optional) for goblet app.log (listed in the docstring, but not an actual parameter of run_subprocess) | None |
View Source
def run_subprocess(ls_commands, working_dir):
    """Run a command (without a shell) from a given directory and capture its output.

    Args:
        ls_commands (list): the program and its arguments, e.g. ``["dbt", "deps"]``.
        working_dir (str): directory to run the command from; ``None`` keeps the
            current working directory.

    Returns:
        tuple: ``(msg, ok)``, where ``ok`` is True when the process exited with
        status 0. On success ``msg`` is the captured stdout. On failure it is
        stdout and stderr joined by a newline and suffixed with ``" failed"``.
        ``msg`` is also printed.

    Raises:
        OSError: if the program cannot be started (e.g. not found on PATH).
    """
    process = subprocess.run(
        ls_commands,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=working_dir,
        env=os.environ.copy(),
        encoding='utf-8',
        # Undecodable bytes in tool output must not crash the caller.
        errors='replace',
    )
    out, err = process.stdout, process.stderr
    if process.returncode != 0:
        msg = f"{out}\n{err} failed"
        print(msg)
        return msg, False
    print(out)
    return out, True
translate_catalog
def translate_catalog(
catalog_data
)
View Source
def translate_catalog(catalog_data):
    """Overwrite the label/description of known catalog stats with MAPPING_DICT values.

    For every entry under ``catalog_data["nodes"]`` and ``catalog_data["sources"]``,
    each stat whose id is a key of MAPPING_DICT and whose value is present and
    non-empty gets its ``label`` and ``description`` replaced. Other stats and
    fields (e.g. the stat value) are left untouched.

    Args:
        catalog_data (dict): parsed ``catalog.json`` content; modified in place.

    Returns:
        dict: the same ``catalog_data`` object.
    """
    for element_type in ('nodes', 'sources'):
        # Tolerate catalogs lacking a section and nodes lacking stats instead
        # of failing with KeyError.
        for node in catalog_data.get(element_type, {}).values():
            stats = node.get('stats', {})
            for stat_id, translation in MAPPING_DICT.items():
                stat = stats.get(stat_id)
                if stat:
                    stat['label'] = translation['label']
                    stat['description'] = translation['description']
    return catalog_data
upload_populated_index
def upload_populated_index(
content,
file_name='index.html'
)
View Source
def upload_populated_index(
    content,
    file_name='index.html'
):
    """Upload ``content`` as an HTML object to DBT_DOCS_BUCKET and grant group read access.

    Args:
        content (str): HTML document to upload.
        file_name (str): object name inside the bucket.
    """
    print(f'Loading {file_name} to GCS ...')
    storage_client = storage.Client(project=GCP_PROJECT)
    bucket = storage_client.get_bucket(DBT_DOCS_BUCKET)
    blob = bucket.blob(file_name)
    blob.upload_from_string(content, content_type='text/html')
    print(f'Successfully loaded {file_name} to GCS')
    # Manage ACL: load the object's current ACL, add read for the docs group,
    # then persist it. A single save is enough; a second blob.acl.save(acl=acl)
    # would only resend the same ACL.
    acl = blob.acl
    acl.reload()
    acl.group(DBT_DOCS_READ_GROUP).grant_read()
    acl.save()
    print(f'Added ACL to {file_name}')