Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions io/studio/app.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,14 @@ application:
annotations:
require-adobe-auth: false
final: true
bulk-publish:
function: src/bulk-publish/index.js
web: 'yes'
runtime: nodejs:22
limits:
timeout: 300000
inputs:
concurrencyLimit: "5"
annotations:
require-adobe-auth: false
final: true
111 changes: 111 additions & 0 deletions io/studio/src/bulk-publish/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
const { Core } = require('@adobe/aio-sdk');
const { Ims } = require('@adobe/aio-lib-ims');
const { errorResponse, checkMissingRequestInputs, getBearerToken } = require('../../utils.js');
const { processBatchWithConcurrency } = require('../common.js');
const { resolvePaths } = require('./resolver.js');
const { publishPath } = require('./publisher.js');
const { enqueue } = require('./queue.js');

// Module-scoped logger for all bulk-publish events (structured JSON lines).
const logger = Core.Logger('bulk-publish', { level: 'info' });
// Parallel publish workers when the caller does not supply concurrencyLimit.
const DEFAULT_CONCURRENCY = 5;
// Hard ceiling on caller-requested concurrency.
const MAX_CONCURRENCY = 20;
// Maximum number of input paths accepted per request.
const MAX_PATHS = 500;
// Maximum number of locales accepted per request.
const MAX_LOCALES = 50;
// Cap on paths after locale expansion (MAX_PATHS x locales can exceed this).
const MAX_RESOLVED = 5000;
// Every requested path must live under this DAM subtree.
const PATH_PREFIX = '/content/dam/mas/';
// Per-path outcome statuses; must match the literals returned by publisher.js.
const STATUS = { PUBLISHED: 'published', SKIPPED: 'skipped', FAILED: 'failed' };

/**
 * Action entry point. Defers the actual work to run() but routes it through
 * the module-level queue so overlapping invocations execute one at a time.
 * @param {object} params - OpenWhisk action params (headers + inputs)
 * @returns {Promise<object>} the HTTP-style response produced by run()
 */
async function main(params) {
    const runTask = () => run(params);
    return enqueue(runTask);
}

/**
 * Validate the request, authorize the caller, resolve locale variants, and
 * publish each resolved path with bounded concurrency.
 * @param {object} params - action params: paths, odinEndpoint, optional
 *   locales, optional concurrencyLimit, optional allowedClientId, headers
 * @returns {Promise<object>} 200 with { summary, details } on success,
 *   otherwise an errorResponse (400 validation, 401 auth, 500 unexpected)
 */
