Make dbt great again
dbt is a popular solution for batch data processing in data analytics. It operates on an open-core model, which can sometimes keep community features out of the open-source version. No worries: opendbt is here to solve that. opendbt is a fully open-source package that builds on dbt-core, adding valuable features without altering its core code.
How? Here’s a step-by-step example. We’ll demonstrate how to use opendbt to activate custom adapters and execute Python models locally within dbt. The key feature we’ll use is opendbt’s ability to register custom adapter classes.
The problem
With dbt-core, you can do the Transform step inside the data platform using dbt SQL or Python models (Spark, Snowpark); essentially the “T” of ELT. However, it is not possible to do the Extract and Load steps, for example extracting data from a web API and loading it into the data platform, which is the very first step of a data pipeline. This task is usually done with Python code outside of dbt, which hides data lineage and dataflow dependencies.
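For example, a typical standalone extract-and-load script looks something like this (an illustrative sketch, not from opendbt; the API URL, table name, and requests/duckdb usage are assumptions):

import duckdb
import requests

# A standalone EL script like this runs outside of dbt, so dbt's lineage
# graph and docs never see where the raw_users table comes from.
rows = requests.get("https://api.example.com/users", timeout=30).json()
con = duckdb.connect("dev.duckdb")
con.execute("CREATE TABLE IF NOT EXISTS raw_users (id INTEGER, name VARCHAR)")
con.executemany("INSERT INTO raw_users VALUES (?, ?)",
                [(r["id"], r["name"]) for r in rows])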
Extract and Load with opendbt, using dbt-core and Python
External data imports with opendbt
In this post, we’ll explore how to perform both extraction and loading within dbt using opendbt. This way the entire data flow is handled by dbt: the full end-to-end dependencies are defined, documented, and visible in dbt, ensuring a more robust and transparent data pipeline.
Using Your Own Custom Adapter
Step 1: Create a Custom Adapter
We’ll start by extending an existing adapter (in this case, DuckDBAdapter) and customizing it to meet our specific needs. We’ll add a new method to the adapter and expose it to dbt Jinja templates using the @available decorator. This method will be used to execute dbt Python models later on (see the full adapter code).
import tempfile
from typing import Dict

from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter

# Helper used to shell out; see the full adapter code for the exact imports.
from opendbt.utils import Utils


class DuckDBAdapterV2Custom(DuckDBAdapter):
    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        # Run a dbt Python model as a plain local Python process.
        model_unique_id = parsed_model.get('unique_id')
        __py_code = f"""
{compiled_code}
# NOTE this is local python execution so session is None
model(dbt=dbtObj(None), session=None)
"""
        with tempfile.NamedTemporaryFile(suffix=f'__{model_unique_id}.py', delete=False) as fp:
            fp.write(__py_code.encode('utf-8'))
            fp.close()

        print(f"Created temp py file {fp.name}")
        Utils.runcommand(command=['python', fp.name])
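Utils.runcommand is a small opendbt helper that shells out to the Python interpreter. A minimal sketch of what such a helper might look like (illustrative; the real implementation lives in the opendbt repository):

import subprocess

class Utils:
    @staticmethod
    def runcommand(command: list):
        # Run the command, print its output, and fail loudly on a
        # non-zero exit code so dbt marks the model as errored.
        result = subprocess.run(command, capture_output=True, text=True)
        print(result.stdout)
        if result.returncode != 0:
            raise RuntimeError(f"Command {command} failed:\n{result.stderr}")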
Step 2: Register the Custom Adapter
OpenDBT allows you to define custom adapters using the dbt_custom_adapter variable. To activate our custom adapter, add the following configuration to your dbt_project.yml file:
vars:
  dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom
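Under the hood, opendbt has to turn this dotted path into an adapter class at runtime. Conceptually, that kind of dynamic loading boils down to something like this (a simplified sketch, not opendbt’s exact code):

from importlib import import_module

def load_custom_adapter(class_path: str):
    # "opendbt.examples.DuckDBAdapterV2Custom" -> the class object
    module_name, _, class_name = class_path.rpartition(".")
    return getattr(import_module(module_name), class_name)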
Step 3: Create a dbt Macro to Execute Python Code
Next, we’ll create a dbt macro named execute_python.sql. This macro passes the compiled Python code to the new method we added to our custom adapter (see the full macro code).
{% materialization executepython, supported_languages=['python'] %}

  {%- set identifier = model['alias'] -%}
  {%- set language = model['language'] -%}

  {% set grant_config = config.get('grants') %}

  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier,
                                                schema=schema,
                                                database=database, type='table') -%}

  {{ run_hooks(pre_hooks) }}

  {% call noop_statement(name='main', message='Executed Python', code=compiled_code, rows_affected=-1, res=None) %}
    {%- set res = adapter.submit_local_python_job(model, compiled_code) -%}
  {% endcall %}

  {{ run_hooks(post_hooks) }}

  {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
  {% do persist_docs(target_relation, model) %}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
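A model opts into this materialization with dbt.config(materialized="executepython") in the model file, as shown in the next step, or via dbt_project.yml (a sketch assuming a project named my_project):

models:
  my_project:
    my_executepython_dbt_model:
      +materialized: executepython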
Step 4: Execute Local Python Models
Ready to run! With everything configured, let’s create and run a Python model locally, eliminating the need for Spark or Snowpark.
Create Your Python Model:
Save your model code in a file named models/my_executepython_dbt_model.py. In this model, we’ll simply print the Python version and platform information to demonstrate local execution.
import os
import platform

from dbt import version


def print_info():
    _str = f"name:{os.name}, system:{platform.system()} release:{platform.release()}"
    _str += f"\npython version:{platform.python_version()}, dbt:{version.__version__}"
    print(_str)


def model(dbt, session):
    dbt.config(materialized="executepython")
    print("==================================================")
    print("========IM LOCALLY EXECUTED PYTHON MODEL==========")
    print("==================================================")
    print_info()
    print("==================================================")
    print("===============MAKE DBT GREAT AGAIN===============")
    print("==================================================")
    return None
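This same mechanism enables the Extract and Load scenario from the problem section: the standalone script can move inside a dbt Python model, where dbt sees and documents it. A hypothetical sketch (the API URL, table name, and DuckDB file path are assumptions; since execution is local, session is None and the model opens its own connection):

import duckdb
import requests

def model(dbt, session):
    dbt.config(materialized="executepython")
    # Local execution: session is None, so connect to the same DuckDB
    # file the dbt profile points at (hypothetical path).
    con = duckdb.connect("/dbt/project/dev.duckdb")
    rows = requests.get("https://api.example.com/users", timeout=30).json()
    con.execute("CREATE TABLE IF NOT EXISTS raw_users (id INTEGER, name VARCHAR)")
    con.executemany("INSERT INTO raw_users VALUES (?, ?)",
                    [(r["id"], r["name"]) for r in rows])
    return None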
Step 5: Execute the Python Model
from opendbt import OpenDbtProject

dp = OpenDbtProject(project_dir='/dbt/project', profiles_dir='/dbt/project')
dp.run(command="run", args=['--select', 'my_executepython_dbt_model'])
The output should look something like this:
09:57:11 Running with dbt=1.8.7
09:57:11 Registered adapter: duckdb=1.8.4
09:57:12 Unable to do partial parsing because config vars, config profile, or config target have changed
09:57:13 Found 4 models, 4 data tests, 418 macros
09:57:13
09:57:15 Concurrency: 1 threads (target='dev')
09:57:15
09:57:15 1 of 1 START python executepython model main.my_executepython_dbt_model ........ [RUN]
Created temp py file /var/folders/3l/n5dbz15s68592fk76c31hth8ffmnng/T/tmp4l6hufwl__model.dbttest.my_executepython_dbt_model.py
==================================================
========IM LOCALLY EXECUTED PYTHON MODEL==========
==================================================
name:posix, system:Darwin release:23.5.0
python version:3.9.19, dbt:1.8.7
==================================================
===============MAKE DBT GREAT AGAIN===============
==================================================
09:57:15 1 of 1 OK created python executepython model main.my_executepython_dbt_model ... [Executed Python in 0.51s]
09:57:15
09:57:15 Finished running 1 executepython model in 0 hours 0 minutes and 2.53 seconds (2.53s).
09:57:15
09:57:15 Completed successfully
09:57:15
09:57:15 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
Here’s a breakdown of the execution flow:
1. dbt compiles the Python model and runs it with the executepython materialization.
2. The materialization macro calls adapter.submit_local_python_job with the compiled code.
3. The custom adapter writes the compiled code to a temporary .py file.
4. The file is executed as a local Python subprocess, and its output appears in the dbt log.
The full code, test cases, documentation and additional features are available on GitHub.
Wrap-Up and Contributions
The project is completely open-source, under the Apache 2.0 license. opendbt is still a young project and there are things to improve. Please feel free to test it, give feedback, open feature requests, or send pull requests. You can see more examples and start experimenting with opendbt using the GitHub project.
For Part 2, please see: Streamline Your Data Pipelines with opendbt: Leverage dbt and dlt for Effortless End-to-End ELT Workflows