3 changes: 0 additions & 3 deletions cookiecutter.json

This file was deleted.

12 changes: 6 additions & 6 deletions template/update_layout.tmpl
Original file line number Diff line number Diff line change
@@ -35,22 +35,22 @@
# Remove Delta and Feature Store code in cases of MLflow Recipes.
{{ if (eq .input_include_mlflow_recipes `yes`) }}
# delta_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/Train.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/Train.py`) }}
# feature_store_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `resources/feature-engineering-workflow-resource.yml`) }}
# Remove Delta and MLflow Recipes code in cases of Feature Store.
{{ else if (eq .input_include_feature_store `yes`) }}
# delta_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/Train.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/Train.py`) }}
# recipe_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/profiles`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/steps`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/data`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/__init__.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/recipe.yaml`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/README.md`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/training/ingest_test.py`) }}
@@ -65,7 +65,7 @@
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/steps`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/data`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/__init__.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/recipe.yaml`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/README.md`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/training/ingest_test.py`) }}
@@ -76,7 +76,7 @@
# feature_store_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `resources/feature-engineering-workflow-resource.yml`) }}
{{ end }}

Original file line number Diff line number Diff line change
@@ -146,7 +146,7 @@ In each module, there is `compute_features_fn` method that you need to implement
The output dataframe will be persisted in a [time-series Feature Store table]({{ template `generate_doc_link` (map (pair "cloud" .input_cloud) (pair "path" "machine-learning/feature-store/time-series.html")) }}).
See the example modules' documentation for more information.
* Python unit tests for feature computation modules in `tests/feature_engineering` folder.
* Feature engineering notebook, `feature_engineering/notebooks/GenerateAndWriteFeatures.py`, that reads input dataframes, dynamically loads feature computation modules, executes their `compute_features_fn` method and writes the outputs to a Feature Store table (creating it if missing).
* Feature engineering notebook, `feature_engineering/GenerateAndWriteFeatures.py`, that reads input dataframes, dynamically loads feature computation modules, executes their `compute_features_fn` method and writes the outputs to a Feature Store table (creating it if missing).
* Training notebook that [trains]({{ template `generate_doc_link` (map (pair "cloud" .input_cloud) (pair "path" "machine-learning/feature-store/train-models-with-feature-store.html")) }} ) a regression model by creating a training dataset using the Feature Store client.
* Model deployment and batch inference notebooks that deploy and use the trained model.
* An automated integration test is provided (in `.github/workflows/{{ .input_project_name }}-run-tests.yml`) that executes a multi task run on Databricks involving the feature engineering and model training notebooks.
@@ -200,7 +200,7 @@ Otherwise, e.g. if iterating on ML code for a new project, follow the steps belo
You can iterate on the feature transform modules locally in your favorite IDE before running them on Databricks.

#### Running code on Databricks
You can iterate on ML code by running the provided `feature_engineering/notebooks/GenerateAndWriteFeatures.py` notebook on Databricks using
You can iterate on ML code by running the provided `feature_engineering/GenerateAndWriteFeatures.py` notebook on Databricks using
[Repos]({{ template `generate_doc_link` (map (pair "cloud" .input_cloud) (pair "path" "repos/index.html")) }}). This notebook drives execution of
the feature transforms code defined under ``features``. You can use multiple browser tabs to edit
logic in `features` and run the feature engineering pipeline in the `GenerateAndWriteFeatures.py` notebook.
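The dynamic loading that the notebook performs can be sketched in isolation. This is a minimal illustration, not the notebook's actual code path: it builds a throwaway `features` package in a temporary directory, and the module name `demo_features` and its one-argument `compute_features_fn` are hypothetical stand-ins (the real function signature is not shown in this diff).

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Create a temporary project root containing a `features` package.
# `demo_features` and its doubling logic are hypothetical placeholders.
project_root = Path(tempfile.mkdtemp())
package_dir = project_root / "features"
package_dir.mkdir()
(package_dir / "__init__.py").write_text("")
(package_dir / "demo_features.py").write_text(
    "def compute_features_fn(rows):\n"
    "    return [value * 2 for value in rows]\n"
)

# Make the project root importable so `features.<module>` resolves.
sys.path.insert(0, str(project_root))
importlib.invalidate_caches()


def load_compute_features_fn(features_module: str):
    """Import `features.<features_module>` and return its compute_features_fn."""
    mod = importlib.import_module(f"features.{features_module}")
    return getattr(mod, "compute_features_fn")


compute_features_fn = load_compute_features_fn("demo_features")
result = compute_features_fn([1, 2, 3])
print(result)
```

Qualifying the module name with the `features.` package prefix means the lookup depends only on the project root being importable, not on the current working directory.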
Original file line number Diff line number Diff line change
@@ -39,33 +39,15 @@ dbutils.widgets.text(

# COMMAND ----------

import os

notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path

# COMMAND ----------

# MAGIC %pip install -r ../../../requirements.txt
# MAGIC %pip install -r ../../requirements.txt

# COMMAND ----------

dbutils.library.restartPython()

# COMMAND ----------

import sys
import os
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ..
sys.path.append("../..")

# COMMAND ----------

# DBTITLE 1,Define input and output variables
{{- if (eq .input_include_models_in_unity_catalog "no") }}
from utils import get_deployed_model_stage_for_env{{end}}

env = dbutils.widgets.get("env")
input_table_name = dbutils.widgets.get("input_table_name")
@@ -75,7 +57,7 @@ assert input_table_name != "", "input_table_name notebook parameter must be spec
assert output_table_name != "", "output_table_name notebook parameter must be specified"
assert model_name != "", "model_name notebook parameter must be specified"
{{- if (eq .input_include_models_in_unity_catalog "no") }}
stage = get_deployed_model_stage_for_env(env)
stage = {"dev": "Staging", "staging": "Staging", "prod": "Production", "test": "Production"}[env]
model_uri = f"models:/{model_name}/{stage}"{{else}}
alias = "champion"
model_uri = f"models:/{model_name}@{alias}"{{end}}
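A possible refinement of the inline stage lookup in this hunk, offered only as a sketch: the dictionary is the one from the diff, but the helper name `model_uri_for_env` and the explicit `ValueError` are assumptions and not part of this change. As written in the diff, an `env` value outside the mapping surfaces as a bare `KeyError`.

```python
# Mapping copied from the diff; everything else here is illustrative.
ENV_TO_STAGE = {
    "dev": "Staging",
    "staging": "Staging",
    "prod": "Production",
    "test": "Production",
}


def model_uri_for_env(model_name: str, env: str) -> str:
    """Build a stage-based model URI, failing clearly on an unknown env."""
    try:
        stage = ENV_TO_STAGE[env]
    except KeyError:
        valid = ", ".join(sorted(ENV_TO_STAGE))
        raise ValueError(f"Unknown env {env!r}; expected one of: {valid}") from None
    return f"models:/{model_name}/{stage}"


print(model_uri_for_env("my_model", "prod"))
```

Keeping the mapping in one named constant would also avoid repeating the same literal in both the batch inference notebook and `deploy`.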
Original file line number Diff line number Diff line change
@@ -13,39 +13,31 @@ def predict_batch(
mlflow.set_registry_uri("databricks-uc")
{{ end }}
table = spark_session.table(input_table_name)
{{ if (eq .input_include_feature_store `yes`) }}
{{ if (eq .input_include_models_in_unity_catalog `no`) }}
from databricks.feature_store import FeatureStoreClient

fs_client = FeatureStoreClient()

prediction_df = fs_client.score_batch(model_uri, table)
{{ else }}
from databricks.feature_engineering import FeatureEngineeringClient

{{ if (eq .input_include_feature_store `yes`) }}
from databricks.feature_engineering import FeatureEngineeringClient

fe_client = FeatureEngineeringClient()

prediction_df = fe_client.score_batch(model_uri = model_uri, df = table)
{{ end }}

prediction_df = fe_client.score_batch(model_uri=model_uri, df=table)
output_df = (
prediction_df.withColumn("prediction", prediction_df["prediction"])
.withColumn("model_id", lit(model_version))
.withColumn("timestamp", to_timestamp(lit(ts)))
)
{{ else }}
{{ else }}
predict = mlflow.pyfunc.spark_udf(
spark_session, model_uri, result_type="double", env_manager="virtualenv"
)
)

output_df = (
table.withColumn("prediction", predict(struct(*table.columns)))
.withColumn("model_id", lit(model_version))
.withColumn("timestamp", to_timestamp(lit(ts)))
)
{{ end -}}

output_df.display()

# Model predictions are written to the Delta table provided as input.
# Delta is the default format in Databricks Runtime 8.0 and above.
output_df.write.format("delta").mode("overwrite").saveAsTable(output_table_name)
output_df.write.format("delta").mode("overwrite").saveAsTable(output_table_name)
Original file line number Diff line number Diff line change
@@ -29,15 +29,6 @@ dbutils.widgets.dropdown("env", "None", ["None", "staging", "prod"], "Environmen

# COMMAND ----------

import os
import sys
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ..
sys.path.append("../..")

# COMMAND ----------

from deploy import deploy

model_uri = dbutils.jobs.taskValues.get("Train", "model_uri", debugValue="")
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).parent.parent.parent.resolve()))
{{if (eq .input_include_models_in_unity_catalog "no")}}from utils import get_deployed_model_stage_for_env{{end}}
from mlflow.tracking import MlflowClient

{{ if (eq .input_include_models_in_unity_catalog "no") }}
@@ -19,7 +15,7 @@ def deploy(model_uri, env):
_, model_name, version = model_uri.split("/")
client = MlflowClient()
mv = client.get_model_version(model_name, version)
target_stage = get_deployed_model_stage_for_env(env)
target_stage = {"dev": "Staging", "staging": "Staging", "prod": "Production", "test": "Production"}[env]
if mv.current_stage != target_stage:
client.transition_model_version_stage(
name=model_name,
Original file line number Diff line number Diff line change
@@ -63,13 +63,6 @@ dbutils.widgets.text(

# COMMAND ----------

import os
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ../features

# COMMAND ----------

# DBTITLE 1,Define input and output variables
input_table_path = dbutils.widgets.get("input_table_path")
output_table_name = dbutils.widgets.get("output_table_name")
@@ -101,7 +94,7 @@ raw_data = spark.read.format("delta").load(input_table_path)
# Compute the features. This is done by dynamically loading the features module.
from importlib import import_module

mod = import_module(features_module)
mod = import_module(f"features.{features_module}")
compute_features_fn = getattr(mod, "compute_features_fn")

features_df = compute_features_fn(
@@ -114,35 +107,15 @@ features_df = compute_features_fn(
# COMMAND ----------

# DBTITLE 1, Write computed features.
{{- if (eq .input_include_models_in_unity_catalog `no`) }}
from databricks import feature_store

fs = feature_store.FeatureStoreClient()

# Create the feature table if it does not exist first.
# Note that this is a no-op if a table with the same name and schema already exists.
fs.create_table(
name=output_table_name,
primary_keys=[x.strip() for x in pk_columns.split(",")],
timestamp_keys=[ts_column],
df=features_df,
)

# Write the computed features dataframe.
fs.write_table(
name=output_table_name,
df=features_df,
mode="merge",
){{ else }}
from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# Create the feature table if it does not exist first.
# Note that this is a no-op if a table with the same name and schema already exists.
fe.create_table(
name=output_table_name,
primary_keys=[x.strip() for x in pk_columns.split(",")] + [ts_column], # Include timeseries column in primary_keys
name=output_table_name,
primary_keys=[x.strip() for x in pk_columns.split(",")] + [ts_column],
timestamp_keys=[ts_column],
df=features_df,
)
@@ -152,7 +125,7 @@ fe.write_table(
name=output_table_name,
df=features_df,
mode="merge",
){{ end }}
)

# COMMAND ----------

Original file line number Diff line number Diff line change
@@ -40,15 +40,6 @@ dbutils.widgets.text(

# COMMAND ----------

import os
import sys
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ..
sys.path.append("../..")

# COMMAND ----------

from metric_violation_check_query import sql_query

table_name_under_monitor = dbutils.widgets.get("table_name_under_monitor")
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# This file is used for the main SQL query that checks the last {num_evaluation_windows} metric violations and whether at least {num_violation_windows} of those runs violate the condition.

import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).parent.parent.parent.resolve()))

"""The SQL query is divided into three main parts. The first part selects the top {num_evaluation_windows}
values of the metric to be monitored, ordered by the time window, and saves as recent_metrics.
```sql
Original file line number Diff line number Diff line change
@@ -163,7 +163,7 @@ Its central purpose is to evaluate a registered model and validate its quality b
Model validation contains three components:
* [model-workflow-resource.yml](./model-workflow-resource.yml) contains the resource config and input parameters for model validation.
* [validation.py](../validation/validation.py) defines custom metrics and validation thresholds that are referenced by the above resource config files.
* [notebooks/ModelValidation](../validation/notebooks/ModelValidation.py) contains the validation job implementation. In most cases you don't need to modify this file.
* [ModelValidation](../validation/ModelValidation.py) contains the validation job implementation. In most cases you don't need to modify this file.

To set up and enable model validation, update [validation.py](../validation/validation.py) to return desired custom metrics and validation thresholds, then
resolve the `TODOs` in the ModelValidation task of [model-workflow-resource.yml](./model-workflow-resource.yml).
@@ -177,9 +177,9 @@ Its central purpose is to track production model performances, feature distribut

Monitoring contains four components:
* [metric_violation_check_query.py](../monitoring/metric_violation_check_query.py) defines a query that checks for violation of the monitored metric.
* [notebooks/MonitoredMetricViolationCheck](../monitoring/notebooks/MonitoredMetricViolationCheck.py) acts as an entry point, executing the violation check query against the monitored inference table.
* [MonitoredMetricViolationCheck](../monitoring/MonitoredMetricViolationCheck.py) acts as an entry point, executing the violation check query against the monitored inference table.
It emits a boolean value based on the query result.
* [monitoring-resource.yml](./monitoring-resource.yml) contains the resource config, inputs parameters for monitoring, and orchestrates model retraining based on monitoring. It first runs the [notebooks/MonitoredMetricViolationCheck](../monitoring/notebooks/MonitoredMetricViolationCheck.py)
* [monitoring-resource.yml](./monitoring-resource.yml) contains the resource config, inputs parameters for monitoring, and orchestrates model retraining based on monitoring. It first runs the [MonitoredMetricViolationCheck](../monitoring/MonitoredMetricViolationCheck.py)
entry point then decides whether to execute the model retraining workflow.

To set up and enable monitoring:
@@ -226,20 +226,20 @@ resources:
- task_key: batch_inference_job
<<: *new_cluster
notebook_task:
notebook_path: ../deployment/batch_inference/notebooks/BatchInference.py
notebook_path: ../deployment/batch_inference/BatchInference.py
base_parameters:
env: ${bundle.target}
input_table_name: batch_inference_input_table_name
...
```

The example above defines a Databricks job with name `${bundle.target}-{{ .input_project_name }}-batch-inference-job`
that runs the notebook under `{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/notebooks/BatchInference.py` to regularly apply your ML model for batch inference.
that runs the notebook under `{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/BatchInference.py` to regularly apply your ML model for batch inference.

At the start of the resource definition, we declared an anchor `new_cluster` that will be referenced and used later. For more information about anchors in yaml schema, please refer to the [yaml documentation](https://yaml.org/spec/1.2.2/#3222-anchors-and-aliases).

We specify a `batch_inference_job` under `resources/jobs` to define a Databricks workflow with internal key `batch_inference_job` and job name `${bundle.target}-{{ .input_project_name }}-batch-inference-job`.
The workflow contains a single task with task key `batch_inference_job`. The task runs notebook `../deployment/batch_inference/notebooks/BatchInference.py` with provided parameters `env` and `input_table_name` passing to the notebook.
The workflow contains a single task with task key `batch_inference_job`. The task runs notebook `../deployment/batch_inference/BatchInference.py` with provided parameters `env` and `input_table_name` passing to the notebook.
After setting up the Databricks CLI, you can run the command `databricks bundle schema` to learn more about the Databricks CLI bundle schema.

The notebook_path is the relative path starting from the resource yaml file.
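As a rough illustration of that rule, the path arithmetic can be reproduced with the standard library. This sketch only shows how a relative `notebook_path` combines with the location of the resource file; it is not how the Databricks CLI resolves paths internally, and the project directory `my_project` and the resource file name used here are hypothetical.

```python
import posixpath


def resolve_notebook_path(resource_file: str, notebook_path: str) -> str:
    """Resolve notebook_path relative to the directory holding the resource YAML."""
    resource_dir = posixpath.dirname(resource_file)
    return posixpath.normpath(posixpath.join(resource_dir, notebook_path))


resolved = resolve_notebook_path(
    "my_project/resources/batch-inference-workflow-resource.yml",  # hypothetical location
    "../deployment/batch_inference/BatchInference.py",
)
print(resolved)
```

With these inputs the leading `..` steps out of `resources`, so the result points at `deployment/batch_inference/BatchInference.py` under the project directory.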
@@ -281,7 +281,7 @@ resources:
- task_key: batch_inference_job
<<: *new_cluster
notebook_task:
notebook_path: ../deployment/batch_inference/notebooks/BatchInference.py
notebook_path: ../deployment/batch_inference/BatchInference.py
base_parameters:
env: ${bundle.target}
input_table_name: ${var.batch_inference_input_table}
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ resources:
- task_key: batch_inference_job
<<: *new_cluster
notebook_task:
notebook_path: ../deployment/batch_inference/notebooks/BatchInference.py
notebook_path: ../deployment/batch_inference/BatchInference.py
base_parameters:
env: ${bundle.target}
{{ if (eq .input_include_feature_store `yes`) }}{{ if (eq .input_include_models_in_unity_catalog `yes`) }}input_table_name: ${bundle.target}.{{ .input_schema_name }}.feature_store_inference_input # TODO: create input table for inference