async function run(params) {
    try {
        logger.info(JSON.stringify({ event: 'run-start' }));

        const requiredHeaders = ['Authorization'];
        const requiredParams = ['paths', 'odinEndpoint'];
        const missing = checkMissingRequestInputs(params, requiredParams, requiredHeaders);
        if (missing) {
            return errorResponse(400, missing, logger);
        }

        if (!Array.isArray(params.paths) || params.paths.length === 0) {
            return errorResponse(400, 'paths must be a non-empty array', logger);
        }
        if (params.paths.length > MAX_PATHS) {
            return errorResponse(400, `paths exceeds maximum of ${MAX_PATHS}`, logger);
        }
        // findIndex (not find): a falsy invalid entry such as null or '' would
        // make find() return a value indistinguishable from "none found".
        const invalidIndex = params.paths.findIndex((p) => typeof p !== 'string' || !p.startsWith(PATH_PREFIX));
        if (invalidIndex !== -1) {
            return errorResponse(400, `path must be a non-empty string starting with ${PATH_PREFIX}: ${params.paths[invalidIndex]}`, logger);
        }
        if (params.locales !== undefined && !Array.isArray(params.locales)) {
            return errorResponse(400, 'locales must be an array when provided', logger);
        }
        if (Array.isArray(params.locales) && params.locales.length > MAX_LOCALES) {
            return errorResponse(400, `locales exceeds maximum of ${MAX_LOCALES}`, logger);
        }

        const authToken = getBearerToken(params);
        const allowed = await isAllowed(authToken, params.allowedClientId);
        if (!allowed) {
            return errorResponse(401, 'Authorization failed', logger);
        }

        const resolved = resolvePaths(params.paths, params.locales);
        if (resolved.length === 0) {
            return errorResponse(400, 'No valid paths after resolution', logger);
        }
        if (resolved.length > MAX_RESOLVED) {
            return errorResponse(400, `Resolved ${resolved.length} paths exceeds maximum of ${MAX_RESOLVED}`, logger);
        }

        // Clamp to [1, MAX_CONCURRENCY]. Non-numeric or zero falls back to the
        // default; negative or fractional input previously produced a broken
        // (negative / non-integer) worker count.
        const requested = Math.floor(Number(params.concurrencyLimit)) || DEFAULT_CONCURRENCY;
        const concurrency = Math.min(Math.max(requested, 1), MAX_CONCURRENCY);
        logger.info(JSON.stringify({ event: 'resolved', total: resolved.length, concurrency }));

        const details = await processBatchWithConcurrency(resolved, concurrency, (path) =>
            publishPath({
                path,
                odinEndpoint: params.odinEndpoint,
                authToken,
                logger,
            }),
        );

        const summary = buildSummary(details);
        logger.info(JSON.stringify({ event: 'run-complete', summary }));

        return {
            statusCode: 200,
            body: { summary, details },
        };
    } catch (error) {
        // Anything unexpected becomes an opaque 500; details go to the log only.
        logger.error(JSON.stringify({ event: 'run-error', error: error.message || String(error) }));
        return errorResponse(500, 'Internal server error', logger);
    }
}

/**
 * Aggregate per-path publish results into counts by status.
 * Unknown statuses contribute to `total` but to no individual counter.
 * @param {Array<{status: string}>} details - per-path result objects
 * @returns {{total: number, published: number, skipped: number, failed: number}}
 */
function buildSummary(details) {
    const counts = { total: details.length, published: 0, skipped: 0, failed: 0 };
    const counterByStatus = new Map([
        [STATUS.PUBLISHED, 'published'],
        [STATUS.SKIPPED, 'skipped'],
        [STATUS.FAILED, 'failed'],
    ]);
    for (const { status } of details) {
        const key = counterByStatus.get(status);
        if (key !== undefined) {
            counts[key] += 1;
        }
    }
    return counts;
}

/**
 * Validate the caller's IMS bearer token against the allow-listed client id.
 * @param {string} token - bearer token from the Authorization header
 * @param {string} allowedClientId - the only IMS client id permitted to call this action
 * @returns {Promise<boolean>} true only when IMS confirms the token is valid
 *   and issued to allowedClientId; never throws — validation errors return false
 */
async function isAllowed(token, allowedClientId) {
    if (!token || !allowedClientId) return false;
    logger.info(JSON.stringify({ event: 'ims-validate', allowedClientId }));
    try {
        const ims = new Ims('prod');
        const imsValidation = await ims.validateTokenAllowList(token, [allowedClientId]);
        if (!imsValidation || !imsValidation.valid) {
            logger.error(JSON.stringify({ event: 'ims-validate-failed', allowedClientId }));
            return false;
        }
        return true;
    } catch (error) {
        // A throwing IMS call (network error, malformed token) previously escaped
        // to run()'s catch and produced a 500; an auth check that cannot pass
        // should yield "not allowed" (401) instead.
        logger.error(JSON.stringify({ event: 'ims-validate-error', allowedClientId, error: error.message || String(error) }));
        return false;
    }
}

exports.main = main;
74 changes: 74 additions & 0 deletions io/studio/src/bulk-publish/publisher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
const { fetchOdin, fetchFragmentByPath } = require('../common.js');
const { isAlreadyPublished } = require('./skip-check.js');

