Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 275 additions & 3 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# JSON data files; every path below lives directly under DATA_DIR.
app.config['FEEDBACK_FILE'] = os.path.join(app.config['DATA_DIR'], 'feedback.json')
app.config['SUBSCRIBERS_FILE'] = os.path.join(app.config['DATA_DIR'], 'subscribers.json')
app.config['MONITORED_PROJECTS_FILE'] = os.path.join(app.config['DATA_DIR'], 'monitored_projects.json')
# List of repositories whose GitHub Actions artifacts are imported as scans.
app.config['GITHUB_ACTIONS_PROJECTS_FILE'] = os.path.join(app.config['DATA_DIR'], 'github_actions_projects.json')
# Boolean flag read from the SHOW_GRADES_PUBLICLY environment variable.
# 'true', '1' and 'yes' (case-insensitive) enable it; unset defaults to enabled.
app.config['SHOW_GRADES_PUBLICLY'] = os.getenv('SHOW_GRADES_PUBLICLY', 'True').lower() in ('true', '1', 'yes')

# Create directories if they don't exist
Expand All @@ -39,6 +40,10 @@
with open(app.config['MONITORED_PROJECTS_FILE'], 'w') as f:
json.dump([], f)

# First start: seed the GitHub Actions project list with an empty JSON array
# so that later reads always find a parsable file.
if not os.path.exists(app.config['GITHUB_ACTIONS_PROJECTS_FILE']):
    with open(app.config['GITHUB_ACTIONS_PROJECTS_FILE'], 'w') as seed_file:
        seed_file.write(json.dumps([]))

# Cache busting - changes on each deployment/restart.
# The value is the Unix time (whole seconds) at which this statement ran,
# stored as a string.
STATIC_VERSION = str(int(time.time()))

Expand Down Expand Up @@ -100,6 +105,22 @@ def load_monitored_projects():
return monitored


def load_github_actions_projects():
    """Load the configured GitHub Actions projects.

    The file at ``app.config['GITHUB_ACTIONS_PROJECTS_FILE']`` is expected to
    hold a JSON array whose entries are either a repository URL string or an
    object with a ``repo_url`` key.

    Returns:
        A list of ``{'repo_url': <str>}`` dicts, in file order. An unreadable
        file, invalid JSON, or a payload that is not a JSON array yields an
        empty list. Entries without a non-blank string URL are skipped.
    """
    try:
        with open(app.config['GITHUB_ACTIONS_PROJECTS_FILE'], 'r') as f:
            items = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or malformed JSON (JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses).
        return []

    # Iterating a dict would yield its keys and turn them into bogus repo
    # URLs, so only a real JSON array is accepted.
    if not isinstance(items, list):
        return []

    projects = []
    for item in items:
        url = item.get('repo_url') if isinstance(item, dict) else item
        if not isinstance(url, str):
            continue
        url = url.strip()
        if url:
            projects.append({'repo_url': url})
    return projects


def save_scan_result(report_dict):
if 'metadata' not in report_dict or report_dict['metadata'] is None:
report_dict['metadata'] = {}
Expand Down Expand Up @@ -612,6 +633,242 @@ def refresh_monitored_scans():
})


def is_run_already_saved(run_id):
    """Tell whether a stored scan result already belongs to ``run_id``.

    Every ``*.json`` file in ``app.config['RESULTS_DIR']`` is inspected and its
    ``metadata.github_run_id`` is compared, as a string, with ``run_id``.

    Args:
        run_id: GitHub Actions run identifier (int or str).

    Returns:
        True if any result file records that run id, otherwise False. An
        unreadable results directory yields False.
    """
    results_dir = app.config['RESULTS_DIR']
    wanted = str(run_id)

    try:
        filenames = [name for name in os.listdir(results_dir) if name.endswith('.json')]
    except OSError:
        return False

    for filename in filenames:
        # Handle failures per file: one corrupt or unreadable result must not
        # hide a matching run stored in another file (which would lead to the
        # same run being saved twice).
        try:
            with open(os.path.join(results_dir, filename), 'r') as f:
                data = json.load(f)
        except (OSError, ValueError):
            continue

        metadata = data.get('metadata') if isinstance(data, dict) else None
        if not isinstance(metadata, dict):
            # e.g. "metadata": null or a non-object JSON document.
            continue
        if str(metadata.get('github_run_id', '')) == wanted:
            return True
    return False

