Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { beforeEach, describe, expect, it, vi } from "vitest";

import { LINEAGE_ORIGIN_ANNOTATION } from "@/utils/annotations";

import { scanPipelinesForLineage } from "./scanPipelinesForLineage";

// Replace the component store module with a stub whose
// `getAllComponentFilesFromList` is a bare `vi.fn()`, so each test decides
// what the scan sees.
vi.mock("@/utils/componentStore", () => ({
getAllComponentFilesFromList: vi.fn(),
}));

// Pull the function back out of the module and wrap it with `vi.mocked` so the
// mock helpers used below (`mockReset`, `mockResolvedValue`) are available on it.
const { getAllComponentFilesFromList } = await import("@/utils/componentStore");
const mockGetAll = vi.mocked(getAllComponentFilesFromList);

// Origin id passed to the scan; tasks annotated with it are the expected matches.
const ORIGIN = "https://x/train.yaml";
// Digest passed to the scan as the target version; tasks carrying it are
// expected to be reported as reconciled.
const TARGET = "edited-digest";

/**
 * Builds a task annotations record holding a JSON-serialized lineage whose
 * only field is `originId`, stored under the lineage-origin annotation key.
 */
function lineageAnn(originId: string) {
  const serializedLineage = JSON.stringify({ originId });
  return { [LINEAGE_ORIGIN_ANNOTATION]: serializedLineage };
}

/**
 * Builds a leaf task fixture: a component reference with a name and digest,
 * plus lineage annotations when an `originId` is supplied (empty otherwise).
 */
function containerTask(name: string, digest: string, originId?: string) {
  const annotations = originId ? lineageAnn(originId) : {};
  return {
    componentRef: { name, digest },
    annotations,
  };
}

/**
 * Builds a subgraph task fixture: its component reference embeds a spec whose
 * graph implementation contains `nestedTasks`. Lineage annotations are added
 * only when an `originId` is supplied.
 */
function subgraphTask(
  name: string,
  nestedTasks: Record<string, unknown>,
  originId?: string,
) {
  const nestedSpec = {
    name,
    implementation: { graph: { tasks: nestedTasks } },
  };
  const annotations = originId ? lineageAnn(originId) : {};
  return {
    componentRef: { name, spec: nestedSpec },
    annotations,
  };
}

/**
 * Builds a stored-pipeline fixture: an entry whose component reference embeds
 * a spec named `name` with a graph implementation containing `tasks`.
 */
function pipeline(name: string, tasks: Record<string, unknown>) {
  const graph = { tasks };
  const spec = { name, implementation: { graph } };
  return { componentRef: { spec } };
}

/**
 * Converts a plain record into a `Map` keyed by the record's own keys, in the
 * record's enumeration order — the value the mocked store resolves with.
 */
function asStore(entries: Record<string, unknown>) {
  const store = new Map<string, any>();
  for (const [key, value] of Object.entries(entries)) {
    store.set(key, value);
  }
  return store;
}

/**
 * Behavioral tests for `scanPipelinesForLineage`, driven through the mocked
 * component store so each case controls exactly which pipelines are scanned.
 */