// Content-fragment publish endpoint, relative to the configured Odin host.
const PUBLISH_URI = '/adobe/sites/cf/fragments/publish';
// Total publish attempts per path (first try plus retries).
const DEFAULT_MAX_RETRIES = 3;
// Workflow model id sent with the publish request; presumably provisioned on
// the target AEM instance — TODO confirm.
const WORKFLOW_MODEL_ID = '/var/workflow/models/scheduled_activation_with_references';

/**
 * Publish a single fragment path, isolating failures so one bad path can
 * never reject the whole batch: unexpected throws are converted into a
 * 'failed' result object.
 * @param {{path: string, odinEndpoint: string, authToken: string,
 *          logger: object, maxRetries?: number}} args
 * @returns {Promise<object>} per-path result with status/reason/retries
 */
async function publishPath({ path, odinEndpoint, authToken, logger, maxRetries = DEFAULT_MAX_RETRIES }) {
    try {
        const result = await doPublishPath({ path, odinEndpoint, authToken, logger, maxRetries });
        return result;
    } catch (error) {
        const message = error.message || String(error);
        logger.error(JSON.stringify({ event: 'publish-unexpected-error', path, error: message }));
        return { path, status: 'failed', reason: 'unexpected-error', retries: 0 };
    }
}

/**
 * Fetch a fragment's metadata, skip it when already published, otherwise
 * POST a publish request with exponential-backoff retries on transient
 * errors (network/429/5xx).
 * @param {{path: string, odinEndpoint: string, authToken: string,
 *          logger: object, maxRetries: number}} args
 * @returns {Promise<object>} result object with status 'published' |
 *   'skipped' | 'failed', a reason on skip/failure, and the retry count
 */
async function doPublishPath({ path, odinEndpoint, authToken, logger, maxRetries }) {
    logger.info(JSON.stringify({ event: 'fetch-metadata', path }));
    const { fragment, status, etag } = await fetchFragmentByPath(odinEndpoint, path, authToken);

    if (status === 404 || !fragment) {
        logger.error(JSON.stringify({ event: 'fragment-not-found', path, status }));
        return { path, status: 'failed', reason: 'not-found', httpStatus: status, retries: 0 };
    }

    const decision = isAlreadyPublished(fragment);
    if (decision.skip) {
        logger.info(
            JSON.stringify({
                event: 'skip',
                path,
                reason: decision.reason,
                publishedAt: decision.publishedAt,
                modifiedAt: decision.modifiedAt,
            }),
        );
        return { path, status: 'skipped', reason: decision.reason, retries: 0 };
    }

    logger.info(JSON.stringify({ event: 'publish-start', path, fragmentId: fragment.id }));
    let lastError = null;
    // Attempts actually made; needed so a non-retryable failure on the first
    // attempt reports retries: 0 instead of maxRetries - 1 (previous bug).
    let attemptsUsed = 0;
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
        attemptsUsed = attempt;
        try {
            await fetchOdin(odinEndpoint, PUBLISH_URI, authToken, {
                method: 'POST',
                contentType: 'application/json',
                etag,
                body: JSON.stringify({
                    paths: [fragment.path],
                    filterReferencesByStatus: ['DRAFT', 'UNPUBLISHED'],
                    workflowModelId: WORKFLOW_MODEL_ID,
                }),
            });
            logger.info(JSON.stringify({ event: 'publish-success', path, retries: attempt - 1 }));
            return { path, status: 'published', retries: attempt - 1 };
        } catch (error) {
            lastError = error.message || String(error);
            // fetchOdin surfaces the HTTP status only inside the error message;
            // 0 means no status found (treated as a transient/network failure).
            const statusMatch = lastError.match(/status (\d{3})/);
            const httpStatus = statusMatch ? Number(statusMatch[1]) : 0;
            const retryable = httpStatus === 0 || httpStatus === 429 || httpStatus >= 500;
            logger.warn(JSON.stringify({ event: 'retry', path, attempt, error: lastError, retryable }));
            if (!retryable) break;
            if (attempt < maxRetries) {
                // Exponential backoff: 1s, 2s, 4s... capped at 5s.
                const delay = Math.min(1000 * Math.pow(2, attempt - 1), 5000);
                await new Promise((resolve) => setTimeout(resolve, delay));
            }
        }
    }

    const retries = Math.max(attemptsUsed - 1, 0);
    logger.error(JSON.stringify({ event: 'publish-failed', path, error: lastError, retries }));
    return { path, status: 'failed', reason: lastError, retries };
}

