Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions api/pending_test_runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"sort"
"strings"
"time"

"github.com/gorilla/mux"
"github.com/web-platform-tests/wpt.fyi/shared"
Expand Down Expand Up @@ -52,11 +53,20 @@ func apiPendingTestRunsHandler(w http.ResponseWriter, r *http.Request) {
for i, key := range keys {
runs[i].ID = key.IntID()
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)

return
// Only show runs updated within the last 14 days to avoid stuck runs
// that have been dropped from the task queue.
if filter == "pending" {
cutoff := time.Now().Add(-14 * 24 * time.Hour)
filteredRuns := make([]shared.PendingTestRun, 0)
for _, run := range runs {
if run.Updated.After(cutoff) {
filteredRuns = append(filteredRuns, run)
}
}
runs = filteredRuns
Comment on lines +57 to +67
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know if this was the best path forward, but the other option is to create a script or something that manually changes the pending status of the existing runs.

}

// When we filter by status (pending) we need to re-sort.
sort.Sort(sort.Reverse(shared.PendingTestRunByUpdated(runs)))
emit(r.Context(), w, runs)
Expand Down
17 changes: 10 additions & 7 deletions api/pending_test_runs_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func TestAPIPendingTestHandler(t *testing.T) {
}
assert.Nil(t, createPendingRun(ctx, &running))

stale := shared.PendingTestRun{
Created: now.AddDate(0, 0, -20),
Updated: now.AddDate(0, 0, -15),
Stage: shared.StageWptFyiReceived,
}
assert.Nil(t, createPendingRun(ctx, &stale))

t.Run("/api/status", func(t *testing.T) {
r, _ = i.NewRequest("GET", "/api/status", nil)
resp := httptest.NewRecorder()
Expand All @@ -81,13 +88,8 @@ func TestAPIPendingTestHandler(t *testing.T) {
assert.Equal(t, http.StatusOK, resp.Code, string(body))
var results []shared.PendingTestRun
json.Unmarshal(body, &results)
assert.Len(t, results, 5)
// Sorted by Update.
assert.Equal(t, results[0].ID, invalid.ID)
assert.Equal(t, results[1].ID, empty.ID)
assert.Equal(t, results[2].ID, duplicate.ID)
assert.Equal(t, results[3].ID, running.ID)
assert.Equal(t, results[4].ID, received.ID)
// 5 runs + 1 stale = 6
assert.Len(t, results, 6)
})

t.Run("/api/status/pending", func(t *testing.T) {
Expand All @@ -99,6 +101,7 @@ func TestAPIPendingTestHandler(t *testing.T) {
assert.Equal(t, http.StatusOK, resp.Code, string(body))
var results []shared.PendingTestRun
json.Unmarshal(body, &results)
// running, received are returned. stale is excluded by Updated cutoff.
assert.Len(t, results, 2)
assert.Equal(t, results[0].ID, running.ID)
assert.Equal(t, results[1].ID, received.ID)
Expand Down
50 changes: 50 additions & 0 deletions results-processor/cleanup_stuck_runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python3
import datetime
import logging
from processor import Processor

_log = logging.getLogger(__name__)


def cleanup_stuck_runs(days_threshold: int = 14) -> None:
    """Marks long-stale PendingTestRun entities as INVALID.

    Queries Datastore for PendingTestRun entities whose Stage is below 800
    (StageValid) and whose Updated timestamp is older than days_threshold
    days, then updates each one's status to INVALID via Processor's
    update_status API call.

    Args:
        days_threshold: Number of days a run may stay in a non-terminal
            state before it is considered stuck. Defaults to 14.
    """
    cutoff = (datetime.datetime.now(datetime.timezone.utc)
              - datetime.timedelta(days=days_threshold))

    # Processor is a context manager (process_report uses it with `with`);
    # entering the context here ensures its resources are released even if
    # an update below raises.
    with Processor() as p:
        _log.info("Querying for stuck runs updated before %s", cutoff)

        # Stage < 800 selects runs that have not yet reached StageValid,
        # i.e. runs still in a non-terminal state.
        q = p.datastore.query(kind='PendingTestRun')
        q.add_filter('Stage', '<', 800)

        stuck_runs = list(q.fetch())
        _log.info("Found %d potential stuck runs", len(stuck_runs))

        count = 0
        for run in stuck_runs:
            # The Datastore filter only covers Stage; the Updated cutoff is
            # applied client-side. Entities missing Updated are skipped.
            updated = run.get('Updated')
            if updated is None or updated >= cutoff:
                continue
            run_id = str(run.key.id)
            _log.info("Marking run %s as INVALID (last updated %s)",
                      run_id, updated)
            try:
                # Use the update_status method from Processor which handles
                # the API call.
                p.update_status(
                    run_id, 'INVALID',
                    error='Run timed out after {} days'.format(
                        days_threshold))
                count += 1
            except Exception:
                # Best-effort: keep processing the remaining runs.
                # _log.exception already records the message and traceback,
                # so the error is not formatted into the log string twice.
                _log.exception("Failed to update run %s", run_id)

    _log.info("Successfully updated %d stuck runs", count)


if __name__ == '__main__':
    # Configure root logging so _log.info output is visible when this is
    # run as a standalone script, then run with the default 14-day cutoff.
    logging.basicConfig(level=logging.INFO)
    cleanup_stuck_runs()
139 changes: 76 additions & 63 deletions results-processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import posixpath
import shutil
import sys
import tempfile
import time
import traceback
Expand Down Expand Up @@ -374,68 +373,82 @@ def process_report(task_id: Optional[str], params: MultiDict[str, str]) -> str:
labels = params.get('labels', '')

response = []
with Processor() as p:
p.update_status(run_id, 'WPTFYI_PROCESSING', None, callback_url)
if archives:
_log.info("Downloading %d archives", len(archives))
else:
_log.info("Downloading %d results & %d screenshots",
len(results), len(screenshots))
p.download(results, screenshots, archives)
if len(p.results) == 0:
_log.error("No results successfully downloaded")
p.update_status(run_id, 'EMPTY', None, callback_url)
return ''
try:
with Processor() as p:
p.update_status(run_id, 'WPTFYI_PROCESSING', None, callback_url)
if archives:
_log.info("Downloading %d archives", len(archives))
else:
_log.info("Downloading %d results & %d screenshots",
len(results), len(screenshots))
p.download(results, screenshots, archives)
if len(p.results) == 0:
_log.error("No results successfully downloaded")
p.update_status(run_id, 'EMPTY', None, callback_url)
return ''
try:
p.load_report()
# To be deprecated once all reports have all the required
# metadata.
p.report.update_metadata(
revision=params.get('revision'),
browser_name=params.get('browser_name'),
browser_version=params.get('browser_version'),
os_name=params.get('os_name'),
os_version=params.get('os_version'),
)
p.report.finalize()
except wptreport.WPTReportError as e:
# This will register an error in Stackdriver.
_log.exception("Invalid report data: %s", str(e))
# Add the input source to the error message.
if e.path is None:
e.path = archives or results
p.update_status(run_id, 'INVALID', str(e), callback_url)
# The input is invalid and there is no point to retry, so we
# return an empty (but successful) response to drop the task.
return ''

if p.check_existing_run():
_log.warning(
'Skipping the task because RawResultsURL already exists: '
'%s', p.raw_results_url)
p.update_status(run_id, 'DUPLICATE', None, callback_url)
return ''
response.append("{} results loaded from task {}".format(
len(p.report.results), task_id))

_log.info("Uploading merged raw report")
p.upload_raw()
response.append("raw_results_url: " + p.raw_results_url)

_log.info("Uploading split results")
p.upload_split()
response.append("results_url: " + p.results_url)

# Check again because the upload takes a long time.
if p.check_existing_run():
_log.warning(
'Skipping the task because RawResultsURL already exists: '
'%s', p.raw_results_url)
p.update_status(run_id, 'DUPLICATE', None, callback_url)
return ''

p.create_run(run_id, labels, uploader, callback_url)
response.append("run ID: {}".format(p.test_run_id))

p.run_hooks([_upload_screenshots])
except Exception as e:
# For any unhandled exception, attempt to update the run status to
# INVALID before re-raising the exception to allow Cloud Tasks to retry
# if appropriate. We use a separate Processor instance if the original
# one failed or was closed.
_log.exception("Unhandled exception in process_report: %s", str(e))
try:
p.load_report()
# To be deprecated once all reports have all the required metadata.
p.report.update_metadata(
revision=params.get('revision'),
browser_name=params.get('browser_name'),
browser_version=params.get('browser_version'),
os_name=params.get('os_name'),
os_version=params.get('os_version'),
)
p.report.finalize()
except wptreport.WPTReportError as e:
etype, e_, tb = sys.exc_info()
assert e is e_
e.path = results
# This will register an error in Stackdriver.
traceback.print_exception(etype, e, tb)
p.update_status(run_id, 'INVALID', str(e), callback_url)
# The input is invalid and there is no point to retry, so we return
# an empty (but successful) response to drop the task.
return ''

if p.check_existing_run():
_log.warning(
'Skipping the task because RawResultsURL already exists: %s',
p.raw_results_url)
p.update_status(run_id, 'DUPLICATE', None, callback_url)
return ''
response.append("{} results loaded from task {}".format(
len(p.report.results), task_id))

_log.info("Uploading merged raw report")
p.upload_raw()
response.append("raw_results_url: " + p.raw_results_url)

_log.info("Uploading split results")
p.upload_split()
response.append("results_url: " + p.results_url)

# Check again because the upload takes a long time.
if p.check_existing_run():
_log.warning(
'Skipping the task because RawResultsURL already exists: %s',
p.raw_results_url)
p.update_status(run_id, 'DUPLICATE', None, callback_url)
return ''

p.create_run(run_id, labels, uploader, callback_url)
response.append("run ID: {}".format(p.test_run_id))

p.run_hooks([_upload_screenshots])
with Processor() as p:
p.update_status(run_id, 'INVALID', str(e), callback_url)
except Exception:
_log.exception("Failed to update status after unhandled exception")
raise e

return '\n'.join(response)
26 changes: 26 additions & 0 deletions results-processor/processor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,32 @@ def test_params_plumbing_duplicate(self, MockProcessor):
])
mock.create_run.assert_not_called()

@patch('processor.Processor')
def test_params_plumbing_unhandled_error(self, MockProcessor):
# Set up mock context manager to return self.
mock = MockProcessor.return_value
mock.__enter__.return_value = mock
mock.download.side_effect = Exception("Some unexpected error")

params = MultiDict({
'uploader': 'blade-runner',
'id': '654321',
'results': 'https://wpt.fyi/wpt_report.json.gz',
})
with self.assertRaisesRegex(Exception, "Some unexpected error"):
process_report('12345', params)

# Should have updated status to WPTFYI_PROCESSING, then INVALID on
# error.
# Note: the second update_status call uses a NEW Processor instance in
# the implementation. Since we patched the class, it should still be
# tracked.
mock.update_status.assert_has_calls([
call('654321', 'WPTFYI_PROCESSING', None, None),
call('654321', 'INVALID', "Some unexpected error", None),
])
mock.create_run.assert_not_called()


class ProcessorDownloadServerTest(unittest.TestCase):
"""This class tests behaviours of Processor related to downloading
Expand Down
Loading