diff --git a/public/tour-pipelines/Subgraphs.pipeline.component.yaml b/public/tour-pipelines/Subgraphs.pipeline.component.yaml new file mode 100644 index 000000000..fc0007c07 --- /dev/null +++ b/public/tour-pipelines/Subgraphs.pipeline.component.yaml @@ -0,0 +1,1035 @@ +name: 'Intro: Subgraphs' +description: | + Organising work with subgraphs (nested pipelines). Three top-level tasks are themselves subgraphs; each with its own internal task graph, inputs, and outputs. Data Preparation generates and splits the data; Training fits a linear model; Evaluation predicts and scores. Double-click a subgraph in the UI to see inside it. +metadata: + annotations: + editor.flow-direction: left-to-right +implementation: + graph: + tasks: + Training: + componentRef: + name: Training + digest: bb9f710b9685cc5e839510b94edb30cf5f5ba5b864743afa0292bcfbb821b792 + spec: + name: Training + description: Train a linear regression model + inputs: + - name: training_data + type: CSV + annotations: + editor.position: '{"x":0,"y":0}' + - name: target_column + type: String + default: target + annotations: + editor.position: '{"x":47,"y":176}' + outputs: + - name: model + type: JSON + annotations: + editor.position: '{"x":681,"y":76}' + implementation: + graph: + tasks: + Fit Model: + componentRef: + name: Train regression + digest: e4292a5974ba0c989f95fff77d993e75eb9c6b26ebe23d8df775f804d22309f0 + spec: + name: Train regression + description: |- + Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: train_regression.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def train_regression( + training_data: components.InputPath("CSV"), + model: components.OutputPath("JSON"), + target_column: str = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. + """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + python_original_code_path: train_regression.py + components new regenerate python-function-component: 'true' + inputs: + - name: training_data + type: CSV + description: Input CSV with feature columns and a target column. + - name: target_column + type: String + description: Name of the column to predict. + default: target + optional: true + outputs: + - name: model + type: JSON + description: Output JSON file with trained model parameters. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def train_regression( + training_data, + model, + target_column = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. + """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + + import argparse + _parser = argparse.ArgumentParser(prog='Train regression', description='Train a simple linear regression model using ordinary least squares.\n\nFits weights and bias to minimise squared error. Uses only Python stdlib\n(no numpy/sklearn). The trained model is saved as a JSON file containing\nthe weight vector, bias, feature names, and training metrics.') + _parser.add_argument("--training-data", dest="training_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-column", dest="target_column", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = train_regression(**_parsed_args) + args: + - '--training-data' + - inputPath: training_data + - if: + cond: + isPresent: target_column + then: + - '--target-column' + - inputValue: target_column + - '--model' + - outputPath: model + arguments: + target_column: + graphInput: + inputName: target_column + training_data: + graphInput: + inputName: training_data + annotations: + editor.position: '{"x":301,"y":55.5}' + outputValues: + model: + taskOutput: + outputName: model + taskId: Fit Model + arguments: + training_data: + taskOutput: + outputName: train_data + taskId: Data Preparation + annotations: + editor.position: '{"x": 500, "y": 0}' + Evaluation: + componentRef: + name: Evaluation + digest: 3e29013959318f296132514881cba7efaf948bc0618098527f6c086c3f24c66f + spec: + name: Evaluation + description: Predict on test data and compute regression metrics + inputs: + - name: test_data + type: CSV + annotations: + editor.position: '{"x":0,"y":25.25}' + - name: model + type: JSON + annotations: + editor.position: '{"x":0,"y":201.25}' + outputs: + - name: predictions + type: CSV + annotations: + editor.position: '{"x":749.5,"y":0}' + - name: metrics + type: JSON + annotations: + editor.position: '{"x":1061,"y":202.5}' + implementation: + graph: + tasks: + Predict: + componentRef: + name: Predict + digest: 4841c31fc75f2d26a5a7d3123d6fa6fc6b43d8badd549ad3ac3e20119860938d + spec: + name: Predict + description: |- + Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: predict.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def predict( + test_data: components.InputPath("CSV"), + model: components.InputPath("JSON"), + predictions: components.OutputPath("CSV"), + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + python_original_code_path: predict.py + components new regenerate python-function-component: 'true' + inputs: + - name: test_data + type: CSV + description: Input CSV with the same feature columns used in training. + - name: model + type: JSON + description: Trained model JSON (from train_regression). + outputs: + - name: predictions + type: CSV + description: Output CSV with actual and predicted values. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def predict( + test_data, + model, + predictions, + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + + import argparse + _parser = argparse.ArgumentParser(prog='Predict', description='Apply a trained linear regression model to produce predictions.\n\nReads the model JSON (weights + bias) and the test CSV, computes\npredicted values, and writes a CSV with columns: actual, predicted.') + _parser.add_argument("--test-data", dest="test_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--predictions", dest="predictions", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = predict(**_parsed_args) + args: + - '--test-data' + - inputPath: test_data + - '--model' + - inputPath: model + - '--predictions' + - outputPath: predictions + arguments: + model: + graphInput: + inputName: model + test_data: + graphInput: + inputName: test_data + annotations: + editor.position: '{"x":301,"y":86.75}' + Compute Metrics: + componentRef: + name: Evaluate + digest: c26e9e058d298c1c57dd96e15ea4261b99439a53fa9b323db4e9ef783933954c + spec: + name: Evaluate + description: |- + Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: evaluate.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + import math + + + def evaluate( + predictions: components.InputPath("CSV"), + metrics: components.OutputPath("JSON"), + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + python_original_code_path: evaluate.py + components new regenerate python-function-component: 'true' + inputs: + - name: predictions + type: CSV + description: Input CSV with actual and predicted columns. + outputs: + - name: metrics + type: JSON + description: Output JSON with computed regression metrics. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + import math + + def evaluate( + predictions, + metrics, + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + + import argparse + _parser = argparse.ArgumentParser(prog='Evaluate', description='Compute regression metrics from a predictions CSV.\n\nExpects columns: actual, predicted. Outputs a JSON file with\nMAE, MSE, RMSE, R-squared, and row count.') + _parser.add_argument("--predictions", dest="predictions", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--metrics", dest="metrics", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = evaluate(**_parsed_args) + args: + - '--predictions' + - inputPath: predictions + - '--metrics' + - outputPath: metrics + arguments: + predictions: + taskOutput: + outputName: predictions + taskId: Predict + annotations: + editor.position: '{"x":681,"y":200}' + outputValues: + metrics: + taskOutput: + outputName: metrics + taskId: Compute Metrics + predictions: + taskOutput: + outputName: predictions + taskId: Predict + arguments: + model: + taskOutput: + outputName: model + taskId: Training + test_data: + taskOutput: + outputName: test_data + taskId: Data Preparation + annotations: + editor.position: '{"x": 500, "y": 300}' + Data Preparation: + componentRef: + name: Data Preparation + digest: 653c0dddb09515926be064acab5ae1fc9ad0f25f4b9376882532740b91ca0658 + spec: + name: Data Preparation + description: Generate synthetic data and split into train/test + inputs: + - name: num_rows + type: Integer + default: '500' + annotations: + editor.position: '{"x":0,"y":32.5}' + - name: train_fraction + type: Float + default: '0.8' + annotations: + editor.position: '{"x":288,"y":241}' + outputs: + - name: train_data + type: CSV + annotations: + editor.position: '{"x":954,"y":24.75}' + - name: test_data + type: CSV + annotations: + editor.position: '{"x":958,"y":224.75}' + implementation: + graph: + tasks: + Split: + componentRef: + name: Split csv + digest: 7dbbe3ac41f4e820f0d168ef355ada703716f4593eb5e70664746eebe0fe79e7 + spec: + name: Split csv + description: |- + Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: split_csv.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import random + + + def split_csv( + input_data: components.InputPath("CSV"), + train_data: components.OutputPath("CSV"), + test_data: components.OutputPath("CSV"), + train_fraction: float = 0.8, + random_seed: int = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + python_original_code_path: split_csv.py + components new regenerate python-function-component: 'true' + inputs: + - name: input_data + type: CSV + description: Input CSV file. + - name: train_fraction + type: Float + description: Fraction of rows for training (0.0 to 1.0). + default: '0.8' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducible shuffling. + default: '42' + optional: true + outputs: + - name: train_data + type: CSV + description: Output CSV for the training split. + - name: test_data + type: CSV + description: Output CSV for the test split. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import random + + def split_csv( + input_data, + train_data, + test_data, + train_fraction = 0.8, + random_seed = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + + import argparse + _parser = argparse.ArgumentParser(prog='Split csv', description='Split a CSV dataset into train and test sets.\n\nRandomly shuffles rows, then splits by the given fraction.\nBoth output files keep the same header row.') + _parser.add_argument("--input-data", dest="input_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--train-fraction", dest="train_fraction", type=float, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--train-data", dest="train_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--test-data", dest="test_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = split_csv(**_parsed_args) + args: + - '--input-data' + - inputPath: input_data + - if: + cond: + isPresent: train_fraction + then: + - '--train-fraction' + - inputValue: train_fraction + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--train-data' + - outputPath: train_data + - '--test-data' + - outputPath: test_data + arguments: + input_data: + taskOutput: + outputName: output_data + taskId: Generate + train_fraction: + graphInput: + inputName: train_fraction + annotations: + editor.position: '{"x":574,"y":68.25}' + Generate: + componentRef: + name: Generate dataset + digest: 7f837011088acc8e081f5f2ae5c981cc3bb73ed28bf4b2aea3134bc5297e1674 + spec: + name: Generate dataset + description: |- + Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: generate_dataset.component.yaml + python_original_code: | + from cloud_pipelines import components + import random + import csv + import math + + + def generate_dataset( + output_data: components.OutputPath("CSV"), + num_rows: int = 500, + random_seed: int = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. + """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + python_original_code_path: generate_dataset.py + components new regenerate python-function-component: 'true' + inputs: + - name: num_rows + type: Integer + description: Number of rows to generate. + default: '500' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducibility. + default: '42' + optional: true + outputs: + - name: output_data + type: CSV + description: Output CSV file path. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import random + import csv + import math + + def generate_dataset( + output_data, + num_rows = 500, + random_seed = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. + """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + + import argparse + _parser = argparse.ArgumentParser(prog='Generate dataset', description='Generate a synthetic regression dataset with 4 features and a target.\n\nCreates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target.\nThe target is a noisy linear combination of the features, suitable for\ndemonstrating regression workflows.') + _parser.add_argument("--num-rows", dest="num_rows", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-data", dest="output_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = generate_dataset(**_parsed_args) + args: + - if: + cond: + isPresent: num_rows + then: + - '--num-rows' + - inputValue: num_rows + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--output-data' + - outputPath: output_data + arguments: + num_rows: + graphInput: + inputName: num_rows + annotations: + editor.position: '{"x":194,"y":0}' + outputValues: + test_data: + taskOutput: + outputName: test_data + taskId: Split + train_data: + taskOutput: + outputName: train_data + taskId: Split + annotations: + editor.position: '{"x": 0, "y": 100}' diff --git a/src/components/Learn/tours.json b/src/components/Learn/tours.json index c3f1ae6db..875a05313 100644 --- a/src/components/Learn/tours.json +++ b/src/components/Learn/tours.json @@ -50,9 +50,9 @@ { "id": "subgraphs", "title": "Group tasks into reusable subgraphs", - "description": "Select related tasks, bundle them into a subgraph, and use the Pipeline Tree window to navigate nested levels.", + "description": "Step into a nested pipeline, navigate with breadcrumbs, then unpack and re-pack tasks to build your own subgraph.", "difficulty": "Intermediate", - "duration": "4 min", + "duration": "5 min", "area": "Editor" }, { diff --git a/src/components/Learn/tours/subgraphs.tour.json b/src/components/Learn/tours/subgraphs.tour.json new file mode 100644 index 000000000..6b56114c5 --- /dev/null +++ b/src/components/Learn/tours/subgraphs.tour.json @@ -0,0 +1,170 @@ +{ + "id": "subgraphs", + "displayName": "Guided Tour: Subgraphs", + "requiresEditor": true, + "starterPipelineUrl": "tour-pipelines/Subgraphs.pipeline.component.yaml", + "steps": [ + { + "selector": "[data-tour-anchor=\"no-spotlight\"]", + "content": "A **subgraph** is a task that contains a whole pipeline of its own.\n\nSubgraphs help you manage complex projects: bundle a section of work into a single node, hide its internals, and reuse that bundle wherever it makes sense.\n\nThis example pipeline already contains three subgraphs for you to explore.", + "position": "center" + }, + { + "selector": "[data-tour=\"editor-canvas\"]", + "highlightedSelectors": ["[data-tour=\"editor-canvas\"]"], + "ringSelectors": [ + "[data-tour-card=\"task\"][data-tour-card-name=\"Data Preparation\"]", + "[data-tour-card=\"task\"][data-tour-card-name=\"Training\"]", + "[data-tour-card=\"task\"][data-tour-card-name=\"Evaluation\"]" + ], + "resizeObservables": ["[data-tour=\"editor-canvas\"]"], + "content": "The three highlighted tasks (**Data Preparation**, **Training**, and **Evaluation**) are each a subgraph.\n\nThey render like ordinary tasks on the canvas, but each one wraps an internal pipeline of its own.", + "position": [16, 80] + }, + { + "selector": "[data-tour-card=\"task\"][data-tour-card-name=\"Data Preparation\"]", + "ringSelectors": [ + "[data-tour-card=\"task\"][data-tour-card-name=\"Data Preparation\"]" + ], + "resizeObservables": ["[data-tour=\"editor-canvas\"]"], + "content": "**Double-click** the **Data Preparation** task to look inside.", + "position": "top", + "stepInteraction": true, + "interaction": "navigate-into-subgraph", + "targetTaskName": "Data Preparation" + }, + { + "selector": "[data-tour=\"subgraph-breadcrumbs\"]", + "highlightedSelectors": ["[data-tour=\"subgraph-breadcrumbs\"]"], + "mutationObservables": ["[data-tour=\"subgraph-breadcrumbs\"]"], + "resizeObservables": ["[data-tour=\"subgraph-breadcrumbs\"]"], + "content": "You're now one level deep, inside the Data Preparation subgraph.\n\nThe **breadcrumbs** above the canvas show your current location in the pipeline tree. Clicking any crumb takes you back up to that level.", + "position": "bottom" + }, + { + "selector": "[data-dock-window-content=\"pipeline-tree\"]", + "highlightedSelectors": [ + "[data-dock-window=\"pipeline-tree\"]", + "[data-dock-window-content=\"pipeline-tree\"]" + ], + "mutationObservables": ["[data-dock-window-content=\"pipeline-tree\"]"], + "resizeObservables": ["[data-dock-window-content=\"pipeline-tree\"]"], + "content": "For broader navigation, the **Pipeline Structure** window in the left dock shows the full pipeline tree at a glance, with every subgraph expandable in place.\n\nClicking a node in the tree jumps you directly there, no double-clicking required.", + "position": "right", + "ensureWindowRestored": "pipeline-tree" + }, + { + "selector": "[data-tour=\"editor-canvas\"]", + "highlightedSelectors": ["[data-tour=\"editor-canvas\"]"], + "ringSelectors": ["[data-tour-card=\"input\"]"], + "resizeObservables": ["[data-tour=\"editor-canvas\"]"], + "content": "**Input nodes** play a slightly different role inside a subgraph. At the top level they become pipeline parameters set at run submission, but here they become the subgraph's **arguments** on the parent task, fed by whatever the parent connects to (or sets as a static value).", + "position": [16, 80] + }, + { + "selector": "[data-tour=\"editor-canvas\"]", + "highlightedSelectors": ["[data-tour=\"editor-canvas\"]"], + "ringSelectors": ["[data-tour-card=\"output\"]"], + "resizeObservables": ["[data-tour=\"editor-canvas\"]"], + "content": "**Output nodes** play the inverse role. At the top level they're the pipeline's results, captured at run completion. Inside a subgraph, they become the subgraph's **outputs** on the parent task, ready to be wired into downstream tasks at the level above.", + "position": [16, 80] + }, + { + "selector": "[data-tour=\"subgraph-breadcrumbs\"]", + "highlightedSelectors": ["[data-tour=\"subgraph-breadcrumbs\"]"], + "ringSelectors": ["[data-tour-crumb=\"root\"]"], + "mutationObservables": ["[data-tour=\"subgraph-breadcrumbs\"]"], + "resizeObservables": ["[data-tour=\"subgraph-breadcrumbs\"]"], + "content": "Click **Root** in the breadcrumbs to return to the top level.", + "position": "bottom", + "stepInteraction": true, + "interaction": "navigate-to-root" + }, + { + "selector": "[data-tour-anchor=\"no-spotlight\"]", + "content": "Subgraphs behave a little differently at runtime.\n\nThe subgraph itself doesn't execute any code, so it has no logs of its own. Instead, it shows an **aggregate status** that summarizes the state of its inner tasks. To inspect individual task logs in the run view, navigate into the subgraph the same way you did here.\n\nSubgraphs can also be **nested** inside other subgraphs, and the same navigation and aggregation rules apply at every level.", + "position": "center" + }, + { + "selector": "[data-tour-anchor=\"no-spotlight\"]", + "content": "Subgraphs work in the other direction too. **Unpacking** a subgraph flattens it back into its constituent tasks at the parent level.\n\nThis is useful when you want to edit a subgraph's internals inline, or when a bundle is no longer earning its keep.", + "position": "center" + }, + { + "selector": "[data-tour-card=\"task\"][data-tour-card-name=\"Data Preparation\"]", + "ringSelectors": [ + "[data-tour-card=\"task\"][data-tour-card-name=\"Data Preparation\"]" + ], + "resizeObservables": ["[data-tour=\"editor-canvas\"]"], + "content": "Click the **Data Preparation** task once to select it.", + "position": "top", + "stepInteraction": true, + "interaction": "select-task", + "targetTaskName": "Data Preparation" + }, + { + "selector": "[data-tour=\"node-menu-trigger\"]", + "highlightedSelectors": [ + "[data-tour=\"node-menu-trigger\"]", + "[data-tour=\"node-menu-content\"]" + ], + "ringSelectors": ["[data-tour=\"node-menu-unpack\"]"], + "mutationObservables": ["[data-tour=\"node-menu-content\"]"], + "content": "With the subgraph selected, open the **Node** menu in the top bar and choose **Unpack Subgraph**.", + "position": "right", + "stepInteraction": true, + "interaction": "unpack-subgraph" + }, + { + "selector": "[data-tour=\"editor-canvas\"]", + "highlightedSelectors": ["[data-tour=\"editor-canvas\"]"], + "ringSelectors": [ + "[data-tour-card=\"task\"][data-tour-card-name=\"Generate\"]", + "[data-tour-card=\"task\"][data-tour-card-name=\"Split\"]" + ], + "resizeObservables": ["[data-tour=\"editor-canvas\"]"], + "content": "Data Preparation is gone. Its inner tasks, **Generate** and **Split**, are now sitting at the top level, already wired into the rest of the pipeline.", + "position": [16, 80] + }, + { + "selector": "[data-tour=\"editor-canvas\"]", + "highlightedSelectors": ["[data-tour=\"editor-canvas\"]"], + "ringSelectors": [ + "[data-tour-card=\"task\"][data-tour-card-name=\"Generate\"]", + "[data-tour-card=\"task\"][data-tour-card-name=\"Split\"]" + ], + "resizeObservables": ["[data-tour=\"editor-canvas\"]"], + "content": "Now let's repackage them.\n\nHold **Cmd** (or **Ctrl** on Windows) and click the Generate task, then the Split task to select both. Dragging a selection box across them on empty canvas works just as well.", + "position": [16, 80], + "stepInteraction": true, + "interaction": "multi-select-tasks", + "targetMinCount": 2 + }, + { + "selector": "[data-testid=\"selection-create-subgraph\"]", + "highlightedSelectors": [ + "[data-testid=\"selection-toolbar\"]", + "[data-tour=\"create-subgraph-popover\"]" + ], + "ringSelectors": ["[data-testid=\"selection-create-subgraph\"]"], + "mutationObservables": [ + "[data-testid=\"selection-toolbar\"]", + "[data-tour=\"create-subgraph-popover\"]" + ], + "resizeObservables": [ + "[data-tour=\"editor-canvas\"]", + "[data-testid=\"selection-toolbar\"]", + "[data-tour=\"create-subgraph-popover\"]" + ], + "content": "A floating toolbar appears above your selection. Click **Create Subgraph**, give the new subgraph a name, and confirm.", + "position": "top", + "stepInteraction": true, + "interaction": "create-subgraph" + }, + { + "selector": "[data-tour-anchor=\"no-spotlight\"]", + "content": "You've built a new subgraph from scratch.\n\nReach for subgraphs whenever a section of your pipeline reads as a single logical step, or you find you are often repeating combinations of tasks. They keep the top level clean and turn complex workflows into reusable building blocks.", + "position": [500, 60] + } + ] +} diff --git a/src/components/shared/SubgraphBreadcrumbsView.tsx b/src/components/shared/SubgraphBreadcrumbsView.tsx index c31384594..375b1ce20 100644 --- a/src/components/shared/SubgraphBreadcrumbsView.tsx +++ b/src/components/shared/SubgraphBreadcrumbsView.tsx @@ -36,6 +36,7 @@ export const SubgraphBreadcrumbsView = ({ @@ -53,6 +54,8 @@ export const SubgraphBreadcrumbsView = ({ size="sm" onClick={() => onNavigate(index)} className="h-6 px-2" + data-tour-crumb={isRoot ? "root" : "ancestor"} + data-tour-crumb-index={index} {...(getCrumbTracking?.(index) ?? {})} > {isRoot ? ( diff --git a/src/routes/v2/pages/Editor/components/EditorMenuBar/components/NodeMenu.tsx b/src/routes/v2/pages/Editor/components/EditorMenuBar/components/NodeMenu.tsx index f434d9207..67b72625b 100644 --- a/src/routes/v2/pages/Editor/components/EditorMenuBar/components/NodeMenu.tsx +++ b/src/routes/v2/pages/Editor/components/EditorMenuBar/components/NodeMenu.tsx @@ -137,11 +137,18 @@ export const NodeMenu = observer(function NodeMenu() { return ( - + Node - + {isTask && ( <> @@ -181,7 +188,10 @@ export const NodeMenu = observer(function NodeMenu() { {isSubgraph && ( <> - + Unpack Subgraph diff --git a/src/routes/v2/pages/Editor/components/FlowCanvas/components/SelectionToolbar.tsx b/src/routes/v2/pages/Editor/components/FlowCanvas/components/SelectionToolbar.tsx index 7f501c1e0..3745027ba 100644 --- a/src/routes/v2/pages/Editor/components/FlowCanvas/components/SelectionToolbar.tsx +++ b/src/routes/v2/pages/Editor/components/FlowCanvas/components/SelectionToolbar.tsx @@ -74,7 +74,12 @@ export const SelectionToolbar = observer(function SelectionToolbar({ /> - +