Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cookiecutter.json

This file was deleted.

12 changes: 6 additions & 6 deletions template/update_layout.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@
# Remove Delta and Feature Store code in cases of MLflow Recipes.
{{ if (eq .input_include_mlflow_recipes `yes`) }}
# delta_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/Train.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/Train.py`) }}
# feature_store_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `resources/feature-engineering-workflow-resource.yml`) }}
# Remove Delta and MLflow Recipes code in cases of Feature Store.
{{ else if (eq .input_include_feature_store `yes`) }}
# delta_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/Train.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/Train.py`) }}
# recipe_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/profiles`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/steps`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/data`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/__init__.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/recipe.yaml`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/README.md`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/training/ingest_test.py`) }}
Expand All @@ -65,7 +65,7 @@
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/steps`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/data`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/__init__.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithMLflowRecipes.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/recipe.yaml`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/README.md`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/training/ingest_test.py`) }}
Expand All @@ -76,7 +76,7 @@
# feature_store_paths
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `tests/feature_engineering`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/notebooks/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `training/TrainWithFeatureStore.py`) }}
{{ skip (printf `%s/%s/%s` $root_dir $project_name_alphanumeric_underscore `resources/feature-engineering-workflow-resource.yml`) }}
{{ end }}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ In each module, there is `compute_features_fn` method that you need to implement
The output dataframe will be persisted in a [time-series Feature Store table]({{ template `generate_doc_link` (map (pair "cloud" .input_cloud) (pair "path" "machine-learning/feature-store/time-series.html")) }}).
See the example modules' documentation for more information.
* Python unit tests for feature computation modules in `tests/feature_engineering` folder.
* Feature engineering notebook, `feature_engineering/notebooks/GenerateAndWriteFeatures.py`, that reads input dataframes, dynamically loads feature computation modules, executes their `compute_features_fn` method and writes the outputs to a Feature Store table (creating it if missing).
* Feature engineering notebook, `feature_engineering/GenerateAndWriteFeatures.py`, that reads input dataframes, dynamically loads feature computation modules, executes their `compute_features_fn` method and writes the outputs to a Feature Store table (creating it if missing).
* Training notebook that [trains]({{ template `generate_doc_link` (map (pair "cloud" .input_cloud) (pair "path" "machine-learning/feature-store/train-models-with-feature-store.html")) }} ) a regression model by creating a training dataset using the Feature Store client.
* Model deployment and batch inference notebooks that deploy and use the trained model.
* An automated integration test is provided (in `.github/workflows/{{ .input_project_name }}-run-tests.yml`) that executes a multi-task run on Databricks involving the feature engineering and model training notebooks.
Expand Down Expand Up @@ -200,7 +200,7 @@ Otherwise, e.g. if iterating on ML code for a new project, follow the steps belo
You can iterate on the feature transform modules locally in your favorite IDE before running them on Databricks.

#### Running code on Databricks
You can iterate on ML code by running the provided `feature_engineering/notebooks/GenerateAndWriteFeatures.py` notebook on Databricks using
You can iterate on ML code by running the provided `feature_engineering/GenerateAndWriteFeatures.py` notebook on Databricks using
[Repos]({{ template `generate_doc_link` (map (pair "cloud" .input_cloud) (pair "path" "repos/index.html")) }}). This notebook drives execution of
the feature transforms code defined under ``features``. You can use multiple browser tabs to edit
logic in `features` and run the feature engineering pipeline in the `GenerateAndWriteFeatures.py` notebook.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,7 @@ dbutils.widgets.text(

# COMMAND ----------

import os

notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path

# COMMAND ----------

# MAGIC %pip install -r ../../../requirements.txt

# COMMAND ----------

dbutils.library.restartPython()

# COMMAND ----------

import sys
import os
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ..
sys.path.append("../..")

# COMMAND ----------

# DBTITLE 1,Define input and output variables
{{- if (eq .input_include_models_in_unity_catalog "no") }}
from utils import get_deployed_model_stage_for_env{{end}}

env = dbutils.widgets.get("env")
input_table_name = dbutils.widgets.get("input_table_name")
Expand All @@ -75,7 +49,7 @@ assert input_table_name != "", "input_table_name notebook parameter must be spec
assert output_table_name != "", "output_table_name notebook parameter must be specified"
assert model_name != "", "model_name notebook parameter must be specified"
{{- if (eq .input_include_models_in_unity_catalog "no") }}
stage = get_deployed_model_stage_for_env(env)
stage = {"dev": "Staging", "staging": "Staging", "prod": "Production", "test": "Production"}[env]
model_uri = f"models:/{model_name}/{stage}"{{else}}
alias = "champion"
model_uri = f"models:/{model_name}@{alias}"{{end}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def predict_batch(
)
{{ else }}
predict = mlflow.pyfunc.spark_udf(
spark_session, model_uri, result_type="double", env_manager="virtualenv"
spark_session, model_uri, result_type="double"
)

output_df = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ dbutils.widgets.dropdown("env", "None", ["None", "staging", "prod"], "Environmen

# COMMAND ----------

import os
import sys
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ..
sys.path.append("../..")

# COMMAND ----------

from deploy import deploy

model_uri = dbutils.jobs.taskValues.get("Train", "model_uri", debugValue="")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).parent.parent.parent.resolve()))
{{if (eq .input_include_models_in_unity_catalog "no")}}from utils import get_deployed_model_stage_for_env{{end}}
from mlflow.tracking import MlflowClient

{{ if (eq .input_include_models_in_unity_catalog "no") }}
Expand All @@ -19,7 +15,7 @@ def deploy(model_uri, env):
_, model_name, version = model_uri.split("/")
client = MlflowClient()
mv = client.get_model_version(model_name, version)
target_stage = get_deployed_model_stage_for_env(env)
target_stage = {"dev": "Staging", "staging": "Staging", "prod": "Production", "test": "Production"}[env]
if mv.current_stage != target_stage:
client.transition_model_version_stage(
name=model_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@ dbutils.widgets.text(

# COMMAND ----------

import os
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ../features

# COMMAND ----------

# DBTITLE 1,Define input and output variables
input_table_path = dbutils.widgets.get("input_table_path")
output_table_name = dbutils.widgets.get("output_table_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ dbutils.widgets.text(

# COMMAND ----------

import os
import sys
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ..
sys.path.append("../..")

# COMMAND ----------

from metric_violation_check_query import sql_query

table_name_under_monitor = dbutils.widgets.get("table_name_under_monitor")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# This file is used for the main SQL query that checks the last {num_evaluation_windows} metric violations and whether at least {num_violation_windows} of those runs violate the condition.

import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).parent.parent.parent.resolve()))

"""The SQL query is divided into three main parts. The first part selects the top {num_evaluation_windows}
values of the metric to be monitored, ordered by the time window, and saves as recent_metrics.
```sql
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
mlflow==2.11.3
lightgbm
numpy>=1.23.0
pandas==1.5.3
scikit-learn>=1.1.1
matplotlib>=3.5.2
pillow>=10.0.1
Jinja2==3.0.3
pyspark~=3.3.0
pytz~=2022.2.1
pytest>=7.1.2
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ Its central purpose is to evaluate a registered model and validate its quality b
Model validation contains three components:
* [model-workflow-resource.yml](./model-workflow-resource.yml) contains the resource config and input parameters for model validation.
* [validation.py](../validation/validation.py) defines custom metrics and validation thresholds that are referenced by the above resource config files.
* [notebooks/ModelValidation](../validation/notebooks/ModelValidation.py) contains the validation job implementation. In most cases you don't need to modify this file.
* [ModelValidation](../validation/ModelValidation.py) contains the validation job implementation. In most cases you don't need to modify this file.