describe("scanPipelinesForLineage", () => {
  // Block body on purpose: `mockReset()` returns the mock itself, and Vitest
  // treats a function returned from `beforeEach` as a teardown callback. An
  // expression-bodied arrow would therefore make Vitest invoke the mock after
  // every test.
  beforeEach(() => {
    mockGetAll.mockReset();
  });

  it("returns pipelines with matching tasks and pending/reconciled counts", async () => {
    mockGetAll.mockResolvedValue(
      asStore({
        // Two tasks share ORIGIN (one already on TARGET), one has another origin.
        "Pipeline A": pipeline("Pipeline A", {
          "Train old": containerTask("Train old", "old-digest", ORIGIN),
          "Train new": containerTask("Train new", TARGET, ORIGIN),
          Unrelated: containerTask("Unrelated", "z", "https://x/other.yaml"),
        }),
        // No lineage annotations at all: must be left out of the results.
        "Pipeline B": pipeline("Pipeline B", {
          Nothing: containerTask("Nothing", "n"),
        }),
      }),
    );

    const results = await scanPipelinesForLineage(ORIGIN, TARGET);

    expect(results).toHaveLength(1);
    expect(results[0]).toMatchObject({
      storageKey: "Pipeline A",
      pipelineName: "Pipeline A",
      pendingCount: 1,
      reconciledCount: 1,
    });
    // Sorted so the assertion does not depend on task iteration order.
    expect(results[0].tasks.map((t) => t.taskName).sort()).toEqual([
      "Train new",
      "Train old",
    ]);
  });

  it("recurses into subgraphs and records the path", async () => {
    mockGetAll.mockResolvedValue(
      asStore({
        "Pipeline C": pipeline("Pipeline C", {
          Group: subgraphTask("Group", {
            "Nested train": containerTask("Nested train", "old", ORIGIN),
          }),
        }),
      }),
    );

    const results = await scanPipelinesForLineage(ORIGIN, TARGET);

    expect(results).toHaveLength(1);
    expect(results[0].tasks[0]).toMatchObject({
      taskName: "Nested train",
      subgraphPath: ["Group"],
      reconciled: false,
    });
  });

  it("returns nothing when no pipeline shares the origin", async () => {
    mockGetAll.mockResolvedValue(
      asStore({
        "Pipeline D": pipeline("Pipeline D", {
          A: containerTask("A", "d", "https://x/other.yaml"),
        }),
      }),
    );

    expect(await scanPipelinesForLineage(ORIGIN, TARGET)).toEqual([]);
  });
});
93 changes: 93 additions & 0 deletions src/routes/v2/pages/Editor/lineage/scanPipelinesForLineage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { LINEAGE_ORIGIN_ANNOTATION } from "@/utils/annotations";
import {
type ComponentSpec,
isGraphImplementation,
} from "@/utils/componentSpec";
import {
type ComponentFileEntry,
getAllComponentFilesFromList,
} from "@/utils/componentStore";
import { USER_PIPELINES_LIST_NAME } from "@/utils/constants";
import { parseLineage } from "@/utils/lineage";

/** A single task found by the scan whose lineage origin equals the searched origin. */
interface PipelineLineageTaskMatch {
/** Key of the task within the graph that contains it. */
taskName: string;
/** Names of the enclosing subgraph tasks, outermost first; empty for top-level tasks. */
subgraphPath: string[];
/** Digest read from the task's component reference, when it has one. */
digest?: string;
/** True when this task is already on the target (edited) version. */
reconciled: boolean;
}

/** Scan result for one stored pipeline that contains at least one matching task. */
export interface PipelineLineageMatch {
/** Storage key (also the pipeline route param). */
storageKey: string;
/** Name from the pipeline's spec, falling back to the storage key when the spec has no name. */
pipelineName: string;
/** Every matching task in the pipeline, including those nested in subgraphs. */
tasks: PipelineLineageTaskMatch[];
/** Tasks sharing the origin but not yet on the target version. */
pendingCount: number;
/** Tasks already on the target version. */
reconciledCount: number;
}

/**
 * Depth-first walk over a graph spec that appends to `out` every task whose
 * lineage annotation names `originId`.
 *
 * @param spec - Spec to inspect; anything that is not a graph implementation is ignored.
 * @param originId - Lineage origin a task must carry to be collected.
 * @param targetDigest - Digest of the target version; when omitted no task is
 *   reported as reconciled.
 * @param path - Names of the subgraph tasks entered so far, outermost first.
 * @param out - Accumulator the matches are pushed into (mutated in place).
 */