module.exports = { publishPath };
13 changes: 13 additions & 0 deletions io/studio/src/bulk-publish/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Tail of the chain of queued runs. enqueue() appends to this promise so
// tasks execute one at a time, in submission order.
let activeRun = Promise.resolve();

/**
 * Serialize task execution: the task starts only after every previously
 * enqueued task has settled. The returned promise is the task's own, so
 * callers observe its result — or its rejection — directly.
 * @param {() => Promise<*>} task - function producing the work to queue
 * @returns {Promise<*>} the task's eventual result
 */
function enqueue(task) {
    const previousSettled = activeRun.catch(() => {});
    const current = previousSettled.then(() => task());
    // Swallow this task's rejection on the chain itself so one failure does
    // not poison every later task; callers still see it via `current`.
    activeRun = current.catch(() => {});
    return current;
}

/** Reset the queue to an empty, idle chain. */
function resetQueue() {
    activeRun = Promise.resolve();
}

module.exports = { enqueue, resetQueue };
21 changes: 21 additions & 0 deletions io/studio/src/bulk-publish/resolver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const { getTargetPath } = require('../common.js');

/**
 * Expand the requested paths with their locale variants and de-duplicate.
 * Non-string and empty entries are silently dropped; output preserves first
 * insertion order.
 * @param {string[]} paths - source fragment paths
 * @param {string[]} [locales] - optional locale codes to expand each path with
 * @returns {string[]} unique source paths plus any resolvable locale variants
 */
function resolvePaths(paths, locales) {
    if (!Array.isArray(paths) || paths.length === 0) {
        return [];
    }

    const localeList = Array.isArray(locales) ? locales : [];
    const unique = new Set();

    for (const path of paths) {
        if (typeof path !== 'string' || path === '') {
            continue;
        }
        unique.add(path);
        for (const locale of localeList) {
            const variant = getTargetPath(path, locale);
            if (variant) {
                unique.add(variant);
            }
        }
    }

    return [...unique];
}

module.exports = { resolvePaths };
19 changes: 19 additions & 0 deletions io/studio/src/bulk-publish/skip-check.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
 * Decide whether publishing a fragment can be skipped because its published
 * copy is at least as recent as its last modification. Compares the
 * `published.at` and `modified.at` metadata values directly (assumes they
 * are mutually comparable, e.g. ISO timestamps — TODO confirm with Odin).
 * @param {object} fragment - fragment metadata, may be null/undefined
 * @returns {{skip: boolean, reason: string, publishedAt?: *, modifiedAt?: *}}
 */
function isAlreadyPublished(fragment) {
    if (!fragment) {
        return { skip: false, reason: 'no-fragment' };
    }

    const publishedAt = fragment.published?.at;
    const modifiedAt = fragment.modified?.at;

    // Never published: always needs a publish.
    if (!publishedAt) {
        return { skip: false, reason: 'never-published' };
    }

    // Published and never modified since: safe to skip.
    if (!modifiedAt) {
        return { skip: true, reason: 'already-published', publishedAt, modifiedAt: null };
    }

    return publishedAt >= modifiedAt
        ? { skip: true, reason: 'already-published', publishedAt, modifiedAt }
        : { skip: false, reason: 'modified-after-publish', publishedAt, modifiedAt };
}

module.exports = { isAlreadyPublished };
Loading
Loading