Module utils.tests.test_datalake
View Source
import pandas as pd
from .conftest import TEST_BUCKET_NAME, populate_bucket
from brocolib_utils.utils import datalake, gcs
from brocolib_utils import settings
from brocolib_utils.ddm import sources_parser
# def test_no_import_error():
# from brocolib_utils.datalake import datalake
# pass
# def test_create_bucket():
# bucket = datalake.create_bucket(BUCKET_NAME)
# assert bucket.name == BUCKET_NAME
# assert bucket.location == BUCKET_LOCATION
# assert bucket.storage_class == BUCKET_STORAGE_CLASS
# def test_delete_bucket():
# datalake.delete_bucket(BUCKET_NAME)
# assert BUCKET_NAME not in datalake.list_buckets()
def test_setup_ok(populate_bucket):
    """Check that the fixture bucket holds the expected parquet blobs.

    Lists every blob under the ``test_source/`` prefix of the bucket returned
    by the ``populate_bucket`` fixture and compares the blob names, in listing
    order, against the three expected partitioned paths.
    """
    expected_names = [
        "test_source/test_table/month=1/userdata1.parquet",
        "test_source/test_table/month=2/userdata2.parquet",
        "test_source/test_table/month=3/userdata3.parquet"
    ]

    client = gcs.get_storage_client()
    listing = client.list_blobs(populate_bucket.name, prefix="test_source/")
    found_names = [item.name for item in listing]

    # The comparison is order-sensitive: names must come back in this order.
    assert found_names == expected_names
def test_get_sources(populate_bucket):
    """Check the table mapping returned by ``datalake.get_source``.

    Calls ``datalake.get_source`` for ``test_source`` against the bucket
    returned by the ``populate_bucket`` fixture and expects a dict with a
    single ``test_table`` entry pointing at its ``gs://`` location.
    """
    expected_tables = {
        'test_table': 'gs://brocolib_utils-test-bucket/test_source/test_table/'
    }

    lookup_kwargs = dict(
        gcp_project=settings.DATALAKE_PROJECT,
        datalake_bucket=populate_bucket.name,
        source_name="test_source",
    )
    found_tables = datalake.get_source(**lookup_kwargs)

    assert found_tables == expected_tables
def test_get_all_columns_for_tables(populate_bucket):
    """Check the column list returned by ``sources_parser.get_source_columns``.

    Requests the columns of ``test_source.test_table`` in ``TEST_BUCKET_NAME``
    and expects an ordered list of ``{"name": ..., "type": ...}`` dicts.

    ``populate_bucket`` is requested as a fixture but not referenced in the
    body; presumably it guarantees the bucket content exists -- confirm
    against conftest.
    """
    # (column name, column type) pairs, in the order the parser must return.
    expected_schema = (
        ("registration_dttm", "TIMESTAMP"),
        ("id", "FLOAT"),
        ("first_name", "STRING"),
        ("last_name", "STRING"),
        ("email", "STRING"),
        ("gender", "STRING"),
        ("ip_address", "STRING"),
        ("cc", "STRING"),
        ("country", "STRING"),
        ("birthdate", "STRING"),
        ("salary", "FLOAT"),
        ("title", "STRING"),
        ("comments", "STRING"),
        ("month", "STRING"),
    )
    expected_columns = [
        {"name": column, "type": kind} for column, kind in expected_schema
    ]

    parsed_columns = sources_parser.get_source_columns(
        source_name="test_source",
        table_name="test_table",
        datalake_bucket=TEST_BUCKET_NAME,
    )

    assert parsed_columns == expected_columns
Variables
TEST_BUCKET_NAME
Functions
test_get_all_columns_for_tables
def test_get_all_columns_for_tables(
populate_bucket
)
View Source
def test_get_all_columns_for_tables(populate_bucket):
    """Check the column list returned by ``sources_parser.get_source_columns``.

    Requests the columns of ``test_source.test_table`` in ``TEST_BUCKET_NAME``
    and expects an ordered list of ``{"name": ..., "type": ...}`` dicts.

    ``populate_bucket`` is requested as a fixture but not referenced in the
    body; presumably it guarantees the bucket content exists -- confirm
    against conftest.
    """
    # (column name, column type) pairs, in the order the parser must return.
    expected_schema = (
        ("registration_dttm", "TIMESTAMP"),
        ("id", "FLOAT"),
        ("first_name", "STRING"),
        ("last_name", "STRING"),
        ("email", "STRING"),
        ("gender", "STRING"),
        ("ip_address", "STRING"),
        ("cc", "STRING"),
        ("country", "STRING"),
        ("birthdate", "STRING"),
        ("salary", "FLOAT"),
        ("title", "STRING"),
        ("comments", "STRING"),
        ("month", "STRING"),
    )
    expected_columns = [
        {"name": column, "type": kind} for column, kind in expected_schema
    ]

    parsed_columns = sources_parser.get_source_columns(
        source_name="test_source",
        table_name="test_table",
        datalake_bucket=TEST_BUCKET_NAME,
    )

    assert parsed_columns == expected_columns
test_get_sources
def test_get_sources(
populate_bucket
)
View Source
def test_get_sources(populate_bucket):
    """Check the table mapping returned by ``datalake.get_source``.

    Calls ``datalake.get_source`` for ``test_source`` against the bucket
    returned by the ``populate_bucket`` fixture and expects a dict with a
    single ``test_table`` entry pointing at its ``gs://`` location.
    """
    expected_tables = {
        'test_table': 'gs://brocolib_utils-test-bucket/test_source/test_table/'
    }

    lookup_kwargs = dict(
        gcp_project=settings.DATALAKE_PROJECT,
        datalake_bucket=populate_bucket.name,
        source_name="test_source",
    )
    found_tables = datalake.get_source(**lookup_kwargs)

    assert found_tables == expected_tables
test_setup_ok
def test_setup_ok(
populate_bucket
)
View Source
def test_setup_ok(populate_bucket):
    """Check that the fixture bucket holds the expected parquet blobs.

    Lists every blob under the ``test_source/`` prefix of the bucket returned
    by the ``populate_bucket`` fixture and compares the blob names, in listing
    order, against the three expected partitioned paths.
    """
    expected_names = [
        "test_source/test_table/month=1/userdata1.parquet",
        "test_source/test_table/month=2/userdata2.parquet",
        "test_source/test_table/month=3/userdata3.parquet"
    ]

    client = gcs.get_storage_client()
    listing = client.list_blobs(populate_bucket.name, prefix="test_source/")
    found_names = [item.name for item in listing]

    # The comparison is order-sensitive: names must come back in this order.
    assert found_names == expected_names