SecOps API Wrapper SDK for Python

Announcing the release of a simple SecOps API Wrapper SDK: https://pypi.org/project/secops/

 
now using the SecOps API is as easy as:
pip install secops
 
from secops import SecOpsClient
client = SecOpsClient()
chronicle = client.chronicle(
    customer_id="your-chronicle-instance-id",
    project_id="your-project-id",
    region="us"
)
 
Currently supported methods:
UDM Search
Stats Search
CSV Export
Entity Summaries
Entity Summary from UDM Search
List IOC Matches in Time Range
Get Cases
Get Alerts


Please let us know your feedback, and which other use cases you'd like to see supported.

8 17 1,129
17 REPLIES 17

New version released https://pypi.org/project/secops/0.1.2/

Features for Rule Management: 

Rule Management

The SDK provides comprehensive support for managing Chronicle detection rules:

Creating Rules

Create new detection rules using YARA-L 2.0 syntax:

rule_text = """
rule simple_network_rule {
    meta:
        description = "Example rule to detect network connections"
        author = "SecOps SDK Example"
        severity = "Medium"
        priority = "Medium"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
        $e.principal.hostname != ""
    condition:
        $e
}
"""

# Create the rule
rule = chronicle.create_rule(rule_text)
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}")
 

Managing Rules

Retrieve, list, update, enable/disable, and delete rules:

# List all rules
rules = chronicle.list_rules()
for rule in rules.get("rules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    enabled = rule.get("deployment", {}).get("enabled", False)
    print(f"Rule ID: {rule_id}, Enabled: {enabled}")

# Get specific rule
rule = chronicle.get_rule(rule_id)
print(f"Rule content: {rule.get('text')}")

# Update rule
updated_rule = chronicle.update_rule(rule_id, updated_rule_text)

# Enable/disable rule
deployment = chronicle.enable_rule(rule_id, enabled=True)  # Enable
deployment = chronicle.enable_rule(rule_id, enabled=False) # Disable

# Delete rule
chronicle.delete_rule(rule_id)
 

Retrohunts

Run rules against historical data to find past matches:

from datetime import datetime, timedelta, timezone

# Set time range for retrohunt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Search past 7 days

# Create retrohunt
retrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)
operation_id = retrohunt.get("name", "").split("/")[-1]

# Check retrohunt status
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
is_complete = retrohunt_status.get("metadata", {}).get("done", False)
 

Detections and Errors

Monitor rule detections and execution errors:

# List detections for a rule
detections = chronicle.list_detections(rule_id)
for detection in detections.get("detections", []):
    detection_id = detection.get("id", "")
    event_time = detection.get("eventTime", "")
    alerting = detection.get("alertState", "") == "ALERTING"
    print(f"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}")

# List execution errors for a rule
errors = chronicle.list_errors(rule_id)
for error in errors.get("ruleExecutionErrors", []):
    error_message = error.get("error_message", "")
    create_time = error.get("create_time", "")
    print(f"Error: {error_message}, Time: {create_time}")
 

Rule Alerts

Search for alerts generated by rules:

# Set time range for alert search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Search past 7 days

# Search for rule alerts
alerts_response = chronicle.search_rule_alerts(
    start_time=start_time,
    end_time=end_time,
    page_size=10
)

# The API returns a nested structure where alerts are grouped by rule
# Extract and process all alerts from this structure
all_alerts = []
too_many_alerts = alerts_response.get('tooManyAlerts', False)

# Process the nested response structure - alerts are grouped by rule
for rule_alert in alerts_response.get('ruleAlerts', []):
    # Extract rule metadata
    rule_metadata = rule_alert.get('ruleMetadata', {})
    rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')
    rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')
    
    # Get alerts for this rule
    rule_alerts = rule_alert.get('alerts', [])
    
    # Process each alert
    for alert in rule_alerts:
        # Extract important fields
        alert_id = alert.get("id", "")
        detection_time = alert.get("detectionTimestamp", "")
        commit_time = alert.get("commitTimestamp", "")
        alerting_type = alert.get("alertingType", "")
        
        print(f"Alert ID: {alert_id}")
        print(f"Rule ID: {rule_id}")
        print(f"Rule Name: {rule_name}")
        print(f"Detection Time: {detection_time}")
        
        # Extract events from the alert
        if 'resultEvents' in alert:
            for var_name, event_data in alert.get('resultEvents', {}).items():
                if 'eventSamples' in event_data:
                    for sample in event_data.get('eventSamples', []):
                        if 'event' in sample:
                            event = sample['event']
                            # Process event data
                            event_type = event.get('metadata', {}).get('eventType', 'Unknown')
                            print(f"Event Type: {event_type}")
 