def _gha_parse_owner_repo(repo_url):
    """Return ``(owner, repo)`` parsed from a GitHub URL, or None if it does not match."""
    import re

    if not isinstance(repo_url, str):
        return None
    match = re.search(r'github\.com/([^/]+)/([^/]+?)(?:\.git)?/?$', repo_url)
    return match.groups() if match else None


def _gha_extract_grade(grade_name, text):
    """Scrape a ``{'letter', 'percentage'}`` grade for ``grade_name`` out of report text."""
    import re

    patterns = (
        fr'{grade_name}.*?([A-F\?])\s*\(?(\d+)%?\)?',
        fr'{grade_name}.*?([A-F\?])\s+(\d+)',
    )
    for pattern in patterns:
        m = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
        if m:
            return {"letter": m.group(1).upper(), "percentage": int(m.group(2))}
    return {"letter": "?", "percentage": 0}


def _gha_report_from_html(html_content):
    """Build a report dict from an HTML report (injected JSON first, text scraping last)."""
    import base64
    import re

    # 1. Try to extract injected JSON data directly (Base64)
    match_b64 = re.search(r'window\.CLI_INJECTED_DATA_B64\s*=\s*[\'"]([A-Za-z0-9+/=]+)[\'"]', html_content)
    if match_b64:
        try:
            report = json.loads(base64.b64decode(match_b64.group(1)).decode('utf-8'))
        except ValueError as e:
            # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
            # ValueError subclasses; fall through to the next strategy.
            print(f"[DEBUG] Error decoding B64 data: {e}")
        else:
            if isinstance(report, dict):
                print("[DEBUG] Extracted JSON data from HTML report via CLI_INJECTED_DATA_B64")
                return report

    # 1b. Try to extract injected JSON data directly (Raw)
    match_raw = re.search(r'window\.CLI_INJECTED_DATA\s*=\s*(\{.*?\});', html_content, re.DOTALL)
    if match_raw:
        try:
            report = json.loads(match_raw.group(1))
        except ValueError as e:
            # The non-greedy match can cut the object short; keep going so the
            # text-scraping fallback below still gets a chance.
            print(f"[DEBUG] Error decoding injected JSON data: {e}")
        else:
            print("[DEBUG] Extracted JSON data from HTML report via CLI_INJECTED_DATA")
            return report

    # 2. Fallback to BeautifulSoup parsing
    from bs4 import BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    full_text = soup.get_text(separator=' ', strip=True)

    fallback_data = {
        "metadata": {"title": soup.title.string if soup.title else "HTML Report"},
        "summary": {"total": 0, "scanner_used": "unknown"},
        "overall": _gha_extract_grade('Overall', full_text),
        "cost": _gha_extract_grade('Cost', full_text),
        "security": _gha_extract_grade('Security', full_text),
        "container": _gha_extract_grade('Container', full_text),
        "results": [],
    }

    m_total = re.search(r'(?:Total\s*(?:findings|issues|vulnerabilities)[:\s]+)(\d+)', full_text, re.IGNORECASE)
    if m_total:
        fallback_data["summary"]["total"] = int(m_total.group(1))

    print("[DEBUG] Extracted 4 sections from HTML report")
    print("[DEBUG] Converted HTML report to InfraScan JSON")
    return fallback_data


def _gha_report_from_zip(zip_bytes):
    """Return ``(report_dict_or_None, member_names)`` for a downloaded artifact archive.

    Raises ``zipfile.BadZipFile`` when ``zip_bytes`` is not a ZIP archive.
    """
    import io
    import zipfile

    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        all_files = z.namelist()

        for json_file in (name for name in all_files if name.endswith('.json')):
            # Best effort per member: any failure just moves on to the next file.
            try:
                with z.open(json_file) as jf:
                    candidate = json.loads(jf.read().decode('utf-8'))
            except Exception as e:
                print(f"[DEBUG] Error parsing JSON {json_file}: {e}")
                continue
            if isinstance(candidate, dict):
                return candidate, all_files
            # A list/scalar cannot carry the metadata keys added later.
            print(f"[DEBUG] Skipping JSON {json_file}: top-level value is not an object")

        # Reached when there is no JSON member or none of them was usable.
        html_files = [name for name in all_files if name.endswith('.html')]
        if html_files:
            print("[DEBUG] Found HTML report instead of JSON, parsing fallback...")
        for html_file in html_files:
            try:
                with z.open(html_file) as hf:
                    html_content = hf.read().decode('utf-8')
                return _gha_report_from_html(html_content), all_files
            except Exception as e:
                print(f"[DEBUG] Error parsing HTML {html_file}: {e}")
                continue

        return None, all_files


