Picture this: You're a security analyst knee-deep in investigating an alert within Google Security Operations. You've got multiple browser tabs open, you're manually correlating events, and you wish you could just script the repetitive parts – pulling specific logs, checking entity reputations, maybe even pushing a new detection rule once you've figured things out. The UI is powerful, but sometimes you need the speed and automation of code.
What if you could interact with Google Security Operations (SecOps) features directly from your Python scripts or terminal? Now you can. Meet the Google SecOps SDK for Python, a powerful toolkit designed to bring programmatic control and automation to your security workflows.
Coupled with its comprehensive Command Line Interface (CLI), this SDK empowers you to move faster, integrate seamlessly, and automate tasks that used to be manual chores. Let's walk through a typical investigation scenario and see how the SDK and CLI can accelerate each step.
First things first – get the SDK and join the community: https://github.com/google/secops-wrapper
Installation via pip is straightforward:
pip install secops
Authentication is key. The SDK recommends using Application Default Credentials (ADC), especially for local development. It's the simplest way to get going. Just run this command once using the Google Cloud SDK:
# Login and set up application-default credentials
gcloud auth application-default login
Now, initializing the SDK client in your Python script is incredibly clean. It automatically picks up the credentials you just set up:
from secops import SecOpsClient
# Initialize with default credentials - no explicit configuration needed
client = SecOpsClient()
# Now, get the Chronicle-specific client for SIEM operations
# Remember to replace placeholders with your actual instance details!
chronicle = client.chronicle(
    customer_id="your-chronicle-instance-id",  # Your Chronicle instance ID
    project_id="your-project-id",  # Your GCP project ID
    region="us"  # Your Chronicle API region (e.g., "us", "europe")
)
print("SDK initialized and connected to Chronicle!")
(The SDK also supports authentication via environment variables or service account files/dictionaries if needed – check the README for details).
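If you're running in CI or another headless environment, ADC can also pick up a service account key through the standard Google Cloud GOOGLE_APPLICATION_CREDENTIALS environment variable – a sketch, with a placeholder path:

```shell
# Point ADC at a service account key file (placeholder path)
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
```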
An alert triggers. Let's say we need to find all network connections from a specific host in the last 24 hours.
Using the SDK (Python):
from datetime import datetime, timedelta, timezone
# Set time range for the query
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Last 24 hours
# Our UDM query string
# Note: Use snake_case for UDM fields in the API/SDK
udm_query = 'metadata.event_type = "NETWORK_CONNECTION" AND principal.hostname = "workstation-1"'
print(f"Searching for events from {start_time.isoformat()} to {end_time.isoformat()}...")
# Perform the UDM search
results = chronicle.search_udm(
    query=udm_query,
    start_time=start_time,
    end_time=end_time,
    max_events=10  # Let's just get the first 10 for this example
)
# Process the results (structure matches the example in the README)
if results.get("events"):
    print(f"Found {results.get('total_events', len(results['events']))} matching events:")
    for event_data in results["events"]:
        event_udm = event_data.get("udm", {})
        timestamp = event_udm.get("metadata", {}).get("event_timestamp", "N/A")
        target_ip = event_udm.get("target", {}).get("ip", ["N/A"])[0]
        print(f" - Timestamp: {timestamp}, Target IP: {target_ip}")
else:
    print("No matching events found.")
# Want the results as CSV instead?
# csv_data = chronicle.fetch_udm_search_csv(
# query=udm_query,
# start_time=start_time,
# end_time=end_time,
# fields=["metadata.event_timestamp", "principal.hostname", "target.ip", "target.port"]
# )
# print("\nCSV Output:\n", csv_data)
Using the CLI (Terminal):
For quick checks, the secops CLI is fantastic. Let's configure it first to save typing common parameters:
# Save your credentials and region (do this once)
secops config set --customer-id "your-instance-id" --project-id "your-project-id" --region "us"
# Optionally, set a default time window (e.g., 24 hours)
# secops config set --time-window 24
Now, run the search:
# Use the query directly. If time-window isn't set, use --start-time/--end-time
secops search --query 'metadata.event_type = "NETWORK_CONNECTION" AND principal.hostname = "workstation-1"' --time-window 24 --max-events 10
During the investigation, you spot a suspicious IP address 8.8.8.8 (Google DNS, but let's pretend it's suspicious for this example) and want to know more about its activity in your environment.
Using the SDK (Python):
# Set time range for the query
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Last 24 hours
suspicious_ip = "8.8.8.8"
print(f"\nGetting entity summary for {suspicious_ip}...")
# Use the same start/end time as the search
ip_summary = chronicle.summarize_entity(
    value=suspicious_ip,
    start_time=start_time,  # Use previously defined start_time
    end_time=end_time  # Use previously defined end_time
)
# The SDK automatically detects the entity type (IP, domain, hash, etc.)
# Access the rich summary data (structure shown in README)
print(f" Entity Type Detected: {ip_summary.primary_entity.metadata.entity_type}")
print(f" First Seen: {ip_summary.primary_entity.metric.first_seen}")
print(f" Last Seen: {ip_summary.primary_entity.metric.last_seen}")
# ... explore ip_summary.related_entities, ip_summary.alert_counts etc.
# Let's also check for known IoC matches in the last 24h
print("\nChecking for IoC matches...")
iocs = chronicle.list_iocs(
    start_time=start_time,
    end_time=end_time,
    max_matches=100,
    add_mandiant_attributes=True,  # Get extra context if available
    prioritized_only=False
)
if iocs.get("matches"):
    print(f"Found {len(iocs['matches'])} IoC matches:")
    for ioc in iocs['matches']:
        ioc_type = next(iter(ioc['artifactIndicator'].keys()))
        ioc_value = next(iter(ioc['artifactIndicator'].values()))
        sources = ', '.join(ioc['sources'])
        print(f" - Type: {ioc_type}, Value: {ioc_value}, Sources: {sources}")
else:
    print("No IoC matches found in this time range.")
Using the CLI (Terminal):
# Get entity summary for the IP
secops entity --value "8.8.8.8" --time-window 24  # we currently support hourly time windows
# List IoCs, perhaps only prioritized ones with Mandiant intel
secops iocs --time-window 24 --prioritized --mandiant
You realize logs from a critical custom application aren't in Google SecOps yet. Let's ingest a sample log that's already in JSON. This assumes you've already defined a custom log type (here, "MY_APP_LOGS_JSON") and that your user has the appropriate IAM permissions.
Using the SDK (Python):
import json
from datetime import datetime, timezone
# Assume 'my_app_log' is a Python dict representing your custom log
current_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
my_app_log = {
    "timestamp": current_time,
    "hostname": "appserver-01",
    "service": "auth_service",
    "severity": "WARN",
    "message": "Configuration file checksum mismatch",
    "details": {"file": "/etc/app/config.ini", "expected": "abc", "actual": "def"}
}
# Use the appropriate Chronicle log type for your data
# Let's assume you have a custom type "MY_APP_LOGS_JSON" defined
log_type = "MY_APP_LOGS_JSON"
print(f"\nIngesting a single log (type: {log_type})...")
result = chronicle.ingest_log(
    log_type=log_type,
    log_message=json.dumps(my_app_log)  # Convert dict to JSON string
)
print(f" Ingestion operation: {result.get('operation')}")
# Need to ingest multiple logs? Pass a list of JSON strings
# batch_logs = [json.dumps(log1), json.dumps(log2)]
# batch_result = chronicle.ingest_log(log_type=log_type, log_message=batch_logs)
# print(f"Batch ingestion operation: {batch_result.get('operation')}")
# Ingesting XML? Provide the raw XML string directly
# xml_content = "<Event>...</Event>"
# xml_result = chronicle.ingest_log(log_type="WINEVTLOG_XML", log_message=xml_content)
# print(f"XML ingestion operation: {xml_result.get('operation')}")
# You can also ingest fully structured UDM events directly
# network_event = { "metadata": { ... }, "principal": { ... }, ... }
# uevent_result = chronicle.ingest_udm(udm_events=network_event) # Pass dict or list of dicts
# print(f"UDM ingestion result: {uevent_result}")
Using the CLI (Terminal):
# Ingest from a file (assuming logs.json contains one log per line)
secops log ingest --type "MY_APP_LOGS_JSON" --file logs.json
# Ingest a single log message directly
secops log ingest --type "MY_APP_LOGS_JSON" --message '{"timestamp": "...", "message": "single log"}'
# Ingest a UDM event from a file
secops log ingest-udm --file "./udm_event.json"
Based on your findings, you've developed a YARA-L 2.0 rule. Let's deploy it.
Note: Rule naming collisions are handled by creating a unique rule ID.
Using the SDK (Python):
# Your rule definition
rule_text = """
rule simple_network_rule {
meta:
description = "Example rule from SDK blog post"
author = "SecOps SDK User"
severity = "Medium"
priority = "Medium"
yara_version = "YL2.0" // Ensure this matches YARA-L 2.0 features used
rule_version = "1.0"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
$e.principal.hostname = "workstation-1" // Be specific or use patterns
$e.target.ip = "8.8.8.8"
condition:
$e
}
"""
print("\nValidating the rule before creation...")
validation = chronicle.validate_rule(rule_text)
if validation.success:
    print(" Rule is valid. Creating...")
    try:
        rule = chronicle.create_rule(rule_text)
        rule_id = rule.get("name", "").split("/")[-1]  # Extract ID from resource name
        print(f" Rule created successfully! ID: {rule_id}")
        # Now enable the rule
        print(f" Enabling rule {rule_id}...")
        deployment = chronicle.enable_rule(rule_id, enabled=True)
        print(f" Rule deployment status: {'Enabled' if deployment.get('enabled') else 'Disabled'}")
    except Exception as e:  # Catch potential API errors during creation/enablement
        print(f" Error during rule creation/enablement: {e}")
else:
    print(f" Rule validation failed: {validation.message}")
    # Print error position if available
    if validation.position:
        print(f" Error near line {validation.position['startLine']}, column {validation.position['startColumn']}")
# Need help refining the rule or understanding a technique? Ask Gemini!
# Note: Requires Enterprise or Enterprise+ tier and opt-in (SDK attempts auto-opt-in)
try:
    print("\nAsking Gemini about Windows Event ID 4625...")
    response = chronicle.gemini("What is Windows event ID 4625?")
    text_explanation = response.get_text_content()  # Gets combined text/stripped HTML
    print(f"Gemini Explanation: {text_explanation}")

    # You can also ask Gemini to generate rules:
    # rule_response = chronicle.gemini("Write a YARA-L rule to detect powershell downloading a file called gdp.zip")
    # code_blocks = rule_response.get_code_blocks()
    # if code_blocks:
    #     print("\nGemini Suggested Rule:\n", code_blocks[0].content)

    # Gemini can also evaluate your rule and suggest improvements and refinements:
    # print("\nAsking Gemini to evaluate our rule...")
    # eval_response = chronicle.gemini(f"Please evaluate the following YL2 Rule: {rule_text}")
    # text_explanation = eval_response.get_text_content()  # Gets combined text/stripped HTML
    # print(f"Gemini Explanation:\n {text_explanation}")
except Exception as e:
    print(f"\nError querying Gemini (may require opt-in or entitlement): {e}")
Using the CLI (Terminal):
# Validate a rule file
secops rule validate --file "./my_rule.yaral"
# Create a rule from a file
secops rule create --file "./my_rule.yaral"
# (Note the rule ID returned)
# Enable the rule using its ID
secops rule enable --id "ru_abcdef123456" --enabled true
# Query Gemini
secops gemini --query "Tell me about CVE-2021-44228"
# Ask Gemini to evaluate your rule and suggest improvements
# Tweak the prompt to your heart's content
printf "Evaluate and suggest improvements: " && cat test_rule.yaral | xargs -0 secops gemini --query
Your new rule fires! Let's check the alerts and associated cases.
Using the SDK (Python):
print("\nChecking for recent, non-closed alerts...")
# Use the same start/end time or adjust as needed
alerts_response = chronicle.get_alerts(
    start_time=start_time,
    end_time=end_time,
    snapshot_query='feedback_summary.status != "CLOSED"',  # Example filter
    max_alerts=50
)
alert_list = alerts_response.get('alerts', {}).get('alerts', [])
print(f"Found {len(alert_list)} non-closed alerts in the time range.")
# Extract unique case names (if any) associated with these alerts
case_names = {alert.get('caseName') for alert in alert_list if alert.get('caseName')}
if case_names:
    print("\nFetching details for associated cases:")
    cases_response = chronicle.get_cases(list(case_names))  # Pass list of case resource names
    # Use helper methods on the returned CaseList object
    open_cases = cases_response.filter_by_status("STATUS_OPEN")
    print(f" Found {len(open_cases)} open cases associated with these alerts:")
    for case in open_cases:  # Iterate through the filtered list
        print(f" - Case: {case.display_name} (Priority: {case.priority}, Status: {case.status})")
else:
    print("No cases associated with the fetched alerts.")
Using the CLI (Terminal):
# Get alerts using a snapshot query
secops alert --snapshot-query 'feedback_summary.status != "CLOSED" AND feedback_summary.priority = "PRIORITY_HIGH"' --time-window 24
# Get details for specific case IDs (comma-separated)
secops case --ids "case-resource-name-1,case-resource-name-2"
And that's not all – the SDK and CLI cover plenty of additional functionality beyond what we've walked through here; check the README for the full feature list.
As you can see, the Google SecOps SDK and CLI provide a robust bridge between your code, your terminal, and the power of Google Security Operations. By replacing manual clicks with efficient scripts and commands, you can significantly speed up investigations, automate tedious tasks, ensure consistency in rule deployments, and integrate SecOps capabilities into your broader ecosystem.
Ready to ditch the clicks and embrace the code?
The Google SecOps SDK is licensed under the Apache License 2.0. Start automating today!