If tooManyAlerts is True in the response, consider narrowing your search criteria using a smaller time window or more specific filters.

Rule Sets

Manage curated rule sets:

# Define deployments for rule sets
deployments = [
    {
        "category_id": "category-uuid",
        "rule_set_id": "ruleset-uuid",
        "precision": "broad",
        "enabled": True,
        "alerting": False
    }
]

# Update rule set deployments
chronicle.batch_update_curated_rule_set_deployments(deployments)
 

Rule Validation

Validate a YARA-L2 rule before creating or updating it:

# Example rule
rule_text = """
rule test_rule {
    meta:
        description = "Test rule for validation"
        author = "Test Author"
        severity = "Low"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
    condition:
        $e
}
"""

# Validate the rule
result = chronicle.validate_rule(rule_text)

if result.success:
    print("Rule is valid")
else:
    print(f"Rule is invalid: {result.message}")
    if result.position:
        print(f"Error at line {result.position['startLine']}, column {result.position['startColumn']}")
 
 

Looks very interesting @raybrian - I'm about to start testing the use of the get_stats method which I can see results in a json output. 

I noticed you had a separate action that returns the results of a udm query in csv format - are we able to run stats searches and have them returned in csv format?

Awesome. We would love your feedback. The CSV Format is it's own API for UDM search that doesn't support stats search today. It would be really easy to wrap the stats query to get you what you're looking for: 

#!/usr/bin/env python3
"""
Example script showing how to export Chronicle stats search results to CSV format.

This script demonstrates how to use the Chronicle SDK to perform a stats search
and export the results to a CSV file without modifying the SDK.
"""

import csv
import sys
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple

from secops.chronicle.client import ChronicleClient


def get_client() -> ChronicleClient:
    """
    Initialize and return a Chronicle client.
    
    Returns:
        ChronicleClient: An authenticated Chronicle client
    """
    # Replace with your own authentication method
    return ChronicleClient()


def get_time_range(days_back: int = 7) -> Tuple[datetime, datetime]:
    """
    Calculate a time range for the query.
    
    Args:
        days_back: Number of days to look back from now
        
    Returns:
        Tuple containing start_time and end_time
    """
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)
    return start_time, end_time


def export_stats_to_csv(
    query: str,
    output_file: str,
    days_back: int = 7,
    max_events: int = 1000,
    max_values: int = 100
) -> None:
    """
    Run a Chronicle stats query and export results to CSV.
    
    Args:
        query: Chronicle search query in YARA-L format
        output_file: Path to the output CSV file
        days_back: Number of days to look back from now
        max_events: Maximum number of events to process
        max_values: Maximum number of values to return per field
    """
    chronicle = get_client()
    start_time, end_time = get_time_range(days_back)
    
    print(f"Running query for time range: {start_time} to {end_time}")
    
    try:
        # Execute the stats query
        stats = chronicle.get_stats(
            query=query,
            start_time=start_time,
            end_time=end_time,
            max_events=max_events,
            max_values=max_values
        )
        
        # Write results to CSV
        with open(output_file, 'w', newline='') as csvfile:
            if not stats['columns'] or not stats['rows']:
                print("No results found")
                csvfile.write("No results found\n")
                return
                
            writer = csv.DictWriter(csvfile, fieldnames=stats['columns'])
            writer.writeheader()
            writer.writerows(stats['rows'])
            
        print(f"Successfully exported {stats['total_rows']} rows to {output_file}")
        
    except Exception as e:
        print(f"Error performing stats query: {e}")


def main() -> None:
    """Main function to run the example."""
    # Example query that counts network connections by hostname
    query = """metadata.event_type = "NETWORK_CONNECTION"
match:
    target.hostname
outcome:
    $count = count(metadata.id)
order:
    $count desc"""
    
    output_file = "chronicle_stats_results.csv"
    
    print("=== Chronicle Stats to CSV Example ===")
    export_stats_to_csv(
        query=query,
        output_file=output_file,
        days_back=7,
        max_events=1000,
        max_values=20
    )
    
    # Example of another query - top source IPs by count
    query2 = """metadata.event_type = "NETWORK_CONNECTION"
match:
    principal.ip
outcome:
    $count = count(metadata.id)
order:
    $count desc"""
    
    output_file2 = "chronicle_source_ips.csv"
    
    print("\n=== Top Source IPs Export Example ===")
    export_stats_to_csv(
        query=query2,
        output_file=output_file2,
        days_back=7,
        max_events=1000,
        max_values=20
    )


if __name__ == "__main__":
    main()

 

I did do a test and I'm getting an error when running the get_stats method:

 <p>The requested URL <code>/v1alpha/projects/<project_id>/locations/<region>/instances/<customer_id>/legacy:legacyFetchUdmSearchView</code> was not found on this server.  <ins>That’s all we know.</ins>

 I have of course removed project_id, customer_id and region values from the url. 

