
Module extract_load.brocolib_extract_load.datawarehouse

View Source
from google.cloud import bigquery

def create_external_table(project_id, bucket_name_destination, file_type, file_name, table_ref, dataset, folder_name, schemas_dict):

  '''
  Create an external BigQuery table from files stored in GCS.

  Parameters:

    project_id (str): ID of the GCP project
    bucket_name_destination (str): Name of the bucket that holds the source files
    file_type (str): Type of the source files (currently only 'CSV' is supported)
    file_name (str): Name of the file(s) to expose through the table
    table_ref (str): Name of the table to create
    dataset (str): Dataset in which the table is created
    folder_name (str): Folder inside the bucket that contains the files
    schemas_dict (dict): Dict with field names as keys and BigQuery types as values

  Returns:

    bigquery.Table: the created external table
  '''

  client = bigquery.Client(project=project_id)

  # Build the table schema from the {field_name: field_type} dict
  ls_schemas = []
  for key, value in schemas_dict.items():
    schemafield = bigquery.SchemaField(key, value)
    ls_schemas.append(schemafield)

  dataset_ref = bigquery.DatasetReference(project_id, dataset)
  table_ref = bigquery.TableReference(dataset_ref, table_ref)
  table = bigquery.Table(table_ref, schema=ls_schemas)

  # External data configuration pointing at the GCS objects,
  # e.g. for CSV files: gs://<your-bucket>/<folder>/*/<your-csv-file>
  file_type = file_type.upper()
  external_config = bigquery.ExternalConfig(file_type)
  external_config.source_uris = [f'gs://{bucket_name_destination}/{folder_name}/*/{file_name}']

  if file_type == 'CSV':
    external_config.options.field_delimiter = ","
    external_config.options.encoding = "UTF-8"
    external_config.options.skip_leading_rows = 1
  else:
    raise NotImplementedError(f"Unsupported file type: {file_type}")

  table.external_data_configuration = external_config
  # external_config.options.quote_character can also be set here if a non-default quote character is needed
  return client.create_table(table)
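The schema dict and the wildcard source URI are the two pieces that drive the external table. A small sketch of what they expand to; the column names, bucket, folder and file names below are hypothetical and only illustrate the expected shapes:

from google.cloud import bigquery

# Hypothetical schema mapping: column name -> BigQuery column type
schemas_dict = {"order_id": "INTEGER", "amount": "FLOAT", "created_at": "TIMESTAMP"}

# Equivalent to the loop inside create_external_table
ls_schemas = [bigquery.SchemaField(name, col_type) for name, col_type in schemas_dict.items()]
# -> [SchemaField('order_id', 'INTEGER', ...), SchemaField('amount', 'FLOAT', ...), ...]

# With bucket_name_destination="my-bucket", folder_name="exports" and file_name="sales.csv"
# (all hypothetical), the external table reads every object matching:
#   gs://my-bucket/exports/*/sales.csv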

Functions

create_external_table

def create_external_table(
    project_id,
    bucket_name_destination,
    file_type,
    file_name,
    table_ref,
    dataset,
    folder_name,
    schemas_dict
)

Create an external table from files in GCS.

Parameters:

Name                     Type  Description                                                  Default
project_id               str   ID of the GCP project                                        required
bucket_name_destination  str   Name of the bucket that holds the source files               required
file_type                str   Type of the source files (only 'CSV' is supported)          required
file_name                str   Name of the file(s) to expose through the table              required
table_ref                str   Name of the table to create                                  required
dataset                  str   Dataset in which the table is created                        required
folder_name              str   Folder inside the bucket that contains the files             required
schemas_dict             dict  Dict with field names as keys and BigQuery types as values   required

Returns:

Type            Description
bigquery.Table  The created external table
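A minimal usage sketch; the project, bucket, dataset and schema values below are hypothetical and only illustrate the expected argument shapes:

from extract_load.brocolib_extract_load.datawarehouse import create_external_table

table = create_external_table(
    project_id="my-project",              # hypothetical GCP project
    bucket_name_destination="my-bucket",  # hypothetical GCS bucket
    file_type="CSV",
    file_name="sales.csv",
    table_ref="sales_external",
    dataset="raw_data",
    folder_name="exports",
    schemas_dict={"order_id": "INTEGER", "amount": "FLOAT", "created_at": "TIMESTAMP"},
)

# The table raw_data.sales_external now queries the CSV files matching
# gs://my-bucket/exports/*/sales.csv directly, without loading them into BigQuery storage.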