Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions io/studio/app.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,14 @@ application:
annotations:
require-adobe-auth: false
final: true
# Bulk-publish web action: publishes content fragments in locale-grouped
# chunks (implementation in src/bulk-publish/index.js).
bulk-publish:
function: src/bulk-publish/index.js
web: 'yes'
runtime: nodejs:22
limits:
# 300000 ms = 5 minutes. NOTE(review): confirm this is within the runtime's
# allowed timeout range for blocking web actions.
timeout: 300000
inputs:
# Default concurrency for chunk processing; index.js caps the effective
# value at its MAX_CONCURRENCY (20) and falls back to 5 if invalid.
concurrencyLimit: "5"
annotations:
require-adobe-auth: false
final: true
153 changes: 153 additions & 0 deletions io/studio/src/bulk-publish/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
const { Core } = require('@adobe/aio-sdk');
const { Ims } = require('@adobe/aio-lib-ims');
const { errorResponse, checkMissingRequestInputs, getBearerToken } = require('../../utils.js');
const { processBatchWithConcurrency } = require('../common.js');
const { resolvePaths } = require('./resolver.js');
const { publishChunk } = require('./publisher.js');
const { enqueue } = require('./queue.js');

// Action-scoped logger; level fixed at 'info'.
const logger = Core.Logger('bulk-publish', { level: 'info' });
// Concurrency used when the caller supplies no (or a non-numeric) concurrencyLimit.
const DEFAULT_CONCURRENCY = 5;
// Upper bound applied to the caller-supplied concurrencyLimit.
const MAX_CONCURRENCY = 20;
// Request-size guards: raw input paths, locales, resolved paths, and paths per publish chunk.
const MAX_PATHS = 500;
const MAX_LOCALES = 50;
const MAX_RESOLVED = 5000;
const MAX_CHUNK_SIZE = 50;
// All publishable paths must live under this DAM subtree.
const PATH_PREFIX = '/content/dam/mas/';
// Captures the locale segment from /content/dam/mas/<folder>/<locale>/...
// NOTE(review): inside a character class \w already matches '_', so the trailing '_' is redundant (harmless).
const LOCALE_REGEX = /^\/content\/dam\/mas\/[\w-_]+\/(?<locale>[\w-_]+)\//;
// Per-path outcome statuses reported back to the caller.
const STATUS = { PUBLISHED: 'published', SKIPPED: 'skipped', FAILED: 'failed' };

/**
 * Web-action entry point. Serializes executions through the module-level
 * queue so only one bulk-publish run is in flight at a time.
 */
async function main(params) {
  const execute = () => run(params);
  return enqueue(execute);
}

/**
 * Validates the request, resolves locale variants of each path, and publishes
 * everything in locale-grouped chunks with bounded concurrency.
 *
 * @param {object} params - Action params: paths (string[]), optional locales
 *   (string[]), aemOdinEndpoint/odinEndpoint, optional concurrencyLimit,
 *   allowedClientId, and the request headers used for the bearer token.
 * @returns {Promise<object>} 200 with { summary, details } on success,
 *   otherwise a 4xx/5xx error response.
 */