def _gha_stamp_metadata(report_data, run, run_id, repo_url):
    """Record the originating run, source and repository in ``report_data['metadata']``."""
    from datetime import datetime, timezone

    metadata = report_data.get('metadata')
    if not isinstance(metadata, dict):
        # Covers both a missing key and an explicit "metadata": null.
        metadata = {}
        report_data['metadata'] = metadata

    metadata['github_run_id'] = run_id
    metadata['scan_source'] = 'github_actions'
    metadata['repository_url'] = repo_url

    if 'scan_timestamp' not in metadata:
        stamp = None
        created_at = run.get('created_at')
        if created_at:
            try:
                stamp = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
            except (TypeError, ValueError):
                stamp = None
        if stamp is None:
            # No usable run timestamp: fall back to the import time.
            stamp = datetime.now(timezone.utc)
        metadata['scan_timestamp'] = stamp.strftime('%Y-%m-%d %H:%M:%S UTC')

    metadata.setdefault('is_private', False)


def fetch_latest_github_actions_result(repo_url, github_token=None):
    """Fetch the newest not-yet-saved scan report from a repo's GitHub Actions artifacts.

    The ten most recent successful workflow runs are walked from newest to
    oldest. Runs already stored (per ``is_run_already_saved``) are skipped. For
    each remaining run, the first artifact whose name contains ``infrascan``,
    ``report`` or ``scan`` is downloaded and a report is read from it: a JSON
    member first, otherwise an HTML member.

    Args:
        repo_url: GitHub repository URL (``.../owner/repo``, optional ``.git``).
        github_token: Optional token sent as a Bearer ``Authorization`` header.

    Returns:
        ``(report_dict, None)`` on success, with ``metadata`` stamped with
        ``github_run_id``, ``scan_source``, ``repository_url``,
        ``scan_timestamp`` and ``is_private``; otherwise ``(None, message)``.
    """
    import zipfile

    owner_repo = _gha_parse_owner_repo(repo_url)
    if owner_repo is None:
        return None, "Not a valid GitHub URL"
    owner, repo = owner_repo

    headers = {'Accept': 'application/vnd.github+json'}
    if github_token:
        headers['Authorization'] = f'Bearer {github_token}'

    runs_url = f"https://api.github.com/repos/{owner}/{repo}/actions/runs?status=success&per_page=10"
    resp = requests.get(runs_url, headers=headers, timeout=10)
    if resp.status_code != 200:
        return None, f"Failed to fetch runs: {resp.text}"

    runs = resp.json().get('workflow_runs') or []
    if not runs:
        return None, "No successful runs found"

    # Sort runs from newest to oldest; `or ''` keeps a null created_at from
    # breaking the string comparison.
    runs = sorted(runs, key=lambda r: r.get('created_at') or '', reverse=True)

    for run in runs:
        run_id = str(run['id'])
        print(f"[DEBUG] Checking run {run_id}")

        # If saved, continue checking older runs instead of returning
        if is_run_already_saved(run_id):
            print(f"[DEBUG] Run {run_id} is already saved, skipping...")
            continue

        art_resp = requests.get(run['artifacts_url'], headers=headers, timeout=10)
        if art_resp.status_code != 200:
            print(f"[DEBUG] Failed to fetch artifacts for run {run_id}: {art_resp.status_code}")
            continue

        artifacts = art_resp.json().get('artifacts') or []
        print(f"[DEBUG] Found {len(artifacts)} artifacts for run {run_id}")

        target_artifact = next(
            (
                a for a in artifacts
                if any(key in a['name'].lower() for key in ('infrascan', 'report', 'scan'))
            ),
            None,
        )
        if not target_artifact:
            print(f"[DEBUG] No matching artifact in run {run_id}")
            continue

        print(f"[DEBUG] Found artifact: {target_artifact['name']} (ID: {target_artifact['id']})")

        # Explicitly allow redirects for GitHub API -> AWS S3 redirect
        dl_resp = requests.get(
            target_artifact['archive_download_url'],
            headers=headers,
            timeout=20,
            allow_redirects=True,
        )
        if dl_resp.status_code == 401:
            return None, "GitHub Token is required to download artifacts. Please set GITHUB_TOKEN in .env"
        if dl_resp.status_code != 200:
            print(f"[DEBUG] Failed to download artifact: {dl_resp.status_code}")
            continue

        try:
            report_data, all_files = _gha_report_from_zip(dl_resp.content)
        except zipfile.BadZipFile:
            print("[DEBUG] Downloaded file is not a valid ZIP archive")
            continue

        if report_data is None:
            print(f"[DEBUG] Artifact zip does not contain parsable .json or .html files! Contents: {all_files}")
            continue

        _gha_stamp_metadata(report_data, run, run_id, repo_url)
        print(f"[DEBUG] Successfully parsed artifact from run {run_id}")
        return report_data, None

    return None, "No valid artifact found in recent runs"


