Teams message vulnerability configuration

Regarding the configuration of vulnerability notifications for Docker images pushed to GCP Artifact Registry 

To send the vulnerability notifications to Teams via Pub/Sub topics, I have used the Python code below:

 

import base64
import json
import os
import requests
import functions_framework
import ast
import google.auth
from google.auth.transport.requests import Request

# Read the Teams webhook URL from an environment variable.
TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL")

# Fail fast at import time: without a webhook URL no alert can ever be
# delivered, so refuse to load the module rather than fail on every message.
if not TEAMS_WEBHOOK_URL:
    raise ValueError("The environment variable TEAMS_WEBHOOK_URL is not set.")

# URL template for fetching a single occurrence from the Container Analysis
# v1 REST API; fill in with .format(project_id=..., occurrence_id=...).
CONTAINER_ANALYSIS_API_URL = (
    "https://containeranalysis.googleapis.com/v1"
    "/projects/{project_id}"
    "/occurrences/{occurrence_id}"
)

def get_access_token():
    """Retrieve an access token for authenticating with Google Cloud APIs.

    Uses Application Default Credentials and refreshes them so that a
    token is populated before it is read.

    Returns:
        The bearer token string held by the refreshed credentials.
    """
    # Request the cloud-platform scope explicitly: credential types that
    # require scopes (e.g. service-account key files) cannot mint a usable
    # token without one.
    # NOTE(review): on Cloud Run / Cloud Functions the runtime service
    # account normally works without this -- confirm it is harmless there.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(Request())
    return credentials.token

def get_occurrence_details(project_id, occurrence_id):
    """Query the Container Analysis API for one occurrence.

    Args:
        project_id: Project segment substituted into the occurrence URL.
        occurrence_id: Occurrence segment substituted into the occurrence URL.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        RuntimeError: If the API answers with any status other than 200.
            The message carries the status code and response text.
    """
    url = CONTAINER_ANALYSIS_API_URL.format(
        project_id=project_id, occurrence_id=occurrence_id
    )
    headers = {
        "Authorization": f"Bearer {get_access_token()}",
        "Content-Type": "application/json",
    }
    # Bound the call: without a timeout a stalled connection would block
    # the function until the platform terminates it.
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code != 200:
        raise RuntimeError(
            f"Failed to fetch occurrence details: {response.status_code}, {response.text}"
        )
    return response.json()

def _parse_pubsub_payload(raw_text):
    """Turn the decoded Pub/Sub message text into a dictionary.

    Strict JSON is tried first so that valid JSON containing apostrophes is
    never rewritten; a Python-literal dict (single-quoted keys) is accepted
    as a fallback via ``ast.literal_eval``.

    Raises:
        ValueError: If the text is neither valid JSON nor a Python literal,
            or if it does not decode to a dictionary.
    """
    text = raw_text.strip()
    try:
        parsed = json.loads(text)
    except ValueError:
        # Not JSON; fall back to a Python literal such as "{'kind': 'X'}".
        try:
            parsed = ast.literal_eval(text)
        except (ValueError, SyntaxError) as e:
            raise ValueError(f"Failed to parse Pub/Sub payload. Error: {e}")
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Failed to parse Pub/Sub payload. Error: expected an object, got {type(parsed).__name__}"
        )
    return parsed


def _build_teams_payload(severity, cvss_score, description):
    """Build the MessageCard dictionary posted to the Teams webhook."""
    return {
        "@type": "MessageCard",
        "@context": "https://schema.org/extensions",
        "summary": "Vulnerability Alert",
        "themeColor": "FF0000",  # Red for critical alerts
        "title": f"Vulnerability Found: Severity {severity}",
        "text": f"A vulnerability has been detected with the following details:\n\n"
                f"**Description:** {description}\n"
                f"**Severity:** {severity}\n"
                f"**CVSS Score:** {cvss_score}\n",
    }


@functions_framework.cloud_event
def hello_pubsub(cloud_event):
    """Forward one vulnerability occurrence notification to Teams.

    Decodes the base64 Pub/Sub message carried in
    ``cloud_event.data["message"]["data"]``, ignores messages whose ``kind``
    is not ``"VULNERABILITY"``, looks the occurrence up through
    ``get_occurrence_details`` and posts a MessageCard to
    ``TEAMS_WEBHOOK_URL``.

    One call handles one message and posts at most one card. Every
    exception is caught and printed rather than re-raised, so a failure
    never propagates out of this handler.

    Args:
        cloud_event: The event object supplied by ``functions_framework``.
    """
    try:
        # Decode the Pub/Sub message
        pubsub_message = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
        print(f"This is a Sushmitha : {pubsub_message}")

        # The decoded message is always text here, so it must be parsed.
        message_data = _parse_pubsub_payload(pubsub_message)

        print(f" Manualy print of Payload raaja : {message_data}")
        # Check the kind in the message
        note_kind = message_data.get("kind")
        if note_kind != "VULNERABILITY":
            print(f"Ignoring message with kind: {note_kind}")
            return

        # Extract project ID and occurrence ID
        name_field = message_data.get("name", "")
        if not name_field:
            raise ValueError("Missing 'name' field in the message.")

        # The code reads segment 1 as the project and segment 3 as the
        # occurrence, i.e. a "<x>/<project>/<y>/<occurrence>" layout.
        parts = name_field.split("/")
        if len(parts) < 4:
            raise ValueError(f"Unexpected 'name' format: {name_field}")
        project_id = parts[1]
        occurrence_id = parts[3]

        print(f"Extracted project_id: {project_id}, occurrence_id: {occurrence_id}")

        # Segments can still be empty strings (e.g. "projects//occurrences/").
        if not project_id or not occurrence_id:
            raise ValueError("Missing project ID or occurrence ID in the message.")

        # Query Container Analysis API for occurrence details
        occurrence_details = get_occurrence_details(project_id, occurrence_id)

        # Extract vulnerability details
        vulnerability = occurrence_details.get("vulnerability")
        if not vulnerability:
            print("No vulnerability details found. Skipping alert.")
            return

        severity = vulnerability.get("severity")
        cvss_score = vulnerability.get("cvssScore")
        # The card's "Description" line is filled with the resource URI.
        description = occurrence_details.get("resourceUri", "No description available")

        # Prepare the Teams alert payload
        teams_payload = _build_teams_payload(severity, cvss_score, description)

        # Send the alert to Teams; the timeout keeps a stalled webhook from
        # blocking the function indefinitely.
        response = requests.post(
            TEAMS_WEBHOOK_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(teams_payload),
            timeout=30,
        )

        # Check for errors
        if response.status_code != 200:
            print(f"Failed to send message to Teams. Status code: {response.status_code}, Response: {response.text}")
        else:
            print("Vulnerability alert sent to Teams successfully.")

    except Exception as e:
        # Best-effort handler: log and swallow so the error does not
        # propagate to the framework.
        print(f"Error processing Pub/Sub message: {e}")

When we deploy the above code as a Cloud Run function, we do receive Teams messages, but each vulnerability arrives as a separate Teams message.

For example, if a scanned image has 900+ vulnerabilities, it sends 900+ messages to the Teams chat.

sushmitha_0-1736243062589.png

I want a single Teams message with all vulnerabilities attached to it in PDF or JSON format. Could anybody please help me with this?

0 0 127
0 REPLIES 0