async function run(params) {
  try {
    logger.info(JSON.stringify({ event: 'run-start' }));

    const odinEndpoint = params.aemOdinEndpoint || params.odinEndpoint;
    if (!odinEndpoint) {
      return errorResponse(400, 'missing parameter(s) [aemOdinEndpoint|odinEndpoint]', logger);
    }

    const requiredHeaders = ['Authorization'];
    const requiredParams = ['paths'];
    const missing = checkMissingRequestInputs(params, requiredParams, requiredHeaders);
    if (missing) {
      return errorResponse(400, missing, logger);
    }

    // Shape and size guards on the raw request.
    if (!Array.isArray(params.paths) || params.paths.length === 0) {
      return errorResponse(400, 'paths must be a non-empty array', logger);
    }
    if (params.paths.length > MAX_PATHS) {
      return errorResponse(400, `paths exceeds maximum of ${MAX_PATHS}`, logger);
    }
    const invalidPath = params.paths.find((p) => typeof p !== 'string' || !p.startsWith(PATH_PREFIX));
    if (invalidPath !== undefined) {
      return errorResponse(400, `path must be a non-empty string starting with ${PATH_PREFIX}: ${invalidPath}`, logger);
    }
    if (params.locales !== undefined && !Array.isArray(params.locales)) {
      return errorResponse(400, 'locales must be an array when provided', logger);
    }
    if (Array.isArray(params.locales) && params.locales.length > MAX_LOCALES) {
      return errorResponse(400, `locales exceeds maximum of ${MAX_LOCALES}`, logger);
    }

    const authToken = getBearerToken(params);
    const allowed = await isAllowed(authToken, params.allowedClientId);
    if (!allowed) {
      return errorResponse(401, 'Authorization failed', logger);
    }

    const resolved = resolvePaths(params.paths, params.locales);
    if (resolved.length === 0) {
      return errorResponse(400, 'No valid paths after resolution', logger);
    }
    if (resolved.length > MAX_RESOLVED) {
      return errorResponse(400, `Resolved ${resolved.length} paths exceeds maximum of ${MAX_RESOLVED}`, logger);
    }

    const chunks = groupAndChunk(resolved, MAX_CHUNK_SIZE);
    // Clamp concurrency to [1, MAX_CONCURRENCY]. A non-numeric or zero input
    // falls back to the default; previously a NEGATIVE concurrencyLimit
    // slipped through Math.min unchanged and produced an invalid concurrency.
    const requested = Number(params.concurrencyLimit) || DEFAULT_CONCURRENCY;
    const concurrency = Math.max(1, Math.min(requested, MAX_CONCURRENCY));
    logger.info(JSON.stringify({ event: 'resolved', total: resolved.length, chunks: chunks.length, concurrency }));

    const chunkResults = await processBatchWithConcurrency(chunks, concurrency, (chunk) =>
      publishOneChunk(chunk, odinEndpoint, authToken),
    );
    const details = chunkResults.flat();

    const summary = buildSummary(details);
    logger.info(JSON.stringify({ event: 'run-complete', summary }));

    return {
      statusCode: 200,
      body: { summary, details },
    };
  } catch (error) {
    // Catch-all: never leak internals; the specific error is logged above.
    logger.error(JSON.stringify({ event: 'run-error', error: error.message || String(error) }));
    return errorResponse(500, 'Internal server error', logger);
  }
}

/**
 * Publishes a single locale chunk and logs a per-chunk published/failed tally.
 * Returns the per-path result objects from publishChunk unchanged.
 */
async function publishOneChunk({ locale, paths }, odinEndpoint, authToken) {
  logger.info(JSON.stringify({ event: 'chunk-start', locale, size: paths.length }));
  const results = await publishChunk({ chunk: paths, odinEndpoint, authToken, logger });
  let published = 0;
  let failed = 0;
  for (const result of results) {
    if (result.status === STATUS.PUBLISHED) published += 1;
    else if (result.status === STATUS.FAILED) failed += 1;
  }
  logger.info(JSON.stringify({ event: 'chunk-result', locale, size: paths.length, published, failed }));
  return results;
}

/**
 * Pulls the locale segment out of a DAM path, or 'unknown' when the path
 * is not a string or does not match the expected structure.
 */
function extractLocale(path) {
  if (typeof path !== 'string') return 'unknown';
  const locale = LOCALE_REGEX.exec(path)?.groups?.locale;
  return locale || 'unknown';
}

/**
 * Groups paths by locale, then splits each locale group into chunks of at
 * most maxChunkSize paths. Returns [{ locale, paths }] chunk descriptors.
 */
function groupAndChunk(paths, maxChunkSize) {
  const byLocale = new Map();
  for (const path of paths) {
    const locale = extractLocale(path);
    if (!byLocale.has(locale)) byLocale.set(locale, []);
    byLocale.get(locale).push(path);
  }
  const chunks = [];
  for (const [locale, localePaths] of byLocale.entries()) {
    let start = 0;
    while (start < localePaths.length) {
      chunks.push({ locale, paths: localePaths.slice(start, start + maxChunkSize) });
      start += maxChunkSize;
    }
  }
  return chunks;
}

/**
 * Tallies per-path results into { total, published, skipped, failed }.
 * Results with an unrecognized status count only toward total.
 */
