Skip to content

Module extract_load.brocolib_extract_load.pubsub

View Source
import json

def publish_message_toPubSub(project_id, topic_id, message, logger):

    from google.cloud import pubsub

    publisher = pubsub.PublisherClient()

    topic_path = publisher.topic_path(project_id, topic_id)

    future = publisher.publish(topic_path, message)

    if logger:

        logger.info(future.result())

    return future.result()

def publish_sources(sources, dbt_topic, gcp_project, logger=None):

    message = {"sources":sources}

    result = publish_message_toPubSub(

        project_id=gcp_project,

        topic_id=dbt_topic,

        message=json.dumps(message).encode("utf-8"),

        logger=logger

    )

    if logger:

        logger.info(result)

Functions

publish_message_toPubSub

def publish_message_toPubSub(
    project_id,
    topic_id,
    message,
    logger
)
View Source
def publish_message_toPubSub(project_id, topic_id, message, logger):

    from google.cloud import pubsub

    publisher = pubsub.PublisherClient()

    topic_path = publisher.topic_path(project_id, topic_id)

    future = publisher.publish(topic_path, message)

    if logger:

        logger.info(future.result())

    return future.result()

publish_sources

def publish_sources(
    sources,
    dbt_topic,
    gcp_project,
    logger=None
)
View Source
def publish_sources(sources, dbt_topic, gcp_project, logger=None):

    message = {"sources":sources}

    result = publish_message_toPubSub(

        project_id=gcp_project,

        topic_id=dbt_topic,

        message=json.dumps(message).encode("utf-8"),

        logger=logger

    )

    if logger:

        logger.info(result)