@app.route('/api/scans/github-actions/refresh', methods=['POST'])
def refresh_github_actions_scans():
    """Import the newest GitHub Actions scan result for every configured project.

    Responds with HTTP 400 when no projects are configured. Otherwise returns a
    JSON object with one entry per project under ``results`` (status ``ok``,
    ``skipped`` or ``error``) and the UTC time of the refresh under
    ``refreshed_at``.
    """
    configured = load_github_actions_projects()
    if not configured:
        return jsonify({'error': 'No GitHub Actions projects configured.', 'projects': []}), 400

    token = os.getenv('GITHUB_TOKEN', '').strip()

    def _refresh_one(url):
        # Any failure is reported for this project only; the batch continues.
        try:
            report, failure = fetch_latest_github_actions_result(url, token)
            if failure == "already_saved":
                return {'repo_url': url, 'status': 'skipped', 'message': 'Run already saved'}
            if failure:
                return {'repo_url': url, 'status': 'error', 'message': failure}
            return {'repo_url': url, 'status': 'ok', 'scan_id': save_scan_result(report)}
        except Exception as exc:
            return {'repo_url': url, 'status': 'error', 'message': str(exc)}

    outcomes = [_refresh_one(entry['repo_url']) for entry in configured]

    return jsonify({
        'results': outcomes,
        'refreshed_at': datetime.datetime.now(datetime.timezone.utc).isoformat()
    })


@app.route('/api/results/save', methods=['POST'])
def save_results():
data = request.get_json()
Expand Down Expand Up @@ -866,7 +1123,23 @@ def get_supported_projects():
continue

proj_name = extract_project_name(repo_url)
proj_key = proj_name.lower()
normalized_repo_url = normalize_repository_url(repo_url)
# Use the full normalized URL as the unique key (owner + repo)
# This ensures forks from different users are separate projects
proj_key = normalized_repo_url.lower()

# Build a display name that includes owner for disambiguation:
# e.g. "olszewskiigor / openmrs-contrib-cluster"
try:
from urllib.parse import urlparse as _urlparse
_parsed = _urlparse(normalized_repo_url)
_parts = [p for p in _parsed.path.strip('/').split('/') if p]
if len(_parts) >= 2:
display_name = f"{_parts[-2]} / {proj_name}"
else:
display_name = proj_name
except Exception:
display_name = proj_name

# Check rolling 12-month window
in_window = scan_dt >= twelve_months_ago
Expand All @@ -875,10 +1148,9 @@ def get_supported_projects():
latest_scan_pct = data.get('overall', {}).get('percentage') if data.get('overall') else None
latest_scan_source = metadata.get('scan_source') or 'unknown'

normalized_repo_url = normalize_repository_url(repo_url)
if proj_key not in projects_map:
projects_map[proj_key] = {
'raw_name': proj_name,
'raw_name': display_name,
'repository_url': normalized_repo_url,
'scans_in_window': 0,
'latest_scan_dt': scan_dt,
Expand Down
1 change: 1 addition & 0 deletions data/github_actions_projects.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Loading
Loading