function buildSummary(details) {
  const counts = { published: 0, skipped: 0, failed: 0 };
  for (const { status } of details) {
    if (status === STATUS.PUBLISHED) counts.published += 1;
    else if (status === STATUS.SKIPPED) counts.skipped += 1;
    else if (status === STATUS.FAILED) counts.failed += 1;
  }
  return { total: details.length, ...counts };
}

/**
 * Validates the bearer token against the IMS prod allow-list for the
 * configured client id. Returns false on any missing input or failed check.
 */
async function isAllowed(token, allowedClientId) {
  if (!token || !allowedClientId) return false;
  logger.info(JSON.stringify({ event: 'ims-validate', allowedClientId }));
  const validation = await new Ims('prod').validateTokenAllowList(token, [allowedClientId]);
  const valid = Boolean(validation?.valid);
  if (!valid) {
    logger.error(JSON.stringify({ event: 'ims-validate-failed', allowedClientId }));
  }
  return valid;
}

// Adobe I/O Runtime web-action entry point.
exports.main = main;
90 changes: 90 additions & 0 deletions io/studio/src/bulk-publish/publisher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
const { fetchOdin } = require('../common.js');

// Content-fragment publish endpoint, relative to the Odin host.
const PUBLISH_URI = '/adobe/sites/cf/fragments/publish';
// Workflow model the publish request triggers.
const WORKFLOW_MODEL_ID = '/var/workflow/models/scheduled_activation_with_references';
// Total attempts per chunk (first try included) unless the caller overrides maxRetries.
const DEFAULT_MAX_RETRIES = 3;

// Maps per-item workflow statuses from the publish response to per-path results.
// Statuses not listed here are reported as { status: 'failed', reason: 'unknown-status' }.
const WORKFLOW_STATUS_MAP = {
  SUCCESS_TRIGGERED: { status: 'published' },
  ERROR_NOT_FOUND: { status: 'failed', reason: 'not-found' },
  ERROR_REFERENCED: { status: 'failed', reason: 'error-referenced' },
  ERROR_FORBIDDEN: { status: 'failed', reason: 'error-forbidden' },
  ERROR_INVALID: { status: 'failed', reason: 'error-invalid' },
};

/**
 * Publishes one chunk of paths, catching any unexpected error so a single
 * chunk failure never aborts the whole run: every path in the chunk is then
 * reported as failed with reason 'unexpected-error'.
 */
async function publishChunk({ chunk, odinEndpoint, authToken, logger, maxRetries = DEFAULT_MAX_RETRIES }) {
  try {
    return await doPublishChunk({ chunk, odinEndpoint, authToken, logger, maxRetries });
  } catch (error) {
    const message = error.message || String(error);
    logger.error(JSON.stringify({ event: 'publish-unexpected-error', chunkSize: chunk.length, error: message }));
    const fallback = { status: 'failed', reason: 'unexpected-error', retries: 0 };
    return chunk.map((path) => ({ path, ...fallback }));
  }
}

/**
 * POSTs one chunk of paths to the publish workflow endpoint, retrying
 * transient failures (network errors, HTTP 429, HTTP 5xx) with exponential
 * backoff. Non-retryable failures (other 4xx) abort immediately.
 *
 * @returns {Promise<Array>} one result object per path in the chunk.
 */
