Announcing the release of a simple SecOps API Wrapper SDK: https://pypi.org/project/secops/
from secops import SecOpsClient
client = SecOpsClient()
chronicle = client.chronicle(
    customer_id="your-chronicle-instance-id",
    project_id="your-project-id",
    region="us"
)
New version released: https://pypi.org/project/secops/0.1.2/
Features for Rule Management:
The SDK provides comprehensive support for managing Chronicle detection rules:
Create new detection rules using YARA-L 2.0 syntax:
rule_text = """
rule simple_network_rule {
    meta:
        description = "Example rule to detect network connections"
        author = "SecOps SDK Example"
        severity = "Medium"
        priority = "Medium"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
        $e.principal.hostname != ""
    condition:
        $e
}
"""
# Create the rule
rule = chronicle.create_rule(rule_text)
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}")
Retrieve, list, update, enable/disable, and delete rules:
# List all rules
rules = chronicle.list_rules()
for rule in rules.get("rules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    enabled = rule.get("deployment", {}).get("enabled", False)
    print(f"Rule ID: {rule_id}, Enabled: {enabled}")
# Get specific rule
rule = chronicle.get_rule(rule_id)
print(f"Rule content: {rule.get('text')}")
# Update rule
updated_rule = chronicle.update_rule(rule_id, updated_rule_text)
# Enable/disable rule
deployment = chronicle.enable_rule(rule_id, enabled=True) # Enable
deployment = chronicle.enable_rule(rule_id, enabled=False) # Disable
# Delete rule
chronicle.delete_rule(rule_id)
Run rules against historical data to find past matches:
from datetime import datetime, timedelta, timezone
# Set time range for retrohunt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7) # Search past 7 days
# Create retrohunt
retrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)
operation_id = retrohunt.get("name", "").split("/")[-1]
# Check retrohunt status
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
is_complete = retrohunt_status.get("metadata", {}).get("done", False)
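A minimal sketch of waiting for the retrohunt to finish, reusing the get_retrohunt call above (the polling interval is an arbitrary choice for illustration):
import time

# Poll the operation until Chronicle reports it as done
while not is_complete:
    time.sleep(30)  # arbitrary polling interval
    retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
    is_complete = retrohunt_status.get("metadata", {}).get("done", False)
print("Retrohunt complete")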
Monitor rule detections and execution errors:
# List detections for a rule
detections = chronicle.list_detections(rule_id)
for detection in detections.get("detections", []):
    detection_id = detection.get("id", "")
    event_time = detection.get("eventTime", "")
    alerting = detection.get("alertState", "") == "ALERTING"
    print(f"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}")
# List execution errors for a rule
errors = chronicle.list_errors(rule_id)
for error in errors.get("ruleExecutionErrors", []):
    error_message = error.get("error_message", "")
    create_time = error.get("create_time", "")
    print(f"Error: {error_message}, Time: {create_time}")
Search for alerts generated by rules:
# Set time range for alert search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7) # Search past 7 days
# Search for rule alerts
alerts_response = chronicle.search_rule_alerts(
    start_time=start_time,
    end_time=end_time,
    page_size=10
)
# The API returns a nested structure where alerts are grouped by rule
# Extract and process all alerts from this structure
all_alerts = []
too_many_alerts = alerts_response.get('tooManyAlerts', False)
# Process the nested response structure - alerts are grouped by rule
for rule_alert in alerts_response.get('ruleAlerts', []):
    # Extract rule metadata
    rule_metadata = rule_alert.get('ruleMetadata', {})
    rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')
    rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')

    # Get alerts for this rule and collect them
    rule_alerts = rule_alert.get('alerts', [])
    all_alerts.extend(rule_alerts)

    # Process each alert
    for alert in rule_alerts:
        # Extract important fields
        alert_id = alert.get("id", "")
        detection_time = alert.get("detectionTimestamp", "")
        commit_time = alert.get("commitTimestamp", "")
        alerting_type = alert.get("alertingType", "")

        print(f"Alert ID: {alert_id}")
        print(f"Rule ID: {rule_id}")
        print(f"Rule Name: {rule_name}")
        print(f"Detection Time: {detection_time}")

        # Extract events from the alert
        if 'resultEvents' in alert:
            for var_name, event_data in alert.get('resultEvents', {}).items():
                if 'eventSamples' in event_data:
                    for sample in event_data.get('eventSamples', []):
                        if 'event' in sample:
                            event = sample['event']
                            # Process event data
                            event_type = event.get('metadata', {}).get('eventType', 'Unknown')
                            print(f"Event Type: {event_type}")
If tooManyAlerts is True in the response, consider narrowing your search criteria by using a smaller time window or more specific filters.
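For example, a minimal sketch of retrying the same search over a narrower window when the response is truncated (reusing the search_rule_alerts call shown above):
# Retry over the last 24 hours instead of 7 days if the response was truncated
if alerts_response.get('tooManyAlerts', False):
    narrow_start = end_time - timedelta(hours=24)
    alerts_response = chronicle.search_rule_alerts(
        start_time=narrow_start,
        end_time=end_time,
        page_size=10
    )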
Manage curated rule sets:
# Define deployments for rule sets
deployments = [
    {
        "category_id": "category-uuid",
        "rule_set_id": "ruleset-uuid",
        "precision": "broad",
        "enabled": True,
        "alerting": False
    }
]
# Update rule set deployments
chronicle.batch_update_curated_rule_set_deployments(deployments)
Validate a YARA-L2 rule before creating or updating it:
# Example rule
rule_text = """
rule test_rule {
    meta:
        description = "Test rule for validation"
        author = "Test Author"
        severity = "Low"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
    condition:
        $e
}
"""
# Validate the rule
result = chronicle.validate_rule(rule_text)
if result.success:
    print("Rule is valid")
else:
    print(f"Rule is invalid: {result.message}")
    if result.position:
        print(f"Error at line {result.position['startLine']}, column {result.position['startColumn']}")
Looks very interesting @raybrian - I'm about to start testing the get_stats method, which I can see returns its results as JSON output.
I noticed you had a separate action that returns the results of a UDM query in CSV format - are we able to run stats searches and have them returned in CSV format?
Awesome. We would love your feedback. The CSV format is its own API for UDM search and doesn't support stats search today. It would be really easy to wrap the stats query to get you what you're looking for:
#!/usr/bin/env python3
"""
Example script showing how to export Chronicle stats search results to CSV format.

This script demonstrates how to use the Chronicle SDK to perform a stats search
and export the results to a CSV file without modifying the SDK.
"""
import csv
import sys
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple

from secops.chronicle.client import ChronicleClient


def get_client() -> ChronicleClient:
    """
    Initialize and return a Chronicle client.

    Returns:
        ChronicleClient: An authenticated Chronicle client
    """
    # Replace with your own authentication method
    return ChronicleClient()


def get_time_range(days_back: int = 7) -> Tuple[datetime, datetime]:
    """
    Calculate a time range for the query.

    Args:
        days_back: Number of days to look back from now

    Returns:
        Tuple containing start_time and end_time
    """
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)
    return start_time, end_time


def export_stats_to_csv(
    query: str,
    output_file: str,
    days_back: int = 7,
    max_events: int = 1000,
    max_values: int = 100
) -> None:
    """
    Run a Chronicle stats query and export results to CSV.

    Args:
        query: Chronicle search query in YARA-L format
        output_file: Path to the output CSV file
        days_back: Number of days to look back from now
        max_events: Maximum number of events to process
        max_values: Maximum number of values to return per field
    """
    chronicle = get_client()
    start_time, end_time = get_time_range(days_back)

    print(f"Running query for time range: {start_time} to {end_time}")

    try:
        # Execute the stats query
        stats = chronicle.get_stats(
            query=query,
            start_time=start_time,
            end_time=end_time,
            max_events=max_events,
            max_values=max_values
        )

        # Write results to CSV
        with open(output_file, 'w', newline='') as csvfile:
            if not stats['columns'] or not stats['rows']:
                print("No results found")
                csvfile.write("No results found\n")
                return

            writer = csv.DictWriter(csvfile, fieldnames=stats['columns'])
            writer.writeheader()
            writer.writerows(stats['rows'])

        print(f"Successfully exported {stats['total_rows']} rows to {output_file}")
    except Exception as e:
        print(f"Error performing stats query: {e}")


def main() -> None:
    """Main function to run the example."""
    # Example query that counts network connections by hostname
    query = """metadata.event_type = "NETWORK_CONNECTION"
match:
    target.hostname
outcome:
    $count = count(metadata.id)
order:
    $count desc"""

    output_file = "chronicle_stats_results.csv"

    print("=== Chronicle Stats to CSV Example ===")
    export_stats_to_csv(
        query=query,
        output_file=output_file,
        days_back=7,
        max_events=1000,
        max_values=20
    )

    # Example of another query - top source IPs by count
    query2 = """metadata.event_type = "NETWORK_CONNECTION"
match:
    principal.ip
outcome:
    $count = count(metadata.id)
order:
    $count desc"""

    output_file2 = "chronicle_source_ips.csv"

    print("\n=== Top Source IPs Export Example ===")
    export_stats_to_csv(
        query=query2,
        output_file=output_file2,
        days_back=7,
        max_events=1000,
        max_values=20
    )


if __name__ == "__main__":
    main()
I did do a test and I'm getting an error when running the get_stats method:
The requested URL /v1alpha/projects/<project_id>/locations/<region>/instances/<customer_id>/legacy:legacyFetchUdmSearchView was not found on this server. That's all we know.
I have of course removed project_id, customer_id and region values from the url.
The one thing I'm unsure of is region - is there a list of potential region values (the wrapper uses "us" as an example)?
It depends on which region your instance is running in, the choices are:
US
EUROPE
ASIA_SOUTHEAST1
EUROPE_WEST2
AUSTRALIA_SOUTHEAST1
ME_WEST1
EUROPE_WEST6
EUROPE_WEST3
ME_CENTRAL2
ASIA_SOUTH1
ASIA_NORTHEAST1
NORTHAMERICA_NORTHEAST2
EUROPE_WEST12
ME_CENTRAL1
SOUTHAMERICA_EAST1
EUROPE_WEST9
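For example, pointing the client at a non-US instance would look like the snippet below. The lowercase spelling of the region string is an assumption here, based on the wrapper's own "us" example - check the SDK documentation for the exact values it expects.
# Hypothetical configuration for a European instance - the region string format is an assumption
chronicle = client.chronicle(
    customer_id="your-chronicle-instance-id",
    project_id="your-project-id",
    region="europe"
)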
New update for 0.1.3: you can now pass a natural language search and get UDM events back.
from secops import SecOpsClient
from datetime import datetime, timedelta, timezone
client = SecOpsClient()
chronicle = client.chronicle(
    customer_id="c3c6260c1c9340dcbbb802603bbf8888",
    project_id="725716774999",
    region="us"
)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=6)
results = chronicle.nl_search(
    text="show me dns lookups",
    start_time=start_time,
    end_time=end_time,
    max_events=1
)
print(results)
New update for 0.1.4. Now supporting log ingestion, forwarder management, and log type discovery.
https://github.com/google/secops-wrapper?tab=readme-ov-file#log-ingestion
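A minimal sketch of what log ingestion and log type discovery look like, assuming the ingest_log and get_all_log_types methods as described in the linked README (verify names and parameters there before use):
import json
from datetime import datetime, timezone

# Hypothetical raw log to ingest - field values are illustrative only
okta_log = {
    "actor": {"displayName": "Joe Doe", "alternateId": "jdoe@example.com"},
    "eventType": "user.session.start",
    "published": datetime.now(timezone.utc).isoformat()
}

# Ingest the raw log (method name and parameters per the linked README - assumption)
result = chronicle.ingest_log(
    log_type="OKTA",
    log_message=json.dumps(okta_log)
)
print(result)

# Log type discovery (method name per the linked README - assumption)
log_types = chronicle.get_all_log_types()
print(f"{len(log_types)} log types available")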
@raybrian I have performed a test stats search using a service account that was created to integrate Google SecOps SIEM to SOAR. I'm seeing an IAM permissions error being returned:
status": "PERMISSION_DENIED", "details": [ { "@type": "type.googleapis.com/google.rpc.ErrorInfo", "reason": "IAM_PERMISSION_DENIED", "domain": "chronicle.googleapis.com", "metadata": { "resource": "projects/xxxxxx", "permission": "chronicle.legacies.legacyFetchUdmSearchView"
Are there specific permissions that must be provided to the service account in order for the various actions to work?
The service account needs permission to read Chronicle API in the "BYOP" project associated with your account. Some tenants might not have migrated to the new API, or you could be using a service account associated with the older APIs. If you DM me your chronicle tenant URL and region, I can look at the details for you.
Thanks @raybrian - I don't seem to have the ability to DM, which I've created a separate post about. Once that is resolved, I'll reach out, thanks!
Thanks so much for this, @raybrian. The wrapper and your examples are very useful.
Just wanted to call out that if a service account needs to be created to use the wrapper, I think the minimal role required for the service account is Chronicle API Viewer. @raybrian please correct me if I'm wrong.
Version 0.1.5 released with UDM Event Ingestion
result = chronicle.ingest_udm(udm_events=network_event)
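For anyone trying this, a hypothetical network_event might look like the sketch below; the field names follow the UDM schema, but the exact set of fields required for ingestion is an assumption here:
import uuid
from datetime import datetime, timezone

# Hypothetical UDM event - the field selection is illustrative, not a complete schema
network_event = {
    "metadata": {
        "id": str(uuid.uuid4()),
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
        "event_type": "NETWORK_CONNECTION",
        "product_name": "Example Product",
        "vendor_name": "Example Vendor"
    },
    "principal": {"hostname": "workstation-1", "ip": "10.0.0.1"},
    "target": {"ip": "203.0.113.10", "port": 443}
}

result = chronicle.ingest_udm(udm_events=network_event)
print(result)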
New update today includes access to specialized Gemini AI functions to get security insights, generate detection rules, explain security concepts, and more.
https://github.com/google/secops-wrapper?tab=readme-ov-file#gemini-ai
# Generate a rule to detect potential security issues
rule_response = chronicle.gemini("Write a rule to detect powershell downloading a file called gdp.zip")
# Extract the generated rule(s)
code_blocks = rule_response.get_code_blocks()
if code_blocks:
    rule = code_blocks[0].content
    print("Generated rule:", rule)
# Query Gemini with a security question
response = chronicle.gemini("What is Windows event ID 4625?")
# Get text content (combines TEXT blocks and stripped HTML content)
text_explanation = response.get_text_content()
print("Explanation:", text_explanation)
@raybrian I just read in this blog that UDM Entity Search is coming into preview: UDM Entity Search. In this post, I'll explore the new… | by Chris Martin (@thatsiemguy) | Apr, 2025 ...
I know the SecOps SDK wrapper supports stats UDM Event searches, but does it support stats UDM Entity searches?
It uses the same API as entity search, so it should work. Want to test and let us know? Otherwise I can try it later this week. (Super slammed with RSA this week)