function walkSpec(
  spec: ComponentSpec | undefined,
  originId: string,
  targetDigest: string | undefined,
  path: string[],
  out: PipelineLineageTaskMatch[],
): void {
  const implementation = spec?.implementation;
  if (implementation && isGraphImplementation(implementation)) {
    for (const [taskName, task] of Object.entries(implementation.graph.tasks)) {
      const { componentRef } = task;

      // Record the task itself first so parents precede their nested matches.
      const taskLineage = parseLineage(
        task.annotations?.[LINEAGE_ORIGIN_ANNOTATION],
      );
      if (taskLineage?.originId === originId) {
        const digest = componentRef.digest;
        const reconciled =
          targetDigest == null ? false : digest === targetDigest;
        out.push({ taskName, subgraphPath: path, digest, reconciled });
      }

      // Descend into embedded graph specs, extending the path with this task.
      const childSpec = componentRef.spec;
      if (childSpec && isGraphImplementation(childSpec.implementation)) {
        const childPath = [...path, taskName];
        walkSpec(childSpec, originId, targetDigest, childPath, out);
      }
    }
  }
}

/**
 * Looks through every pipeline in the user pipelines list for tasks whose
 * lineage origin is `originId`, descending into subgraphs. Pipelines are read
 * from the client-side component store, so no backend call is made.
 *
 * `targetDigest` identifies the edited version: matching tasks already at that
 * digest are counted as reconciled, all others as pending.
 *
 * @returns One entry per pipeline that has at least one matching task.
 */
export async function scanPipelinesForLineage(
  originId: string,
  targetDigest?: string,
): Promise<PipelineLineageMatch[]> {
  const storedPipelines = await getAllComponentFilesFromList(
    USER_PIPELINES_LIST_NAME,
  );

  const matches: PipelineLineageMatch[] = [];
  for (const [storageKey, entry] of storedPipelines) {
    const rootSpec = (entry as ComponentFileEntry).componentRef.spec;

    const matchedTasks: PipelineLineageTaskMatch[] = [];
    walkSpec(rootSpec, originId, targetDigest, [], matchedTasks);

    if (matchedTasks.length > 0) {
      const reconciledCount = matchedTasks.filter(
        (match) => match.reconciled,
      ).length;
      // `reconciled` is a boolean, so everything not reconciled is pending.
      const pendingCount = matchedTasks.length - reconciledCount;

      matches.push({
        storageKey,
        pipelineName: rootSpec?.name ?? storageKey,
        tasks: matchedTasks,
        pendingCount,
        reconciledCount,
      });
    }
  }

  return matches;
}
18 changes: 13 additions & 5 deletions src/utils/lineage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,26 @@ export function makeLineage(ref: ReferenceLike): ComponentLineage | undefined {
};
}

/**
 * Turns a raw annotation value into a validated lineage. Strings are first run
 * through `safeJsonParse` (the serialized form that round-trips through YAML);
 * any other value is validated as-is.
 *
 * @returns The validated lineage, or `undefined` when `raw` is null/undefined
 *   or fails schema validation.
 */
export function parseLineage(raw: unknown): ComponentLineage | undefined {
  if (raw === null || raw === undefined) {
    return undefined;
  }

  const candidate = typeof raw === "string" ? safeJsonParse(raw) : raw;
  const parsed = componentLineageSchema.safeParse(candidate);
  if (!parsed.success) {
    return undefined;
  }
  return parsed.data;
}

/**
 * Read a lineage previously embedded in a (published) component spec's metadata
 * annotations, if present and valid.
 *
 * Delegates to `parseLineage`, which accepts both the serialized JSON string
 * and an already-parsed object, so the parsing rules live in one place.
 *
 * @param spec - Component spec to inspect; may be `undefined`.
 * @returns The embedded lineage, or `undefined` when absent or invalid.
 */
export function embeddedLineageOf(
  spec: ComponentSpec | undefined,
): ComponentLineage | undefined {
  return parseLineage(spec?.metadata?.annotations?.[EMBEDDED_LINEAGE_KEY]);
}

/**
Expand Down
Loading