async function doPublishChunk({ chunk, odinEndpoint, authToken, logger, maxRetries }) {
  logger.info(JSON.stringify({ event: 'publish-start', chunkSize: chunk.length }));

  let lastError = null;
  let attemptsUsed = 0;
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    attemptsUsed = attempt;
    try {
      const response = await fetchOdin(odinEndpoint, PUBLISH_URI, authToken, {
        method: 'POST',
        contentType: 'application/json',
        body: JSON.stringify({
          paths: chunk,
          filterReferencesByStatus: ['DRAFT', 'UNPUBLISHED'],
          workflowModelId: WORKFLOW_MODEL_ID,
        }),
      });
      const data = await parseResponse(response);
      const retries = attempt - 1;
      logger.info(JSON.stringify({ event: 'publish-success', chunkSize: chunk.length, retries }));
      return mapItemsToResults(chunk, data, retries);
    } catch (error) {
      lastError = error.message || String(error);
      // The HTTP status is only available embedded in the error message —
      // extract it to decide retryability; 0 means no status (network error).
      const statusMatch = lastError.match(/status (\d{3})/);
      const httpStatus = statusMatch ? Number(statusMatch[1]) : 0;
      const retryable = httpStatus === 0 || httpStatus === 429 || httpStatus >= 500;
      logger.warn(JSON.stringify({ event: 'retry', attempt, chunkSize: chunk.length, error: lastError, retryable }));
      if (!retryable) break; // other 4xx will not succeed on retry
      if (attempt < maxRetries) {
        const delay = Math.min(1000 * Math.pow(2, attempt - 1), 5000); // 1s, 2s, 4s… capped at 5s
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }

  // Report the retries actually performed. The previous hard-coded
  // `maxRetries - 1` over-reported retries when a non-retryable error broke
  // out of the loop on an early attempt.
  const retries = Math.max(0, attemptsUsed - 1);
  logger.error(JSON.stringify({ event: 'publish-failed', chunkSize: chunk.length, error: lastError, retries }));
  return chunk.map((path) => ({ path, status: 'failed', reason: lastError, retries }));
}

/**
 * Defensively extracts the JSON body from a fetch-style response.
 * Returns {} when the response is missing, has no json() method, or the
 * body fails to parse — callers never need to handle a parse error.
 */
async function parseResponse(response) {
  if (!response || typeof response.json !== 'function') return {};
  try {
    const parsed = await response.json();
    return parsed;
  } catch {
    return {};
  }
}

/**
 * Joins the publish response items back onto the requested chunk paths.
 * Paths with no matching response item are reported as failed with reason
 * 'no-response-item'; unknown item statuses map to 'unknown-status'.
 */
function mapItemsToResults(chunk, data, retries) {
  const workflowInstanceId = data?.workflowInstanceId;
  const responseItems = Array.isArray(data?.items) ? data.items : [];
  const itemsByPath = new Map(
    responseItems.filter((item) => item?.path).map((item) => [item.path, item]),
  );
  return chunk.map((path) => {
    const item = itemsByPath.get(path);
    if (!item) {
      return { path, status: 'failed', reason: 'no-response-item', retries, workflowInstanceId };
    }
    const outcome = WORKFLOW_STATUS_MAP[item.status] || { status: 'failed', reason: 'unknown-status' };
    return { path, ...outcome, retries, workflowInstanceId };
  });
}

// publishChunk is the module's only public entry; the helpers above stay private.
module.exports = { publishChunk };
13 changes: 13 additions & 0 deletions io/studio/src/bulk-publish/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Module-level promise chain that serializes tasks within this container:
// each enqueued task starts only after the previous one has settled.
let activeRun = Promise.resolve();

/**
 * Appends a task to the serial queue and returns its result promise.
 * A rejected task rejects its own promise but never blocks later tasks.
 */
function enqueue(task) {
  const result = activeRun
    .catch(() => {}) // a prior failure must not poison the chain
    .then(() => task());
  // Keep the chain alive regardless of this task's outcome.
  activeRun = result.catch(() => {});
  return result;
}

/** Resets the queue to an idle state. NOTE(review): presumably for test isolation — confirm callers. */
function resetQueue() {
  activeRun = Promise.resolve();
}

// Public queue API; enqueue serializes tasks, resetQueue clears the chain.
module.exports = { enqueue, resetQueue };
21 changes: 21 additions & 0 deletions io/studio/src/bulk-publish/resolver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const { getTargetPath } = require('../common.js');

/**
 * Expands each input path with its locale variants (via getTargetPath) and
 * returns the deduplicated list. Non-string/empty paths are skipped; the
 * original path is always kept alongside any locale variants.
 */
function resolvePaths(paths, locales) {
  if (!Array.isArray(paths) || paths.length === 0) return [];
  const localeList = Array.isArray(locales) && locales.length > 0 ? locales : [];
  const resolved = new Set();
  for (const path of paths) {
    if (typeof path !== 'string' || path === '') continue;
    resolved.add(path);
    for (const locale of localeList) {
      const targetPath = getTargetPath(path, locale);
      if (targetPath) resolved.add(targetPath);
    }
  }
  return [...resolved];
}

// Public API: resolvePaths expands input paths with locale variants.
module.exports = { resolvePaths };
Loading
Loading