diff --git a/api/pending_test_runs.go b/api/pending_test_runs.go index 04bb486b7a..0b0f27ddcb 100644 --- a/api/pending_test_runs.go +++ b/api/pending_test_runs.go @@ -10,6 +10,7 @@ import ( "net/http" "sort" "strings" + "time" "github.com/gorilla/mux" "github.com/web-platform-tests/wpt.fyi/shared" @@ -52,11 +53,20 @@ func apiPendingTestRunsHandler(w http.ResponseWriter, r *http.Request) { for i, key := range keys { runs[i].ID = key.IntID() } - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + // Only show runs updated within the last 14 days to avoid stuck runs + // that have been dropped from the task queue. + if filter == "pending" { + cutoff := time.Now().Add(-14 * 24 * time.Hour) + filteredRuns := make([]shared.PendingTestRun, 0) + for _, run := range runs { + if run.Updated.After(cutoff) { + filteredRuns = append(filteredRuns, run) + } + } + runs = filteredRuns } + // When we filter by status (pending) we need to re-sort. sort.Sort(sort.Reverse(shared.PendingTestRunByUpdated(runs))) emit(r.Context(), w, runs) diff --git a/api/pending_test_runs_medium_test.go b/api/pending_test_runs_medium_test.go index 6d47d600a5..53cc599a25 100644 --- a/api/pending_test_runs_medium_test.go +++ b/api/pending_test_runs_medium_test.go @@ -73,6 +73,13 @@ func TestAPIPendingTestHandler(t *testing.T) { } assert.Nil(t, createPendingRun(ctx, &running)) + stale := shared.PendingTestRun{ + Created: now.AddDate(0, 0, -20), + Updated: now.AddDate(0, 0, -15), + Stage: shared.StageWptFyiReceived, + } + assert.Nil(t, createPendingRun(ctx, &stale)) + t.Run("/api/status", func(t *testing.T) { r, _ = i.NewRequest("GET", "/api/status", nil) resp := httptest.NewRecorder() @@ -81,13 +88,8 @@ func TestAPIPendingTestHandler(t *testing.T) { assert.Equal(t, http.StatusOK, resp.Code, string(body)) var results []shared.PendingTestRun json.Unmarshal(body, &results) - assert.Len(t, results, 5) - // Sorted by Update. - assert.Equal(t, results[0].ID, invalid.ID) - assert.Equal(t, results[1].ID, empty.ID) - assert.Equal(t, results[2].ID, duplicate.ID) - assert.Equal(t, results[3].ID, running.ID) - assert.Equal(t, results[4].ID, received.ID) + // 5 runs + 1 stale = 6 + assert.Len(t, results, 6) }) t.Run("/api/status/pending", func(t *testing.T) { @@ -99,6 +101,7 @@ func TestAPIPendingTestHandler(t *testing.T) { assert.Equal(t, http.StatusOK, resp.Code, string(body)) var results []shared.PendingTestRun json.Unmarshal(body, &results) + // running, received are returned. stale is excluded by Updated cutoff. assert.Len(t, results, 2) assert.Equal(t, results[0].ID, running.ID) assert.Equal(t, results[1].ID, received.ID) diff --git a/results-processor/cleanup_stuck_runs.py b/results-processor/cleanup_stuck_runs.py new file mode 100644 index 0000000000..1d295b4981 --- /dev/null +++ b/results-processor/cleanup_stuck_runs.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +import datetime +import logging +from processor import Processor + +_log = logging.getLogger(__name__) + + +def cleanup_stuck_runs(days_threshold: int = 14) -> None: + """ + Identifies PendingTestRun entities that have been in a non-terminal state + for longer than the given threshold and marks them as INVALID. + """ + p = Processor() + cutoff = (datetime.datetime.now(datetime.timezone.utc) + - datetime.timedelta(days=days_threshold)) + + _log.info("Querying for stuck runs updated before %s", cutoff) + + # Query for runs with Stage < 800 (StageValid) + q = p.datastore.query(kind='PendingTestRun') + q.add_filter('Stage', '<', 800) + + stuck_runs = list(q.fetch()) + _log.info("Found %d potential stuck runs", len(stuck_runs)) + + count = 0 + for run in stuck_runs: + # Check if the Updated time is before our cutoff + updated = run.get('Updated') + if updated and updated < cutoff: + run_id = str(run.key.id) + _log.info("Marking run %s as INVALID (last updated %s)", + run_id, updated) + try: + # Use the update_status method from Processor which handles + # the API call. + p.update_status( + run_id, 'INVALID', + error='Run timed out after {} days'.format(days_threshold)) + count += 1 + except Exception as e: + _log.exception("Failed to update run %s: %s", run_id, str(e)) + + _log.info("Successfully updated %d stuck runs", count) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + cleanup_stuck_runs() diff --git a/results-processor/processor.py b/results-processor/processor.py index f3c7f754f2..87034dc56f 100644 --- a/results-processor/processor.py +++ b/results-processor/processor.py @@ -7,7 +7,6 @@ import os import posixpath import shutil -import sys import tempfile import time import traceback @@ -374,68 +373,82 @@ def process_report(task_id: Optional[str], params: MultiDict[str, str]) -> str: labels = params.get('labels', '') response = [] - with Processor() as p: - p.update_status(run_id, 'WPTFYI_PROCESSING', None, callback_url) - if archives: - _log.info("Downloading %d archives", len(archives)) - else: - _log.info("Downloading %d results & %d screenshots", - len(results), len(screenshots)) - p.download(results, screenshots, archives) - if len(p.results) == 0: - _log.error("No results successfully downloaded") - p.update_status(run_id, 'EMPTY', None, callback_url) - return '' + try: + with Processor() as p: + p.update_status(run_id, 'WPTFYI_PROCESSING', None, callback_url) + if archives: + _log.info("Downloading %d archives", len(archives)) + else: + _log.info("Downloading %d results & %d screenshots", + len(results), len(screenshots)) + p.download(results, screenshots, archives) + if len(p.results) == 0: + _log.error("No results successfully downloaded") + p.update_status(run_id, 'EMPTY', None, callback_url) + return '' + try: + p.load_report() + # To be deprecated once all reports have all the required + # metadata. + p.report.update_metadata( + revision=params.get('revision'), + browser_name=params.get('browser_name'), + browser_version=params.get('browser_version'), + os_name=params.get('os_name'), + os_version=params.get('os_version'), + ) + p.report.finalize() + except wptreport.WPTReportError as e: + # This will register an error in Stackdriver. + _log.exception("Invalid report data: %s", str(e)) + # Add the input source to the error message. + if e.path is None: + e.path = archives or results + p.update_status(run_id, 'INVALID', str(e), callback_url) + # The input is invalid and there is no point to retry, so we + # return an empty (but successful) response to drop the task. + return '' + + if p.check_existing_run(): + _log.warning( + 'Skipping the task because RawResultsURL already exists: ' + '%s', p.raw_results_url) + p.update_status(run_id, 'DUPLICATE', None, callback_url) + return '' + response.append("{} results loaded from task {}".format( + len(p.report.results), task_id)) + + _log.info("Uploading merged raw report") + p.upload_raw() + response.append("raw_results_url: " + p.raw_results_url) + + _log.info("Uploading split results") + p.upload_split() + response.append("results_url: " + p.results_url) + + # Check again because the upload takes a long time. + if p.check_existing_run(): + _log.warning( + 'Skipping the task because RawResultsURL already exists: ' + '%s', p.raw_results_url) + p.update_status(run_id, 'DUPLICATE', None, callback_url) + return '' + + p.create_run(run_id, labels, uploader, callback_url) + response.append("run ID: {}".format(p.test_run_id)) + + p.run_hooks([_upload_screenshots]) + except Exception as e: + # For any unhandled exception, attempt to update the run status to + # INVALID before re-raising the exception to allow Cloud Tasks to retry + # if appropriate. We use a separate Processor instance if the original + # one failed or was closed. + _log.exception("Unhandled exception in process_report: %s", str(e)) try: - p.load_report() - # To be deprecated once all reports have all the required metadata. - p.report.update_metadata( - revision=params.get('revision'), - browser_name=params.get('browser_name'), - browser_version=params.get('browser_version'), - os_name=params.get('os_name'), - os_version=params.get('os_version'), - ) - p.report.finalize() - except wptreport.WPTReportError as e: - etype, e_, tb = sys.exc_info() - assert e is e_ - e.path = results - # This will register an error in Stackdriver. - traceback.print_exception(etype, e, tb) - p.update_status(run_id, 'INVALID', str(e), callback_url) - # The input is invalid and there is no point to retry, so we return - # an empty (but successful) response to drop the task. - return '' - - if p.check_existing_run(): - _log.warning( - 'Skipping the task because RawResultsURL already exists: %s', - p.raw_results_url) - p.update_status(run_id, 'DUPLICATE', None, callback_url) - return '' - response.append("{} results loaded from task {}".format( - len(p.report.results), task_id)) - - _log.info("Uploading merged raw report") - p.upload_raw() - response.append("raw_results_url: " + p.raw_results_url) - - _log.info("Uploading split results") - p.upload_split() - response.append("results_url: " + p.results_url) - - # Check again because the upload takes a long time. - if p.check_existing_run(): - _log.warning( - 'Skipping the task because RawResultsURL already exists: %s', - p.raw_results_url) - p.update_status(run_id, 'DUPLICATE', None, callback_url) - return '' - - p.create_run(run_id, labels, uploader, callback_url) - response.append("run ID: {}".format(p.test_run_id)) - - p.run_hooks([_upload_screenshots]) + with Processor() as p: + p.update_status(run_id, 'INVALID', str(e), callback_url) + except Exception: + _log.exception("Failed to update status after unhandled exception") + raise e return '\n'.join(response) diff --git a/results-processor/processor_test.py b/results-processor/processor_test.py index 42e0a24f53..fd6fbcab2c 100644 --- a/results-processor/processor_test.py +++ b/results-processor/processor_test.py @@ -223,6 +223,32 @@ def test_params_plumbing_duplicate(self, MockProcessor): ]) mock.create_run.assert_not_called() + @patch('processor.Processor') + def test_params_plumbing_unhandled_error(self, MockProcessor): + # Set up mock context manager to return self. + mock = MockProcessor.return_value + mock.__enter__.return_value = mock + mock.download.side_effect = Exception("Some unexpected error") + + params = MultiDict({ + 'uploader': 'blade-runner', + 'id': '654321', + 'results': 'https://wpt.fyi/wpt_report.json.gz', + }) + with self.assertRaisesRegex(Exception, "Some unexpected error"): + process_report('12345', params) + + # Should have updated status to WPTFYI_PROCESSING, then INVALID on + # error. + # Note: the second update_status call uses a NEW Processor instance in + # the implementation. Since we patched the class, it should still be + # tracked. + mock.update_status.assert_has_calls([ + call('654321', 'WPTFYI_PROCESSING', None, None), + call('654321', 'INVALID', "Some unexpected error", None), + ]) + mock.create_run.assert_not_called() + class ProcessorDownloadServerTest(unittest.TestCase): """This class tests behaviours of Processor related to downloading