To set up and enable model validation, update [validation.py](../validation/validation.py) to return desired custom metrics and validation thresholds, then
resolve the `TODOs` in the ModelValidation task of [model-workflow-resource.yml](./model-workflow-resource.yml).
Expand All @@ -177,9 +177,9 @@ Its central purpose is to track production model performances, feature distribut

Monitoring contains four components:
* [metric_violation_check_query.py](../monitoring/metric_violation_check_query.py) defines a query that checks for violation of the monitored metric.
* [notebooks/MonitoredMetricViolationCheck](../monitoring/notebooks/MonitoredMetricViolationCheck.py) acts as an entry point, executing the violation check query against the monitored inference table.
* [MonitoredMetricViolationCheck](../monitoring/MonitoredMetricViolationCheck.py) acts as an entry point, executing the violation check query against the monitored inference table.
It emits a boolean value based on the query result.
* [monitoring-resource.yml](./monitoring-resource.yml) contains the resource config, input parameters for monitoring, and orchestrates model retraining based on monitoring. It first runs the [notebooks/MonitoredMetricViolationCheck](../monitoring/notebooks/MonitoredMetricViolationCheck.py)
* [monitoring-resource.yml](./monitoring-resource.yml) contains the resource config, input parameters for monitoring, and orchestrates model retraining based on monitoring. It first runs the [MonitoredMetricViolationCheck](../monitoring/MonitoredMetricViolationCheck.py)
entry point then decides whether to execute the model retraining workflow.

