Module extract_load.brocolib_extract_load.datalake
View Source
import pandas as pd
from .pubsub import publish_sources
from datetime import datetime
def dataframe_to_bucket(
    dataframe: pd.DataFrame,
    bucket_name: str,
    blob_name: str,
    file_type: str,
    logger=None
) -> str:
    """Loads a DataFrame to a GCS bucket

    Args:
        dataframe (pd.DataFrame): DataFrame you want to upload to GCS
        bucket_name (str): Name of the destination bucket
        blob_name (str): Name of the destination file, without extension
        file_type (str): File format
        logger (optional): Logging interface. Defaults to None.

    Parameters example:
        blob_name = 'folder/subfolder/filename'  (the extension is added from file_type)
        bucket_name = 'PROJECT_ID-landing'

    Raises:
        NotImplementedError: If the file format is not implemented

    Returns:
        str: GCS path where the blob is uploaded
    """
    gcs_path_temp = f"gs://{bucket_name}/{blob_name}.{{file_extension}}"
    if file_type.lower() == 'csv':
        gcs_path = gcs_path_temp.format(file_extension="csv")
        dataframe.to_csv(gcs_path, index=False)
    elif file_type.lower() == 'parquet':
        gcs_path = gcs_path_temp.format(file_extension="parquet")
        dataframe.to_parquet(gcs_path, index=False)
    elif file_type.lower() == 'json':
        gcs_path = gcs_path_temp.format(file_extension="json")
        # orient="records" omits the index; to_json rejects index=False with the default orient
        dataframe.to_json(gcs_path, orient="records")
    else:
        raise NotImplementedError(f"{file_type} is not implemented.")
    if logger:
        logger.info(f'Load Destination : {gcs_path}')
    return gcs_path

def bucket_to_dataframe(bucket_name: str, blob_name: str, file_type: str) -> pd.DataFrame:
    '''
    Loads a file located in a GCS bucket into a DataFrame

    Parameters:
        bucket_name (str): Name of the source bucket
        blob_name (str): Name of the blob in the source bucket
        file_type (str): Type of the file in the bucket

    Raises:
        NotImplementedError: If the file format is not implemented

    Returns:
        pandas.DataFrame: fetched DataFrame
    '''
    file_type = file_type.lower()
    url = f'gs://{bucket_name}/{blob_name}'
    print(f'using {url}')
    if file_type == 'csv':
        return pd.read_csv(url)
    # raise instead of silently returning None for unsupported file types
    raise NotImplementedError(f"{file_type} is not implemented.")

class ExternalTable:
    def __init__(
        self,
        bucket_name: str,
        partition_keys: dict,
        bucket_file: str,
        bucket_table_directory: str,
        bucket_directory: str,
        dbt_topic: str,
        gcp_project: str,
        logger=None
    ):
        """Instantiate an ExternalTable object

        Args:
            bucket_name (str): Name of the GCS bucket where the data is located
            partition_keys (dict): Pairs of partition keys and values
            bucket_file (str): Name of the file in the GCS bucket
            bucket_table_directory (str): Name of the directory after which the ExternalTable is named
            bucket_directory (str): Name of the subdirectories under the bucket's root level
            dbt_topic (str): Name of the Pub/Sub dbt topic
            gcp_project (str): Name of the GCP project
            logger (optional): Logging interface. Defaults to None.
        """
        self.bucket_name = bucket_name
        self.partition_keys = partition_keys
        self.subfolders = bucket_directory
        self.source_name = bucket_table_directory
        self.bucket_table_directory = bucket_table_directory
        self.file_name = bucket_file
        self.dbt_topic = dbt_topic
        self.gcp_project = gcp_project
        self.logger = logger
        self.blob_name = self.format_filename()
        self.gcs_path = None

    def add_partition_keys(self, path_prefix: str) -> str:
        """Add partition_keys in Hive format to a GCS path prefix

        Args:
            path_prefix (str): GCS path prefix

        Returns:
            str: GCS path prefix appended with partition keys
        """
        now = datetime.now()
        for key, value in self.partition_keys.items():
            if key == "year":
                value = now.year
            elif key == "month":
                value = now.month
            path_prefix += f"/{key}={value}"
        return path_prefix

    def format_filename(self) -> str:
        now = datetime.now()
        path_prefix = self.add_partition_keys(f"{self.subfolders}/{self.bucket_table_directory}")
        return f"{path_prefix}/{self.file_name}_{now.day}"

    def to_datalake(self, df, logger=None):
        self.gcs_path = dataframe_to_bucket(
            dataframe=df,
            bucket_name=self.bucket_name,
            blob_name=self.blob_name,
            file_type="parquet",
            logger=logger
        )

    def publish_message(self):
        publish_sources(
            sources=[self.source_name],
            dbt_topic=self.dbt_topic,
            gcp_project=self.gcp_project,
            logger=self.logger
        )
Functions
bucket_to_dataframe
def bucket_to_dataframe(
    bucket_name: str,
    blob_name: str,
    file_type: str
) -> pandas.core.frame.DataFrame
Loads a file located in a GCS bucket into a DataFrame
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket_name | str | Name of the source bucket | None |
blob_name | str | Name of the blob in the source bucket | None |
file_type | str | Type of the file in the bucket | None |
Returns:
Type | Description |
---|---|
pandas.DataFrame | fetched DataFrame |
View Source
def bucket_to_dataframe(bucket_name: str, blob_name: str, file_type: str) -> pd.DataFrame:
    '''
    Loads a file located in a GCS bucket into a DataFrame

    Parameters:
        bucket_name (str): Name of the source bucket
        blob_name (str): Name of the blob in the source bucket
        file_type (str): Type of the file in the bucket

    Raises:
        NotImplementedError: If the file format is not implemented

    Returns:
        pandas.DataFrame: fetched DataFrame
    '''
    file_type = file_type.lower()
    url = f'gs://{bucket_name}/{blob_name}'
    print(f'using {url}')
    if file_type == 'csv':
        return pd.read_csv(url)
    # raise instead of silently returning None for unsupported file types
    raise NotImplementedError(f"{file_type} is not implemented.")
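Example usage (a minimal sketch; the bucket and blob names are placeholders, the import path follows the module name at the top of this page, and pandas needs the gcsfs package installed to read gs:// paths):

from extract_load.brocolib_extract_load.datalake import bucket_to_dataframe

# Read a CSV blob from a landing bucket into a DataFrame.
# blob_name is the full path of the blob, extension included.
df = bucket_to_dataframe(
    bucket_name="my-project-landing",
    blob_name="folder/subfolder/data.csv",
    file_type="csv",
)
print(df.head())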
dataframe_to_bucket
def dataframe_to_bucket(
    dataframe: pandas.core.frame.DataFrame,
    bucket_name: str,
    blob_name: str,
    file_type: str,
    logger=None
) -> str
Loads a DataFrame to a GCS bucket
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataframe | pd.DataFrame | DataFrame you want to upload to GCS | None |
bucket_name | str | Name of the destination bucket | None |
blob_name | str | Name of the destination file, without extension | None |
file_type | str | File format | None |
logger | optional | Logging interface. Defaults to None. | None |
Returns:
Type | Description |
---|---|
str | GCS path where the blob is uploaded |
Raises:
Type | Description |
---|---|
NotImplementedError | If the file format is not implemented |
View Source
def dataframe_to_bucket(
    dataframe: pd.DataFrame,
    bucket_name: str,
    blob_name: str,
    file_type: str,
    logger=None
) -> str:
    """Loads a DataFrame to a GCS bucket

    Args:
        dataframe (pd.DataFrame): DataFrame you want to upload to GCS
        bucket_name (str): Name of the destination bucket
        blob_name (str): Name of the destination file, without extension
        file_type (str): File format
        logger (optional): Logging interface. Defaults to None.

    Parameters example:
        blob_name = 'folder/subfolder/filename'  (the extension is added from file_type)
        bucket_name = 'PROJECT_ID-landing'

    Raises:
        NotImplementedError: If the file format is not implemented

    Returns:
        str: GCS path where the blob is uploaded
    """
    gcs_path_temp = f"gs://{bucket_name}/{blob_name}.{{file_extension}}"
    if file_type.lower() == 'csv':
        gcs_path = gcs_path_temp.format(file_extension="csv")
        dataframe.to_csv(gcs_path, index=False)
    elif file_type.lower() == 'parquet':
        gcs_path = gcs_path_temp.format(file_extension="parquet")
        dataframe.to_parquet(gcs_path, index=False)
    elif file_type.lower() == 'json':
        gcs_path = gcs_path_temp.format(file_extension="json")
        # orient="records" omits the index; to_json rejects index=False with the default orient
        dataframe.to_json(gcs_path, orient="records")
    else:
        raise NotImplementedError(f"{file_type} is not implemented.")
    if logger:
        logger.info(f'Load Destination : {gcs_path}')
    return gcs_path
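Example usage (a minimal sketch; bucket and blob names are placeholders, and writing Parquet to gs:// paths with pandas requires the gcsfs and pyarrow packages):

import logging

import pandas as pd

from extract_load.brocolib_extract_load.datalake import dataframe_to_bucket

logger = logging.getLogger(__name__)
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# Upload the DataFrame as Parquet; the extension is derived from file_type
# and appended to blob_name.
gcs_path = dataframe_to_bucket(
    dataframe=df,
    bucket_name="PROJECT_ID-landing",
    blob_name="folder/subfolder/filename",
    file_type="parquet",
    logger=logger,
)
# gcs_path == "gs://PROJECT_ID-landing/folder/subfolder/filename.parquet"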
Classes
ExternalTable
class ExternalTable(
    bucket_name: str,
    partition_keys: dict,
    bucket_file: str,
    bucket_table_directory: str,
    bucket_directory: str,
    dbt_topic: str,
    gcp_project: str,
    logger=None
)
View Source
class ExternalTable:
    def __init__(
        self,
        bucket_name: str,
        partition_keys: dict,
        bucket_file: str,
        bucket_table_directory: str,
        bucket_directory: str,
        dbt_topic: str,
        gcp_project: str,
        logger=None
    ):
        """Instantiate an ExternalTable object

        Args:
            bucket_name (str): Name of the GCS bucket where the data is located
            partition_keys (dict): Pairs of partition keys and values
            bucket_file (str): Name of the file in the GCS bucket
            bucket_table_directory (str): Name of the directory after which the ExternalTable is named
            bucket_directory (str): Name of the subdirectories under the bucket's root level
            dbt_topic (str): Name of the Pub/Sub dbt topic
            gcp_project (str): Name of the GCP project
            logger (optional): Logging interface. Defaults to None.
        """
        self.bucket_name = bucket_name
        self.partition_keys = partition_keys
        self.subfolders = bucket_directory
        self.source_name = bucket_table_directory
        self.bucket_table_directory = bucket_table_directory
        self.file_name = bucket_file
        self.dbt_topic = dbt_topic
        self.gcp_project = gcp_project
        self.logger = logger
        self.blob_name = self.format_filename()
        self.gcs_path = None

    def add_partition_keys(self, path_prefix: str) -> str:
        """Add partition_keys in Hive format to a GCS path prefix

        Args:
            path_prefix (str): GCS path prefix

        Returns:
            str: GCS path prefix appended with partition keys
        """
        now = datetime.now()
        for key, value in self.partition_keys.items():
            if key == "year":
                value = now.year
            elif key == "month":
                value = now.month
            path_prefix += f"/{key}={value}"
        return path_prefix

    def format_filename(self) -> str:
        now = datetime.now()
        path_prefix = self.add_partition_keys(f"{self.subfolders}/{self.bucket_table_directory}")
        return f"{path_prefix}/{self.file_name}_{now.day}"

    def to_datalake(self, df, logger=None):
        self.gcs_path = dataframe_to_bucket(
            dataframe=df,
            bucket_name=self.bucket_name,
            blob_name=self.blob_name,
            file_type="parquet",
            logger=logger
        )

    def publish_message(self):
        publish_sources(
            sources=[self.source_name],
            dbt_topic=self.dbt_topic,
            gcp_project=self.gcp_project,
            logger=self.logger
        )
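Example usage (a minimal sketch; every name below is a placeholder, and the dbt Pub/Sub topic is assumed to already exist):

import pandas as pd

from extract_load.brocolib_extract_load.datalake import ExternalTable

table = ExternalTable(
    bucket_name="PROJECT_ID-landing",
    partition_keys={"year": None, "month": None},  # "year"/"month" values are replaced with the current date
    bucket_file="orders",
    bucket_table_directory="orders",
    bucket_directory="erp/raw",
    dbt_topic="dbt-sources",
    gcp_project="PROJECT_ID",
)

df = pd.DataFrame({"order_id": [1, 2], "amount": [10.0, 20.5]})
table.to_datalake(df)    # writes a Parquet file under a Hive-partitioned prefix
table.publish_message()  # notifies dbt through Pub/Sub that the source was refreshed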
Methods
add_partition_keys
def add_partition_keys(
    self,
    path_prefix: str
) -> str
Add partition_keys in Hive Format to a GCS path prefix
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path_prefix | str | GCS path prefix | None |
Returns:
Type | Description |
---|---|
str | GCS path prefix appended with partition keys |
View Source
def add_partition_keys(self, path_prefix: str) -> str:
    """Add partition_keys in Hive format to a GCS path prefix

    Args:
        path_prefix (str): GCS path prefix

    Returns:
        str: GCS path prefix appended with partition keys
    """
    now = datetime.now()
    for key, value in self.partition_keys.items():
        if key == "year":
            value = now.year
        elif key == "month":
            value = now.month
        path_prefix += f"/{key}={value}"
    return path_prefix
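For illustration (placeholder values, assuming the current date is 2024-05-17): keys named "year" and "month" are overwritten with the current date, while every other key keeps the value supplied in partition_keys.

# with partition_keys = {"year": None, "month": None}
table.add_partition_keys("erp/raw/orders")
# -> "erp/raw/orders/year=2024/month=5"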
format_filename
def format_filename(
    self
) -> str
View Source
def format_filename(self) -> str:
    now = datetime.now()
    path_prefix = self.add_partition_keys(f"{self.subfolders}/{self.bucket_table_directory}")
    return f"{path_prefix}/{self.file_name}_{now.day}"
publish_message
def publish_message(
    self
)
View Source
def publish_message(self):
    publish_sources(
        sources=[self.source_name],
        dbt_topic=self.dbt_topic,
        gcp_project=self.gcp_project,
        logger=self.logger
    )
to_datalake
def to_datalake(
    self,
    df,
    logger=None
)
View Source
def to_datalake(self, df, logger=None):
    self.gcs_path = dataframe_to_bucket(
        dataframe=df,
        bucket_name=self.bucket_name,
        blob_name=self.blob_name,
        file_type="parquet",
        logger=logger
    )
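A minimal sketch of the effect (placeholder names, continuing the ExternalTable example above): to_datalake delegates to dataframe_to_bucket with file_type="parquet" and stores the destination path on the instance.

table.to_datalake(df)
# table.gcs_path -> "gs://PROJECT_ID-landing/erp/raw/orders/year=2024/month=5/orders_17.parquet"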