The one thing I'm unsure of is region - is there a list of potential region values (the wrapper uses "us" as an example)?

It depends on which region your instance is running in, the choices are:

US

EUROPE

ASIA_SOUTHEAST1

EUROPE_WEST2

AUSTRALIA_SOUTHEAST1

ME_WEST1

EUROPE_WEST6

EUROPE_WEST3

ME_CENTRAL2

ASIA_SOUTH1

ASIA_NORTHEAST1

NORTHAMERICA_NORTHEAST2

EUROPE_WEST12

ME_CENTRAL1

SOUTHAMERICA_EAST1

EUROPE_WEST9

New update for 0.1.3. Now you can pass a natural language search and get UDM events back

from secops import SecOpsClient
from datetime import datetime, timedelta, timezone

client = SecOpsClient()
chronicle = client.chronicle(
    customer_id="c3c6260c1c9340dcbbb802603bbf8888",
    project_id="725716774999",
    region="us"
)

end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=6)

results = chronicle.nl_search(
    text="show me dns lookups",
    start_time=start_time,
    end_time=end_time,
    max_events=1
)

print(results)

 

New update for 0.1.4. Now supporting log ingestion, forwarder management, and log type discovery.
https://github.com/google/secops-wrapper?tab=readme-ov-file#log-ingestion

@raybrian I have performed a test stats search using a service account that was created to integrate Google Sec Ops SIEM To SOAR. I'm seeing an IAM permissions error being returned:

status": "PERMISSION_DENIED", "details": [ { "@type": "type.googleapis.com/google.rpc.ErrorInfo", "reason": "IAM_PERMISSION_DENIED", "domain": "chronicle.googleapis.com", "metadata": { "resource": "projects/xxxxxx", "permission": "chronicle.legacies.legacyFetchUdmSearchView"

Are there specific permissions that must be provided to the service account in order for the various actions to work?

The service account needs permission to read Chronicle API in the "BYOP" project associated with your account. Some tenants might not have migrated to the new API, or you could be using a service account associated with the older APIs. If you DM me your chronicle tenant URL and region, I can look at the details for you.

Thanks @raybrian  - I dont seem to have the ability to DM which I've created a separate post about. Once that is resolved, I'll reach out, thanks!

Thanks so much for this, @raybrian. The wrapper and your examples are very useful.

Just wanted to call out that if a service account needs to be created to use the wrapper I think the minimal role required for the service account is Chronicle API Viewer. @raybrian please correct me if I'm wrong.

Version 0.1.5 released with UDM Event Ingestion

result = chronicle.ingest_udm(udm_events=network_event)

 

New update

## [0.1.6] - 2025-04-10

### Added
- Enhanced log ingestion with batch processing capability for improved performance
- Support for ingesting multiple logs in a single API call through the existing `ingest_log` method
- Backward compatibility maintained for single log ingestion
- New Data Export API integration for exporting Chronicle logs to Google Cloud Storage
- Methods for creating, monitoring, and canceling data exports
- Support for exporting specific log types or all logs within a time range
- Comprehensive documentation and examples for Data Export functionality

### Fixed
- Resolved issues with entity summary functionality for improved entity lookups and correlation
- Fixed incorrect handling of entity relationships in entity summaries
- Corrected statistics query processing bug that affected aggregation results
- Improved error handling for statistics queries with complex aggregations

New update today includes access to specialized Gemini AI functions to get security insights, generate detection rules, explain security concepts, and more.

https://github.com/google/secops-wrapper?tab=readme-ov-file#gemini-ai

# Generate a rule to detect potential security issues
rule_response = chronicle.gemini("Write a rule to detect powershell downloading a file called gdp.zip")

# Extract the generated rule(s)
code_blocks = rule_response.get_code_blocks()
if code_blocks:
    rule = code_blocks[0].content
    print("Generated rule:", rule)

# Query Gemini with a security question
response = chronicle.gemini("What is Windows event ID 4625?")

# Get text content (combines TEXT blocks and stripped HTML content)
text_explanation = response.get_text_content()
print("Explanation:", text_explanation)

@raybrian I just read in this blog that that the UDM Entity Search is coming into preview: UDM Entity Search. In this post, I’ll explore the new… | by Chris Martin (@thatsiemguy) | Apr, 2025 ...

I know the SecOps SDK wrapper support stats UDM Events searches, but does it support stats UDM Entity Searches?

donkos_0-1745603404186.png

 

It uses the same API as entity search, so it should work. Want to test and let us know? Otherwise I can try it later this week. (Super slammed with RSA this week)