Regarding the configuration of vulnerability notifications for Docker images pushed to GCP Artifact Registry:
to send the vulnerability notifications to Microsoft Teams via Pub/Sub topics, I have used the Python code below.
import base64
import json
import os
import requests
import functions_framework
import ast
import google.auth
from google.auth.transport.requests import Request
# Destination webhook for Teams alerts; taken from the process environment so
# the URL is never hard-coded in the source.
TEAMS_WEBHOOK_URL = os.environ.get("TEAMS_WEBHOOK_URL")
if TEAMS_WEBHOOK_URL is None or TEAMS_WEBHOOK_URL == "":
    # Fail at import time: nothing in this module can work without the webhook.
    raise ValueError("The environment variable TEAMS_WEBHOOK_URL is not set.")

# Container Analysis REST endpoint for a single occurrence; the two
# placeholders are filled in with str.format() by get_occurrence_details().
CONTAINER_ANALYSIS_API_URL = "https://containeranalysis.googleapis.com/v1/projects/{project_id}/occurrences/{occurrence_id}"
def get_access_token():
    """Return a bearer token for calling Google Cloud APIs.

    The credentials are whatever ``google.auth.default()`` resolves in the
    current runtime; they are refreshed on every call so the returned token
    is populated before it is read.
    """
    creds, _project = google.auth.default()
    transport = Request()
    creds.refresh(transport)
    token = creds.token
    return token
def get_occurrence_details(project_id, occurrence_id, timeout=10):
    """Fetch one occurrence from the Container Analysis API.

    Args:
        project_id: Project segment substituted into the occurrence URL.
        occurrence_id: Occurrence segment substituted into the occurrence URL.
        timeout: Seconds to wait for the HTTP call before giving up. Passed
            straight to ``requests.get``; without it a stalled connection
            would block the function indefinitely.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        RuntimeError: If the API answers with any status other than 200.
        requests.RequestException: If the request fails or times out.
    """
    url = CONTAINER_ANALYSIS_API_URL.format(project_id=project_id, occurrence_id=occurrence_id)
    headers = {
        "Authorization": f"Bearer {get_access_token()}",
        "Content-Type": "application/json",
    }
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        # RuntimeError is still an Exception, so existing broad handlers keep working.
        raise RuntimeError(
            f"Failed to fetch occurrence details: {response.status_code}, {response.text}"
        )
    return response.json()
# Upper bound, in seconds, on how long the Teams webhook call may block.
_TEAMS_REQUEST_TIMEOUT_SECONDS = 10


def _parse_pubsub_payload(raw_text):
    """Convert the decoded Pub/Sub message text into a dictionary.

    Parsers are tried in order and the first one that succeeds wins:

    1. Strict JSON, so valid payloads containing apostrophes are never altered.
    2. ``ast.literal_eval``, for payloads written as a Python literal.
    3. JSON after swapping single quotes for double quotes. This is the
       previous behaviour, kept only as a last resort for backward
       compatibility because it corrupts text that contains apostrophes.

    Args:
        raw_text: The UTF-8 decoded message body.

    Returns:
        The payload as a ``dict``.

    Raises:
        ValueError: If no parser accepts the text, or the parsed value is not
            a dictionary.
    """
    parsers = (
        json.loads,
        ast.literal_eval,
        lambda text: json.loads(text.replace("'", '"')),
    )
    last_error = None
    for parse in parsers:
        try:
            parsed = parse(raw_text)
        except (ValueError, SyntaxError) as err:
            last_error = err
        else:
            break
    else:
        raise ValueError(f"Failed to parse Pub/Sub payload. Error: {last_error}") from last_error

    if not isinstance(parsed, dict):
        # The caller uses .get(); reject other shapes with a clear message.
        raise ValueError(
            f"Failed to parse Pub/Sub payload. Error: expected an object, got {type(parsed).__name__}"
        )
    return parsed


@functions_framework.cloud_event
def hello_pubsub(cloud_event):
    """Forward one vulnerability occurrence notification to Teams.

    The base64 text under ``cloud_event.data["message"]["data"]`` is decoded
    and parsed. Messages whose ``kind`` is not ``VULNERABILITY`` are ignored.
    For the rest, the occurrence named in the message is fetched with
    ``get_occurrence_details`` and a MessageCard is posted to
    ``TEAMS_WEBHOOK_URL``.

    Every failure is printed and swallowed, so the function always returns
    normally.
    NOTE(review): returning normally presumably acknowledges the Pub/Sub
    message so it is not redelivered - confirm this best-effort behaviour is
    intended.

    Args:
        cloud_event: The event passed in by ``functions_framework``.
    """
    try:
        # Decode the Pub/Sub message
        pubsub_message = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
        print(f"This is a Sushmitha : {pubsub_message}")

        # The decoded value is always text, so it always needs parsing.
        message_data = _parse_pubsub_payload(pubsub_message)
        print(f" Manualy print of Payload raaja : {message_data}")

        # Only vulnerability notifications are forwarded.
        note_kind = message_data.get("kind")
        if note_kind != "VULNERABILITY":
            print(f"Ignoring message with kind: {note_kind}")
            return

        # The name is split on "/" and segments 1 and 3 are taken as the
        # project ID and occurrence ID.
        name_field = message_data.get("name", "")
        if not name_field:
            raise ValueError("Missing 'name' field in the message.")
        parts = name_field.split("/")
        if len(parts) < 4:
            raise ValueError(f"Unexpected 'name' format: {name_field}")
        project_id, occurrence_id = parts[1], parts[3]
        print(f"Extracted project_id: {project_id}, occurrence_id: {occurrence_id}")
        if not project_id or not occurrence_id:
            raise ValueError("Missing project ID or occurrence ID in the message.")

        # Query Container Analysis API for occurrence details
        occurrence_details = get_occurrence_details(project_id, occurrence_id)

        vulnerability = occurrence_details.get("vulnerability")
        if not vulnerability:
            print("No vulnerability details found. Skipping alert.")
            return

        severity = vulnerability.get("severity")
        cvss_score = vulnerability.get("cvssScore")
        # The card's "Description" line carries the occurrence's resourceUri.
        description = occurrence_details.get("resourceUri", "No description available")

        # Prepare the Teams alert payload
        teams_payload = {
            "@type": "MessageCard",
            "@context": "https://schema.org/extensions",
            "summary": "Vulnerability Alert",
            "themeColor": "FF0000",  # Red for critical alerts
            "title": f"Vulnerability Found: Severity {severity}",
            "text": f"A vulnerability has been detected with the following details:\n\n"
                    f"**Description:** {description}\n"
                    f"**Severity:** {severity}\n"
                    f"**CVSS Score:** {cvss_score}\n",
        }

        # Send the alert to Teams; the timeout keeps a stalled webhook from
        # blocking the function until the platform kills it.
        response = requests.post(
            TEAMS_WEBHOOK_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(teams_payload),
            timeout=_TEAMS_REQUEST_TIMEOUT_SECONDS,
        )

        if response.status_code != 200:
            print(f"Failed to send message to Teams. Status code: {response.status_code}, Response: {response.text}")
        else:
            print("Vulnerability alert sent to Teams successfully.")
    except Exception as e:
        # Top-level boundary: report the problem instead of crashing the function.
        print(f"Error processing Pub/Sub message: {e}")
When we deploy the above code as a Cloud Run function, we do receive Teams messages, but every vulnerability arrives as a separate message.
For example, if a scanned image has 900+ vulnerabilities, it sends 900+ messages to the Teams chat.
I want a single Teams message with all of the vulnerabilities attached to it in PDF or JSON format. Could anybody please help me with this?