To set up and enable monitoring:
Expand Down Expand Up @@ -226,20 +226,20 @@ resources:
- task_key: batch_inference_job
<<: *new_cluster
notebook_task:
notebook_path: ../deployment/batch_inference/notebooks/BatchInference.py
notebook_path: ../deployment/batch_inference/BatchInference.py
base_parameters:
env: ${bundle.target}
input_table_name: batch_inference_input_table_name
...
```

The example above defines a Databricks job with name `${bundle.target}-{{ .input_project_name }}-batch-inference-job`
that runs the notebook under `{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/notebooks/BatchInference.py` to regularly apply your ML model for batch inference.
that runs the notebook under `{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/BatchInference.py` to regularly apply your ML model for batch inference.

At the start of the resource definition, we declare an anchor `new_cluster` that is referenced later. For more information about anchors in YAML, please refer to the [YAML documentation](https://yaml.org/spec/1.2.2/#3222-anchors-and-aliases).

We specify a `batch_inference_job` under `resources/jobs` to define a Databricks workflow with internal key `batch_inference_job` and job name `${bundle.target}-{{ .input_project_name }}-batch-inference-job`.
The workflow contains a single task with task key `batch_inference_job`. The task runs the notebook `../deployment/batch_inference/notebooks/BatchInference.py`, passing the provided parameters `env` and `input_table_name` to it.
The workflow contains a single task with task key `batch_inference_job`. The task runs the notebook `../deployment/batch_inference/BatchInference.py`, passing the provided parameters `env` and `input_table_name` to it.
After setting up the Databricks CLI, you can run the command `databricks bundle schema` to learn more about the Databricks CLI bundle schema.

The notebook_path is the relative path starting from the resource yaml file.
Expand Down Expand Up @@ -281,7 +281,7 @@ resources:
- task_key: batch_inference_job
<<: *new_cluster
notebook_task:
notebook_path: ../deployment/batch_inference/notebooks/BatchInference.py
notebook_path: ../deployment/batch_inference/BatchInference.py
base_parameters:
env: ${bundle.target}
input_table_name: ${var.batch_inference_input_table}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
new_cluster: &new_cluster
new_cluster:
num_workers: 3
spark_version: 15.3.x-cpu-ml-scala2.12
node_type_id: {{template `cloud_specific_node_type_id` .}}
data_security_mode: "SINGLE_USER"
custom_tags:
clusterSource: mlops-stacks_{{template `stacks_version` .}}

common_permissions: &permissions
permissions:
- level: CAN_VIEW
Expand All @@ -18,9 +9,9 @@ resources:
name: ${bundle.target}-{{ .input_project_name }}-batch-inference-job
tasks:
- task_key: batch_inference_job
<<: *new_cluster
environment_key: default
notebook_task:
notebook_path: ../deployment/batch_inference/notebooks/BatchInference.py
notebook_path: ../deployment/batch_inference/BatchInference.py
base_parameters:
env: ${bundle.target}
{{ if (eq .input_include_feature_store `yes`) }}{{ if (eq .input_include_models_in_unity_catalog `yes`) }}input_table_name: ${bundle.target}.{{ .input_schema_name }}.feature_store_inference_input # TODO: create input table for inference
Expand All @@ -33,6 +24,12 @@ resources:
# git source information of current ML resource deployment. It will be persisted as part of the workflow run
git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit}

environments:
- environment_key: default
spec:
client: "4"
dependencies:
- -r ../requirements.txt
schedule:
quartz_cron_expression: "0 0 11 * * ?" # daily at 11am
timezone_id: UTC
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
new_cluster: &new_cluster
new_cluster:
num_workers: 3
spark_version: 15.3.x-cpu-ml-scala2.12
node_type_id: {{template `cloud_specific_node_type_id` .}}
data_security_mode: "SINGLE_USER"
custom_tags:
clusterSource: mlops-stacks_{{template `stacks_version` .}}

common_permissions: &permissions
permissions:
- level: CAN_VIEW
Expand All @@ -16,14 +7,11 @@ resources:
jobs:
write_feature_table_job:
name: ${bundle.target}-{{ .input_project_name }}-write-feature-table-job
job_clusters:
- job_cluster_key: write_feature_table_job_cluster
<<: *new_cluster
tasks:
- task_key: PickupFeatures
job_cluster_key: write_feature_table_job_cluster
environment_key: default
notebook_task:
notebook_path: ../feature_engineering/notebooks/GenerateAndWriteFeatures.py
notebook_path: ../feature_engineering/GenerateAndWriteFeatures.py
base_parameters:
# TODO modify these arguments to reflect your setup.
input_table_path: /databricks-datasets/nyctaxi-with-zipcodes/subsampled
Expand All @@ -38,9 +26,9 @@ resources:
# git source information of current ML resource deployment. It will be persisted as part of the workflow run
git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit}
- task_key: DropoffFeatures
job_cluster_key: write_feature_table_job_cluster
environment_key: default
notebook_task:
notebook_path: ../feature_engineering/notebooks/GenerateAndWriteFeatures.py
notebook_path: ../feature_engineering/GenerateAndWriteFeatures.py
base_parameters:
# TODO: modify these arguments to reflect your setup.
input_table_path: /databricks-datasets/nyctaxi-with-zipcodes/subsampled
Expand All @@ -54,6 +42,12 @@ resources:
primary_keys: zip
# git source information of current ML resource deployment. It will be persisted as part of the workflow run
git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit}
environments:
- environment_key: default
spec:
client: "4"
dependencies:
- -r "../requirements.txt"
schedule:
quartz_cron_expression: "0 0 7 * * ?" # daily at 7am
timezone_id: UTC
Expand Down
Loading