Upgrade to Python 3.11 for the latest security and functionality updates

Hello Google SecOps Users,

On July 14, 2024, Python 3.11 with migration best practices was made available in the Google SecOps Platform, alongside a new feature called Staging Mode.

We're writing to inform you that on June 1, 2025, Python 3.7 will be decommissioned from the Google SecOps Platform and the Marketplace. Users will no longer be able to run Python 3.7 integrations in the Google SecOps Platform after this date.

We understand that migrating to Python 3.11 may require some planning, but this upgrade is designed to help you stay up to date with the latest security and functionality features.

What you need to know
Starting June 1, 2025, our platform will no longer support Python 3.7. We will also roll out updated integrations in order of usage, starting with the most widely used, so that most customers receive the updates as quickly as possible.

We recommend updating integrations as they become available.

For custom code written in Python 3.7, dedicated enablement documents will be available to help you understand the migration process and ensure your code runs properly in the Google SecOps Platform.

What you need to do
To ensure continued functionality, migrate your workloads to Python 3.11 before June 1, 2025.

If you use Remote Agent to run your integrations, update to version 2.0 or higher to make sure it can run integrations written in Python 3.11.
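
If you're unsure whether your custom code still relies on 3.7-era APIs, a quick static scan can help you triage. The sketch below is illustrative only (not an official tool); its pattern list is a small, non-exhaustive sample based on the Python 3.8 to 3.11 release notes, so extend it for your own codebase:

import re
from pathlib import Path

# Hypothetical triage helper: flags a few APIs removed between Python 3.7 and 3.11.
REMOVED_PATTERNS = {
    r"inspect\.getargspec\(": "removed in 3.11; use inspect.signature() instead",
    r"@asyncio\.coroutine": "removed in 3.11; use 'async def' instead",
    r"collections\.(Mapping|MutableMapping|Sequence|Iterable)\b": "removed in 3.10; import from collections.abc",
}

def scan(root="."):
    for path in Path(root).rglob("*.py"):
        text = path.read_text(encoding="utf-8", errors="ignore")
        for pattern, advice in REMOVED_PATTERNS.items():
            for match in re.finditer(pattern, text):
                # Count newlines before the match to report a 1-based line number.
                line_number = text.count("\n", 0, match.start()) + 1
                print(f"{path}:{line_number}: {match.group(0)} ({advice})")

if __name__ == "__main__":
    scan()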

We're committed to working closely with you to ensure a smooth transition. If you have questions or need assistance, please contact Google SecOps Support or use this Community post for discussion.

We're also sharing a link to the Upgrade the Python version to 3.11 article we've written on this topic.

Thank you!

 


Is there any further guidance, or are there any tools, beyond this? For example:

  • A way to determine which actions are used by which playbooks so we know what will need to be tested when we upgrade an integration.
  • A way to safely test playbooks once integrations have been updated without affecting production ones, like Staging Mode.
  • A way to rollback an integration upgrade if it goes very wrong and a fix isn't in sight.

Thanks.

For anyone else who's in this situation, I've created Python scripts to solve these problems:

`chronicle_soar_report_playbook-integration_usage_v1.0.py` (to get the input for this, see the end of this comment):

"""
Description:
    This takes the .ZIP file output of "chronicle_soar_export_playbooks_v*.py" and generates an Excel report of which actions are used where.
    
Version history:
    • v1.0:
        - Author: [redacted]
        - Date: 2025/02/06
        - Changes:
            = Base functionality.
            
Improvement opportunities:
    • Support input of multiple .ZIP files that contain .JSON files, rather than one .ZIP file that contains many .ZIP files that contain .JSON files.
"""



import zipfile
import io
import json
from openpyxl import Workbook # Shell command "pip install openpyxl"
from openpyxl.styles import Font, Alignment



def output_to_excel(list_of_dicts, outputfile_path):
    workbook = Workbook()
    worksheet = workbook.active

    headers = list_of_dicts[0].keys()
    worksheet.append(list(headers))

    for cell in worksheet["1:1"]: # First row (headers)
        cell.font = Font(bold=True)

    # Write data rows
    for item in list_of_dicts:
        worksheet.append(list(item.values()))

    # Enable column filtering for all
    worksheet.auto_filter.ref = worksheet.dimensions
    
    worksheet.freeze_panes = "A2"
    
    # Filter specific columns
    headers_index_integration_name = list(headers).index("integration_name")
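    # Integrations still shipping Python 3.7 actions in our environment at the time of writing; adjust this list for yours.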
    python_v3_7_integrations = ["EmailUtilities", "FileUtilities", "Functions", "HTTPV2", "Lists", "SiemplifyUtilities", "TemplateEngine", "Tools"]
    worksheet.auto_filter.add_filter_column(headers_index_integration_name, python_v3_7_integrations)

    # Set column widths, enable word wrap, and set alignments
    for col_idx, column_cells in enumerate(worksheet.columns, start=1):
        max_length = max(len(str(cell.value)) if cell.value is not None else 0 for cell in column_cells)
        worksheet.column_dimensions[worksheet.cell(row=1, column=col_idx).column_letter].width = max_length + 2  # Auto-size based on content; use min(50, max_length + 2) to cap very wide columns
        for cell in column_cells:
            cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)

    workbook.save(outputfile_path)

def main():
    input_zip_file_path = input("Enter the path to the export_chronicle_soar_playbooks output .ZIP file: ").strip("'").strip('"')
    print()
    
    playbooks = []

    with zipfile.ZipFile(input_zip_file_path, "r") as input_zipfile:
        for current_inzip_file_path in input_zipfile.namelist():
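            # The export .ZIP contains one inner .ZIP per item; each inner .ZIP holds one or more .JSON definitions (a playbook plus any bundled blocks).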
            if current_inzip_file_path.endswith(".zip"):
                with input_zipfile.open(current_inzip_file_path) as current_inzip_zipextfile:
                    with zipfile.ZipFile(io.BytesIO(current_inzip_zipextfile.read()), "r") as current_inzip_zipfile:
                        for current_inzip_json_file_path in current_inzip_zipfile.namelist():
                            with current_inzip_zipfile.open(current_inzip_json_file_path) as current_inzip_json_file:
                                current_inzip_playbook = json.load(current_inzip_json_file)
                                
                                playbooks.append(current_inzip_playbook)
    
    rows_original = []
    
    for current_playbook in playbooks:
        playbook_or_block_name = current_playbook["Definition"]["Name"]
        environment_names = ", ".join(current_playbook["Definition"]["Environments"])
        folder_name = current_playbook["CategoryName"]
        
        for step in current_playbook["Definition"]["Steps"]:
            integration_name = step["Integration"]
            
            if integration_name is not None and integration_name != "Flow": # The former is the case for things like parallel action containers. The latter is the case for things like conditions / if statements.
                action_name = step["ActionName"].replace(f"{integration_name}_", "")
                
                rows_original.append(
                    {
                        "integration_name": integration_name,
                        "action_name": action_name,
                        "playbook_or_block_name": playbook_or_block_name,
                        "environment_names": environment_names,
                        "folder_name": folder_name
                    }
                )
    
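    # Deduplicate by keying a new dict on each row's items: identical rows collapse to a single entry.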
    rows_deduplicated = list({tuple(current_dict.items()): current_dict for current_dict in rows_original}.values())
    rows_sorted = sorted(rows_deduplicated, key=lambda row: (row["integration_name"], row["action_name"], row["playbook_or_block_name"]))
    
    output_xlsx_file_path = input_zip_file_path + " integration usage.xlsx"
    
    print(f"Outputting to file '{output_xlsx_file_path}'...")
    print("(For the filtering options to apply, you will need to simply open the filter and then click on 'OK'.)")
    
    output_to_excel(rows_sorted, output_xlsx_file_path)

if __name__ == "__main__":
    main()

`chronicle_soar_rename_integration_v1.3.py`:

"""
Description:
    Takes the export .ZIP file for an integration, prompts for the current and new name, changes these in all files, and outputs back to a .ZIP file.
    
Version history:
    • v1.3:
        - Author: [redacted]
        - Date: 2025/02/17
        - Changes:
            = Added logic to skip non-UTF-8 / -Unicode files (e.g., .WHL binary files in folder "Dependencies") because they were causing script failures.
    • v1.2:
        - Author: [redacted]
        - Date: 2025/02/10
        - Changes:
            = New .DEF file will have the integration name replaced too, as this is required to prevent SOAR import error "Found more than 1 integration def file in > /tmp/Package_*".
    • v1.1:
        - Author: [redacted]
        - Date: 2025/02/10
        - Changes:
            = Removed restriction to just .DEF files, as some .PY files directly referenced the integration name.
    • v1.0:
        - Author: [redacted]
        - Date: 2025/02/07
        - Changes:
            = Base functionality.
            
Improvement opportunities:
    • Support for multiple input .ZIP files, current integration names, and new integration names.
"""



from pathlib import Path
import zipfile



def main():
    input_zip_file_path = input("Enter the path to the integration export .ZIP file: ").strip("'").strip('"')
    input_zip_file_name_no_extension = Path(input_zip_file_path).stem
    
    integration_name_old = input(f"Enter the old integration name (leave blank to use '{input_zip_file_name_no_extension}'): ")
    if not integration_name_old:
        integration_name_old = input_zip_file_name_no_extension
    
    integration_name_new_predefined = f"{integration_name_old} Test"
    integration_name_new = input(f"Enter the new integration name (leave blank to use '{integration_name_new_predefined}'): ")
    if not integration_name_new:
        integration_name_new = integration_name_new_predefined
    
    print()
    
    input_zip_folder_path = Path(input_zip_file_path).parent
    input_zip_file_extension = Path(input_zip_file_path).suffix
    
    output_zip_file_name = f"{integration_name_new}{input_zip_file_extension}"
    output_zip_file_path = Path(input_zip_folder_path).joinpath(output_zip_file_name)
    
    with zipfile.ZipFile(input_zip_file_path, "r") as input_zipfile:
        with zipfile.ZipFile(output_zip_file_path, mode="w", compression=zipfile.ZIP_DEFLATED) as output_zipfile:
            for inzip_file_current_path in input_zipfile.namelist():
                with input_zipfile.open(inzip_file_current_path) as current_inzip_file:
                    inzip_file_current_content = current_inzip_file.read()
                    
                    try:
                        inzip_file_current_string = inzip_file_current_content.decode("utf-8")
                        
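                        # Only the quoted form of the name is replaced, so unquoted occurrences (e.g., inside longer identifiers) are left alone.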
                        inzip_file_new_string = inzip_file_current_string.replace(f'"{integration_name_old}"', f'"{integration_name_new}"')
                        
                        if inzip_file_current_path.endswith(".def"):
                            inzip_file_new_path = inzip_file_current_path.replace(integration_name_old, integration_name_new)
                        else:
                            inzip_file_new_path = inzip_file_current_path
                        
                        output_zipfile.writestr(inzip_file_new_path, inzip_file_new_string)
                        
                    except UnicodeDecodeError:
                        output_zipfile.writestr(inzip_file_current_path, inzip_file_current_content)
    
    print(f"Outputted to file '{output_zip_file_path}'...")

if __name__ == "__main__":
    main()

`chronicle_soar_change_playbook_integrations_v1.4.py`:

"""
Description:
    Takes the export .ZIP file for one or more playbooks, prompts for current and new integration instance names, reconfigures all appropriate steps, and outputs back to a .ZIP file.
    
Version history:
    • v1.4:
        - Author: [redacted]
        - Date: 2025/02/13
        - Changes:
            = Changed the output mode to create 1 .ZIP file for each input .ZIP → .JSON file, as we probably want to import one playbook at a time, and it's easier to see which .ZIP file is for which playbook this way.
    • v1.3:
        - Author: [redacted]
        - Date: 2025/02/12
        - Changes:
            = Added support for parallel actions.
            = Added option to skip bundled blocks because they get duplicated on import.
    • v1.2:
        - Author: [redacted]
        - Date: 2025/02/12
        - Changes:
            = Added option to process more than one .ZIP file. 
            = Removed commented-out code from v1.1.
    • v1.1:
        - Author: [redacted]
        - Date: 2025/02/11
        - Changes:
            = Added option to change the environment.
            = I also started work to retrieve the new integration instance ID, but I didn't complete this as it seemed unnecessary.
    • v1.0:
        - Author: [redacted]
        - Date: 2025/02/11
        - Changes:
            = Base functionality.
            
Improvement opportunities:
    • None known.
"""



from pathlib import Path
from datetime import datetime
import zipfile
import json



def replace_integration_in_step(step_current, integration_names_current_list, integration_names_new_list):
    step_integration_name_current = step_current["Integration"]
    
    if step_integration_name_current in integration_names_current_list:
        integration_index = integration_names_current_list.index(step_integration_name_current)
        step_integration_name_new = integration_names_new_list[integration_index]
        
        step_new = step_current.copy()
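        # A shallow copy is enough here: the caller replaces the original step with the returned one, so mutating the shared "Parameters" dicts below is acceptable.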
        
        step_new["Name"] = step_new["Name"].replace(step_integration_name_current, step_integration_name_new)
        step_new["ActionName"] = step_new["ActionName"].replace(step_integration_name_current, step_integration_name_new)
        step_new["Integration"] = step_new["Integration"].replace(step_integration_name_current, step_integration_name_new)
        
        for step_parameter in step_new["Parameters"]:
            if step_parameter["Name"] == "ScriptName":
                step_parameter["Value"] = step_parameter["Value"].replace(step_integration_name_current, step_integration_name_new)
        
        return step_new
    else:
        return step_current

def main():
    suffix_predefined = "Test"
    
    integration_names_current_string = input("Enter the current integration names (not instance names), separated by commas.\n")
    integration_names_current_list = [temp_string.strip().strip("'").strip('"') for temp_string in integration_names_current_string.split(",") if temp_string]
    
    integration_names_new_string = input(f"\nEnter the new integration names (not instance names), separated by commas and in the same order. Leave blank to suffix '{suffix_predefined}' to each current one.\n")
    if integration_names_new_string:
        integration_names_new_list = [temp_string.strip().strip("'").strip('"') for temp_string in integration_names_new_string.split(",") if temp_string]
    else:
        integration_names_new_list = [f"{temp_string} {suffix_predefined}" for temp_string in integration_names_current_list]
        print(f"Integration name suffix will be '{suffix_predefined}'.")
    if len(integration_names_current_list) == 0 or len(integration_names_current_list) != len(integration_names_new_list):
        raise ValueError("Incorrect number of integration names given. Exiting...")
    if integration_names_current_list == integration_names_new_list:
        raise ValueError("Current and new integration names are the same. Exiting...")
    
    print("\nIntegration name mapping:")
    for integration_name_current, integration_name_new in zip(integration_names_current_list, integration_names_new_list):
        print(f"\t{integration_name_current} โ†’ {integration_name_new}")
    integration_names_correct = input("Confirm that the above is correct (y/n)?\n").lower()
    if not integration_names_correct.startswith("y"):
        quit()
    
    input_zip_file_paths_string = input("\nEnter the paths to the playbook export .ZIP files, separated by commas.\n")
    if input_zip_file_paths_string:
        input_zip_file_paths_list = [temp_string.strip().strip("'").strip('"') for temp_string in input_zip_file_paths_string.split(",") if temp_string]
    else:
        raise ValueError("At least one path needed. Exiting...")
    
    playbook_name_new_suffix = input(f"\nEnter a suffix for the new playbook names. Leave blank to use '{suffix_predefined}'.\n").strip()
    if not playbook_name_new_suffix:
        playbook_name_new_suffix = suffix_predefined
        print(f"Playbook suffix will be '{suffix_predefined}'.")
    
    environment_new = input("\nEnter the name of the new environment to use. Leave blank to leave unchanged.\n")
    if not environment_new:
        print("Environments will be unchanged.")
    
    skip_blocks = input("\nSkip bundled blocks (y/n)? This avoids duplication if the blocks have already been imported. Leave blank to default to yes.\n").lower()
    if not skip_blocks or skip_blocks.startswith("y"):
        skip_blocks = True
        print("Bundled blocks will be skipped.")
    elif skip_blocks.startswith("n"):
        skip_blocks = False
        print("Bundled blocks will be processed.")
    else:
        raise ValueError("Incorrect value given. Exiting...")
            
    print()
    
    for input_zip_file_path in input_zip_file_paths_list:
        print(f"Processing input file '{input_zip_file_path}'...")
        
        input_zip_folder_path = Path(input_zip_file_path).parent
        input_zip_file_extension = Path(input_zip_file_path).suffix
        input_zip_file_datetime_modified_original = Path(input_zip_file_path).stat().st_mtime
        input_zip_file_datetime_modified_readable = datetime.fromtimestamp(input_zip_file_datetime_modified_original).strftime("%Y-%m-%d %H-%M-%S")
        
        with zipfile.ZipFile(input_zip_file_path, "r") as input_zipfile:
            inzip_files_list = input_zipfile.namelist()
            inzip_files_count = len(inzip_files_list)
            
            for inzip_file_current_path in inzip_files_list:
                if inzip_file_current_path.endswith(".json"):
                    with input_zipfile.open(inzip_file_current_path) as playbook_file_current:
                        playbook_file_current_string = playbook_file_current.read().decode("utf-8")
                        playbook_file_current_dict = json.loads(playbook_file_current_string)
                        
                        if playbook_file_current_dict["Definition"]["PlaybookType"] == 0:
                            playbook_type = "playbook"
                        elif playbook_file_current_dict["Definition"]["PlaybookType"] == 1:
                            playbook_type = "block"
                        
                        if skip_blocks and playbook_type == "block" and inzip_files_count > 1:
                            print(f"\tSkipping block file '{inzip_file_current_path}' as it's bundled with a playbook file.")
                            continue
                        
                        playbook_name_current = playbook_file_current_dict["Definition"]["Name"]
                        playbook_name_new = f"{playbook_name_current} {playbook_name_new_suffix}"
                        
                        playbook_file_new_dict = playbook_file_current_dict.copy()
                        
                        if environment_new:
                            playbook_file_new_dict["Definition"]["Environments"] = [environment_new]
                        
                        for step_index, step_dict in enumerate(playbook_file_new_dict["Definition"]["Steps"]):
                            step_integration_name_current = step_dict["Integration"]
                            
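                            # "Flow" steps are control-flow constructs (conditions / if statements), not integration actions.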
                            if step_integration_name_current == "Flow":
                                continue
                            else:
                                if step_dict["ActionName"] == "ParallelActionsContainer":
                                    for parallel_action_index, parallel_action_dict in enumerate(step_dict["ParallelActions"]):
                                        step_dict["ParallelActions"][parallel_action_index] = replace_integration_in_step(parallel_action_dict, integration_names_current_list, integration_names_new_list)
                                else:
                                    playbook_file_new_dict["Definition"]["Steps"][step_index] = replace_integration_in_step(step_dict, integration_names_current_list, integration_names_new_list)
                                    
                        playbook_file_new_string = json.dumps(playbook_file_new_dict).replace(f'"{playbook_name_current}"', f'"{playbook_name_new}"')
                        
                        playbook_file_new_path = inzip_file_current_path.replace(playbook_name_current, playbook_name_new)
                        playbook_file_new_name_no_extension = Path(playbook_file_new_path).stem
                        output_zip_file_name = f"{playbook_file_new_name_no_extension} ({input_zip_file_datetime_modified_readable}) reconfigured{input_zip_file_extension}"
                        output_zip_file_path = Path(input_zip_folder_path).joinpath(output_zip_file_name)
                        
                        with zipfile.ZipFile(output_zip_file_path, mode="w", compression=zipfile.ZIP_DEFLATED) as output_zipfile:
                            output_zipfile.writestr(playbook_file_new_path, playbook_file_new_string)
                            
                            print(f"\tConverted {playbook_type} file '{inzip_file_current_path}' to file '{playbook_file_new_path}'.")
        
                        print(f"\tOutputted to file '{output_zip_file_path}'.\n")

if __name__ == "__main__":
    main()

 

As a bonus, below are my scripts to back up content.

`chronicle_soar_export_integrations_v1.8.py`:

"""
Description:
    This exports / backs up Chronicle (now Google SecOps) SOAR integrations (managers, connectors, jobs, actions, etc).
    
Version history:
    • v1.8:
        - Author: [redacted]
        - Date: 2025/02/14
        - Changes:
            = Refactored to use new best practices template.
            = Tweaked output and get_credential() making them a bit more user friendly.
    • v1.7:
        - Author: [redacted]
        - Date: 2024/06/05
        - Changes:
            = Removed unnecessary import of zlib, added use of "with" for creating ZIP file so lock is removed if anything goes wrong, various minor refactors.
    • v1.6:
        - Author: [redacted]
        - Date: 2024/05/31
        - Changes:
            = Added code to ensure that integrations_failed is extended when the ZIP write fails too, various minor refactors, added note file inside of .ZIP file to say which script version and who created it.
    • v1.5:
        - Author: [redacted]
        - Date: 2024/05/29
        - Changes:
            = Parameterised get_credential().
    • v1.4:
        - Author: [redacted]
        - Date: 2024/04/09
        - Changes:
            = Added output of any failed exports at the end.
    • v1.3:
        - Author: [redacted]
        - Date: 2024/04/05
        - Changes:
            = Added instructions on setting up the environment variable for get_credential(), various minor refactors.
    • v1.2:
        - Author: [redacted]
        - Date: 2024/01/30
        - Changes:
            = Implemented get_credential().
    • v1.1:
        - Author: [redacted]
        - Date: 2024/01/29
        - Changes:
            = Implemented saving of all .ZIP files into a single .ZIP file.
    • v1.0:
        - Author: [redacted]
        - Date: 2024/01/26
        - Changes:
            = Base functionality.
            
Improvement opportunities:
    • None known.
"""



# Getting credentials
import os
import getpass
# Calling APIs
import requests
import urllib.parse
# Outputting files
import os.path
from datetime import datetime
import zipfile


def get_credential(api_credential_environmentvariable_key):
    if api_credential_environmentvariable_key in os.environ:
        api_credential_value = os.environ[api_credential_environmentvariable_key]
    else:
        print(f"Credential not found in environment variable '{api_credential_environmentvariable_key}'. To set this up for your user profile:")
        print(f"\t1. Run the following PowerShell command: $API_Key_Secure = Read-Host -Prompt 'Enter your API key' -AsSecureString; [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $([System.Net.NetworkCredential]::new('', $API_Key_Secure).Password), 'User')")
        print("\t2. Restart this shell / app so that it's loaded into memory and accessible.")
        print(f"\tTo revert the above, run the following PowerShell command: [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $null, 'User')")
        print()
        api_credential_value = getpass.getpass(prompt=f"If you just want to run this session, enter the credential: ")
        print()
    
    return api_credential_value

def main():    
    api_general_url = "https://[redacted].siemplify-soar.com"
    api_general_key = get_credential("chronicle_soar_api_credential")
    api_general_request_headers = {
        "AppKey": api_general_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    try:
        api_getinstalledintegrations_url = api_general_url + "/api/external/v1/integrations/GetInstalledIntegrations"
        print(f"Getting details of all installed integrations from {api_getinstalledintegrations_url} ...")
        api_getinstalledintegrations_response = requests.get(url=api_getinstalledintegrations_url, headers=api_general_request_headers)

        if (api_getinstalledintegrations_response.status_code == 200):
            api_getinstalledintegrations_response_json = api_getinstalledintegrations_response.json()
            
        else:
            raise Exception(f"API call to {api_getinstalledintegrations_url} failed with status code {api_getinstalledintegrations_response.status_code}.")
            
        integrations_failed = []

        datetime_now_iso8601 = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        zip_main_file_name = f"Chronicle SOAR integrations ({datetime_now_iso8601}).zip"
        zip_main_folder_path = os.path.join(os.environ["USERPROFILE"], "Downloads")
        zip_main_file_path = os.path.join(zip_main_folder_path, zip_main_file_name)
        
        with zipfile.ZipFile(zip_main_file_path, mode="a") as zip_main_object: # Mode is create and/or append.
            print(f"\nCreated and opened file '{zip_main_file_path}' for writing.")
            
            for index, integration in enumerate(api_getinstalledintegrations_response_json):
                integration_number_current = index + 1
                integration_number_total = len(api_getinstalledintegrations_response_json)
                integration_name_original = integration["identifier"]
                
                print(f"\nProcessing integration {integration_number_current} of {integration_number_total}: '{integration_name_original}'...")
                
                integration_name_urlencoded = urllib.parse.quote(integration_name_original)
                api_exportpackage_url = api_general_url + "/api/external/v1/ide/ExportPackage/" + integration_name_urlencoded
                print(f"\tExporting integration from {api_exportpackage_url} ...")
                api_exportpackage_response = requests.get(url=api_exportpackage_url, headers=api_general_request_headers)

                if (api_exportpackage_response.status_code == 200):
                    zip_integration_file_name = f"{integration_name_original} ({datetime_now_iso8601}).zip"
                    
                    try:
                        zip_main_object.writestr(zip_integration_file_name, api_exportpackage_response.content)
                    except Exception as error:
                        print(f"\tError adding file '{zip_integration_file_name}' to file '{zip_main_file_path}'. Details:")
                        print(error)
                        integrations_failed.append(integration_name_original) # append, not extend: extending with a string would add each character separately.
                    else:
                        print(f"\tSuccessfully added file '{zip_integration_file_name}' to .ZIP.")
                    
                else:
                    print(f"\tAPI call to {api_exportpackage_url} failed with status code {api_exportpackage_response.status_code}.")
                    integrations_failed.append(integration_name_original)

            zip_main_object.writestr(f"Created by script {os.path.basename(__file__)}, run by user {os.environ['username']}", "")

    except Exception as error:
        print("\nGeneral error running script. Details:")
        print(error)
        raise

    else:
        if (len(integrations_failed) != 0):  
            print("\nWARNING: Export failed for the following integrations:")
            print("\n".join(integrations_failed))
            print()
        
        print(f"\nSaved integrations' .ZIP files to main file '{zip_main_file_path}'.")

if __name__ == "__main__":
    main()

`chronicle_soar_export_playbooks_v1.7.py`:

"""
Description:
    This exports / backs up Chronicle (now Google SecOps) SOAR playbooks and blocks.
    
Version history:
    • v1.7:
        - Author: [redacted]
        - Date: 2025/02/14
        - Changes:
            = Refactored to use new best practices template.
            = Tweaked output and get_credential() making them a bit more user friendly.
            = Now differentiates between playbooks and blocks.
    • v1.6:
        - Author: [redacted]
        - Date: 2024/06/05
        - Changes:
            = Removed unnecessary import of zlib, added use of "with" for creating ZIP file so lock is removed if anything goes wrong, various minor refactors.
    • v1.5:
        - Author: [redacted]
        - Date: 2024/05/31
        - Changes:
            = Added note file inside of .ZIP file to say which script version and who created it.
    • v1.4:
        - Author: [redacted]
        - Date: 2024/05/31
        - Changes:
            = Added code to ensure that playbooks_ids_and_names_failed is updated when the ZIP write fails too, various minor refactors.
    • v1.3:
        - Author: [redacted]
        - Date: 2024/05/29
        - Changes:
            = Parameterised get_credential(), adjusted output slightly.
    • v1.2:
        - Author: [redacted]
        - Date: 2024/04/18
        - Changes:
            = Added preservation of folder structure.
    • v1.1:
        - Author: [redacted]
        - Date: 2024/04/05
        - Changes:
            = Added instructions on setting up the environment variable for get_credential(), various minor refactors.
    • v1.0:
        - Author: [redacted]
        - Date: 2024/04/02
        - Changes:
            = Base functionality.
            
Improvement opportunities:
    • None known.
"""



# Getting credentials
import os
import getpass
# Calling APIs
import requests
import json
# Outputting files
import os.path
from datetime import datetime
import zipfile
import base64



def get_credential(api_credential_environmentvariable_key):
    if api_credential_environmentvariable_key in os.environ:
        api_credential_value = os.environ[api_credential_environmentvariable_key]
    else:
        print(f"Credential not found in environment variable '{api_credential_environmentvariable_key}'. To set this up for your user profile:")
        print(f"\t1. Run the following PowerShell command: $API_Key_Secure = Read-Host -Prompt 'Enter your API key' -AsSecureString; [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $([System.Net.NetworkCredential]::new('', $API_Key_Secure).Password), 'User')")
        print("\t2. Restart this shell / app so that it's loaded into memory and accessible.")
        print(f"\tTo revert the above, run the following PowerShell command: [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $null, 'User')")
        print()
        api_credential_value = getpass.getpass(prompt=f"If you just want to run this session, enter the credential: ")
        print()
    
    return api_credential_value

def main():    
    api_general_url = "https://[redacted].siemplify-soar.com"
    api_general_key = get_credential("chronicle_soar_api_credential")
    api_general_request_headers = {
        "AppKey": api_general_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    try:
        api_getplaybooks_url = api_general_url + "/api/external/v1/playbooks/GetWorkflowMenuCardsWithEnvFilter"
        print(f"Getting details of all playbooks and blocks from {api_getplaybooks_url} ...")
        api_getplaybooks_response = requests.post(url=api_getplaybooks_url, headers=api_general_request_headers, data="[1,0]") # This API endpoint is entirely undocumented (at /swagger/index.html and via Google), so I had to reverse engineer it via Chromium → DevTools → Network. Changing the data to 0 or 1 does reduce the output, but in no way that seems to correlate with assigned environments or enabled state. Including higher numbers breaks it, so I've simply mirrored what the web UI sends.

        if (api_getplaybooks_response.status_code == 200):
            api_getplaybooks_response_json = api_getplaybooks_response.json()
            
        else:
            raise Exception(f"API call to {api_getplaybooks_url} failed with status code {api_getplaybooks_response.status_code}.")

        api_exportplaybooks_url = api_general_url + "/api/external/v1/playbooks/ExportDefinitions"
        # Some items can fail to export with HTTP status code 500, no given reason, and no information on which couldn't be exported, so we export one at a time to work around this and report the problematic ones.
        items_ids_and_names_failed = {}

        datetime_now_iso8601 = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        zip_main_file_name = f"Chronicle SOAR playbooks and blocks ({datetime_now_iso8601}).zip"
        zip_main_folder_path = os.path.join(os.environ["USERPROFILE"], "Downloads")
        zip_main_file_path = os.path.join(zip_main_folder_path, zip_main_file_name)
        
        with zipfile.ZipFile(zip_main_file_path, mode="a") as zip_main_object: # Mode is create and/or append.
            print(f"\nCreated and opened file '{zip_main_file_path}' for writing.")
            
            for index, item in enumerate(api_getplaybooks_response_json):
                item_number_current = index + 1
                item_number_total = len(api_getplaybooks_response_json)
                item_id = item["identifier"]
                item_name = item["name"]
                item_folder = item["categoryName"]
                item_type = item["playbookType"]
                if item_type == 0:
                    item_type = "playbook"
                elif item_type == 1:
                    item_type = "block"
                
                print(f"\nProcessing item {item_number_current} of {item_number_total}: '{item_name}'...")
                
                print(f"\tExporting {item_type}...")
                api_exportplaybooks_request_body = {
                    "identifiers": [item_id] # This needs to be a list / array.
                }
                api_exportplaybooks_response = requests.post(url=api_exportplaybooks_url, headers=api_general_request_headers, json=api_exportplaybooks_request_body)

                if (api_exportplaybooks_response.status_code == 200):
                    zip_item_file_name = f"{item_type.title()} - {item_name} ({item_id}).zip" # Chronicle SOAR supports multiple playbooks with the same name, but OSes don't support multiple files with the same name, so the GUID is included to resolve this.
                    zip_item_file_path_relative = os.path.join(item_folder, zip_item_file_name)

                    item_blob_decoded = base64.b64decode(api_exportplaybooks_response.json()["blob"]) # API response is '{"fileName": "<number>_playbook(s)_<date>", "blob": "<Base64-encoded binary>"}', so this extracts and decodes it.
                    
                    try:
                        # .ZIP files are added as-is, rather than adding their contents, because some contain multiple related files, like blocks.
                        zip_main_object.writestr(zip_item_file_path_relative, item_blob_decoded)
                    except Exception as error:
                        print(f"\tError adding file '{zip_item_file_name}' to .ZIP โ†’ folder '{item_folder}'. Details:")
                        print(error)
                        items_ids_and_names_failed.update({item_id: item_name})
                    else:
                        print(f"\tSuccessfully added file '{zip_item_file_name}' to .ZIP โ†’ folder '{item_folder}'.")
                    
                else:
                    print(f"\tERROR: API call failed with status code {api_exportplaybooks_response.status_code}. Item: '{item_name}' ({item_id}).")
                    items_ids_and_names_failed.update({item_id: item_name})

            zip_main_object.writestr(f"Created by script {os.path.basename(__file__)}, run by user {os.environ['username']}", "")

    except Exception as error:
        print("\nGeneral error running script. Details:")
        print(error)
        raise

    else:
        if (len(items_ids_and_names_failed) != 0):  
            print("\nWARNING: Export failed for the following items (one possible cause is playbooks referencing missing blocks):")
            print(json.dumps(items_ids_and_names_failed, indent=4))
        
        print(f"\nSaved output to file '{zip_main_file_path}'.")

if __name__ == "__main__":
    main()

 

Significantly updated versions:

`chronicle_soar_report_object_usage_v2.0.py`:

"""
Description:
    This takes the .ZIP file output of "chronicle_soar_export_playbooks_v*.py" / "chronicle_soar_export_content_v*" and generates an Excel report of which integrations, blocks, and actions are used where.
    
Version history:
    • v2.0:
        - Author: [redacted]
        - Date: 2025/03/13
        - Changes:
            = Split playbooks and blocks into separate sheets so you can also see where blocks are used.
            = Changed the output filename to exclude ".zip".
    • v1.1:
        - Author: [redacted]
        - Date: 2025/03/12
        - Changes:
            = Fixed issue where steps inside parallel actions containers weren't being reported on.
            = Added information on creator.
            = Added information on whether the object is a playbook or a block.
    • v1.0:
        - Author: [redacted]
        - Date: 2025/02/06
        - Changes:
            = Base functionality.
            
Improvement opportunities:
    โ€ข Support input of multiple .ZIP files that contain .JSON files, rather than one .ZIP file that contains many .ZIP files that contain .JSON files.
"""



import zipfile
import io
import json
from openpyxl import Workbook # Shell command "pip install openpyxl"
from openpyxl.styles import Font, Alignment



def output_to_excel(worksheets, outputfile_path):
    workbook = Workbook()
    
    default_sheet = workbook.active
    workbook.remove(default_sheet)

    for worksheet_name, worksheet_rows in worksheets.items():
        worksheet = workbook.create_sheet(title=worksheet_name)

        headers = worksheet_rows[0].keys()
        worksheet.append(list(headers))

        for cell in worksheet["1:1"]: # First row (headers)
            cell.font = Font(bold=True)

        # Write rows
        for item in worksheet_rows:
            worksheet.append(list(item.values()))

        # Enable column filtering for all
        worksheet.auto_filter.ref = worksheet.dimensions
        
        worksheet.freeze_panes = "B2"
        
        if worksheet_name == "Integrations":
            # Filter specific columns
            headers_index_integration_name = list(headers).index("integration_name")
            python_v3_7_integrations = ["Chronicle SOAR", "EmailUtilities", "FileUtilities", "Functions", "HTTPV2", "Lists", "MicrosoftAzureSentinel", "SiemplifyUtilities", "TemplateEngine", "Tools"] 
            worksheet.auto_filter.add_filter_column(headers_index_integration_name, python_v3_7_integrations)

        # Set column widths, enable word wrap, and set alignments
        for col_idx, column_cells in enumerate(worksheet.columns, start=1):
            max_length = max(len(str(cell.value)) if cell.value is not None else 0 for cell in column_cells)
            worksheet.column_dimensions[worksheet.cell(row=1, column=col_idx).column_letter].width = max_length + 2  # Auto-size based on content; use min(50, max_length + 2) to cap very wide columns
            for cell in column_cells:
                cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)

    workbook.save(outputfile_path)
    
def build_row_block(creator_id, block_name, playbook_name, environment_names, folder_name):    
    row_new = {
        "block_name": block_name,
        "playbook_name": playbook_name,
        "playbook_environment_names": environment_names,
        "playbook_folder_name": folder_name,
        "playbook_creator": creator_id
    }
    
    return row_new
    
def build_row_integration(creator_id, playbook_or_block, playbook_or_block_name, environment_names, folder_name, step):
    integration_name = step["Integration"]
    
    action_name = step["ActionName"].replace(f"{integration_name}_", "")
    
    row_new = {
        "integration_name": integration_name,
        "action_name": action_name,
        "used_by": playbook_or_block,
        "playbook_or_block_name": playbook_or_block_name,
        "playbook_or_block_environment_names": environment_names,
        "playbook_or_block_folder_name": folder_name,
        "playbook_or_block_creator": creator_id
    }
    
    return row_new

def main():
    input_zip_file_path = input("Enter the path to the .ZIP file containing playbook / block .ZIP files: ").strip("'").strip('"')
    print()
    
    # Placeholder GUIDs: map your instance's creator IDs to human-readable names here (each key must be unique).
    creator_guid_name_map = {
        "11111111-1111-1111-1111-111111111111": "Firstname Lastname",
        "22222222-2222-2222-2222-222222222222": "Firstname Lastname",
        "33333333-3333-3333-3333-333333333333": "Firstname Lastname"
    }
    
    playbooks_and_blocks_definitions = []

    with zipfile.ZipFile(input_zip_file_path, "r") as input_zipfile:
        for current_inzip_file_path in input_zipfile.namelist():
            if current_inzip_file_path.endswith(".zip"):
                with input_zipfile.open(current_inzip_file_path) as current_inzip_zipextfile:
                    with zipfile.ZipFile(io.BytesIO(current_inzip_zipextfile.read()), "r") as current_inzip_zipfile:
                        for current_inzip_json_file_path in current_inzip_zipfile.namelist():
                            with current_inzip_zipfile.open(current_inzip_json_file_path) as current_inzip_json_file:
                                current_inzip_playbook = json.load(current_inzip_json_file)
                                
                                playbooks_and_blocks_definitions.append(current_inzip_playbook)
    
    worksheets = {
        "Integrations": [],
        "Blocks": []
    }
    for current_playbook in playbooks_and_blocks_definitions:
        if current_playbook["Definition"]["PlaybookType"] == 1:
            playbook_or_block = "block"
        else: # 0
            playbook_or_block = "playbook"
        
        playbook_or_block_name = current_playbook["Definition"]["Name"]
        
        environment_names = ", ".join(current_playbook["Definition"]["Environments"])
        
        folder_name = current_playbook["CategoryName"]
        
        creator_id = current_playbook["Definition"]["Creator"]
        if creator_id in creator_guid_name_map:
            creator_id = creator_guid_name_map[creator_id]
        
        for step in current_playbook["Definition"]["Steps"]:
            if step["Integration"] == "Flow": # This is the case for things like conditions / if statements.
                continue
            else:
                # Normal actions
                if step["ActionName"] == "ParallelActionsContainer":
                    for parallel_action in step["ParallelActions"]:
                        row_new = build_row_integration(creator_id, playbook_or_block, playbook_or_block_name, environment_names, folder_name, parallel_action)
                        if row_new not in worksheets["Integrations"]:
                            worksheets["Integrations"].append(row_new)
                else:
                    row_new = build_row_integration(creator_id, playbook_or_block, playbook_or_block_name, environment_names, folder_name, step)
                    if row_new not in worksheets["Integrations"]:
                        worksheets["Integrations"].append(row_new)
                
                # Blocks
                if step["ActionName"] == "NestedAction": # Blocks can't be added to parallel action containers, so this is fine to do here.
                    block_name = step["Name"]
                    
                    row_new = build_row_block(creator_id, block_name, playbook_or_block_name, environment_names, folder_name)
                    if row_new not in worksheets["Blocks"]:
                        worksheets["Blocks"].append(row_new)
    
    worksheets_sorted = {}
    for worksheet_name, worksheet_rows in worksheets.items():
        if worksheet_name == "Integrations":
            worksheets_sorted[worksheet_name] = sorted(worksheet_rows, key=lambda dict: (dict["integration_name"], dict["action_name"], dict["playbook_or_block_name"]))
        elif worksheet_name == "Blocks":
            worksheets_sorted[worksheet_name] = sorted(worksheet_rows, key=lambda dict: (dict["block_name"]))
    
    output_xlsx_file_path = input_zip_file_path.rsplit(".", 1)[0] + " integration usage.xlsx" # Strip only the final ".zip" extension; earlier dots in the path are kept.
    
    print(f"Outputting to file '{output_xlsx_file_path}'...")
    print("(For the filtering options to apply, you will need to simply open the filter and then click on 'OK'.)")
    
    output_to_excel(worksheets_sorted, output_xlsx_file_path)

if __name__ == "__main__":
    main()

`chronicle_soar_export_content_v2.0.py`:

"""
Description:
    This exports / backs up all Chronicle (now Google SecOps) SOAR content:
        • Playbooks / blocks.
        • Integrations (managers, connectors, jobs, actions, etc).
        • Custom lists.
        • Jobs.
        • Connectors.
    
Version history:
    • v2.0:
        - Author: [redacted]
        - Date: 2025/02/28 to 2025/03/03
        - Changes:
            = Merged "chronicle_soar_export_integrations_v1.8" and "chronicle_soar_export_playbooks_v1.7".
            = Added code to export custom lists.
            = Added code to export jobs.
            = Added code to export connectors.

"chronicle_soar_export_integrations" version history:
    • v1.8:
        - Author: [redacted]
        - Date: 2025/02/14
        - Changes:
            = Refactored to use new best practices template.
            = Tweaked output and get_credential() making them a bit more user friendly.
    • v1.7:
        - Author: [redacted]
        - Date: 2024/06/05
        - Changes:
            = Removed unnecessary import of zlib, added use of "with" for creating ZIP file so lock is removed if anything goes wrong, various minor refactors.
    • v1.6:
        - Author: [redacted]
        - Date: 2024/05/31
        - Changes:
            = Added code to ensure that integrations_failed is extended when the ZIP write fails too, various minor refactors, added note file inside of .ZIP file to say which script version and who created it.
    • v1.5:
        - Author: [redacted]
        - Date: 2024/05/29
        - Changes:
            = Parameterised get_credential().
    • v1.4:
        - Author: [redacted]
        - Date: 2024/04/09
        - Changes:
            = Added output of any failed exports at the end.
    • v1.3:
        - Author: [redacted]
        - Date: 2024/04/05
        - Changes:
            = Added instructions on setting up the environment variable for get_credential(), various minor refactors.
    • v1.2:
        - Author: [redacted]
        - Date: 2024/01/30
        - Changes:
            = Implemented get_credential().
    • v1.1:
        - Author: [redacted]
        - Date: 2024/01/29
        - Changes:
            = Implemented saving of all .ZIP files into a single .ZIP file.
    • v1.0:
        - Author: [redacted]
        - Date: 2024/01/26
        - Changes:
            = Base functionality.
    
"chronicle_soar_export_playbooks" version history:
    • v1.7:
        - Author: 
        - Date: 2025/02/14
        - Changes:
            = Refactored to use new best practices template.
            = Tweaked output and get_credential() making them a bit more user friendly.
            = Now differentiates between playbooks and blocks.
    • v1.6:
        - Author: 
        - Date: 2024/06/05
        - Changes:
            = Removed unnecessary import of zlib, added use of "with" for creating ZIP file so lock is removed if anything goes wrong, various minor refactors.
    • v1.5:
        - Author: 
        - Date: 2024/05/31
        - Changes:
            = Added note file inside of .ZIP file to say which script version and who created it.
    • v1.4:
        - Author: 
        - Date: 2024/05/31
        - Changes:
            = Added code to ensure that playbooks_ids_and_names_failed is updated when the ZIP write fails too, various minor refactors.
    • v1.3:
        - Author: 
        - Date: 2024/05/29
        - Changes:
            = Parameterised get_credential(), adjusted output slightly.
    • v1.2:
        - Author: 
        - Date: 2024/04/18
        - Changes:
            = Added preservation of folder structure.
    • v1.1:
        - Author: 
        - Date: 2024/04/05
        - Changes:
            = Added instructions on setting up the environment variable for get_credential(), various minor refactors.
    • v1.0:
        - Author: 
        - Date: 2024/04/02
        - Changes:
            = Base functionality.
            
Improvement opportunities:
    • None known.
"""



# Getting credentials
import os
import getpass
# Calling APIs
import requests
import urllib.parse
# Outputting files
import os.path
from datetime import datetime
import zipfile
import base64
import json
import csv



def get_credential(api_credential_environmentvariable_key):
    if api_credential_environmentvariable_key in os.environ:
        api_credential_value = os.environ[api_credential_environmentvariable_key]
    else:
        print(f"Credential not found in environment variable '{api_credential_environmentvariable_key}'. To set this up for your user profile:")
        print(f"\t1. Run the following PowerShell command: $API_Key_Secure = Read-Host -Prompt 'Enter your API key' -AsSecureString; [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $([System.Net.NetworkCredential]::new('', $API_Key_Secure).Password), 'User')")
        print("\t2. Restart this shell / app so that it's loaded into memory and accessible.")
        print(f"\tTo revert the above, run the following PowerShell command: [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $null, 'User')")
        print()
        api_credential_value = getpass.getpass(prompt=f"If you just want to run this session, enter the credential: ")
        print()
    
    return api_credential_value

def main():    
    general_api_url = "https://yourinstancename.siemplify-soar.com"
    general_api_key = get_credential("chronicle_soar_api_credential")
    general_api_request_headers = {
        "AppKey": general_api_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    general_datetime_format_iso8601 = "%Y-%m-%d %H-%M-%S"
    
    general_output_folder_path = os.path.join(os.environ["USERPROFILE"], "Downloads")

    try:
        # Integrations
        
        integrations_api_get_url = general_api_url + "/api/external/v1/integrations/GetInstalledIntegrations"
        print(f"Getting details of all installed integrations from {integrations_api_get_url} ...")
        integrations_api_get_response = requests.get(url=integrations_api_get_url, headers=general_api_request_headers)

        if (integrations_api_get_response.status_code == 200):
            integrations_api_get_response_json = integrations_api_get_response.json()
        else:
            raise Exception(f"API call to {integrations_api_get_url} failed with status code {integrations_api_get_response.status_code}.")
            
        integrations_failed = []

        datetime_now = datetime.now().strftime(general_datetime_format_iso8601)
        integrations_zip_main_file_name = f"Chronicle SOAR integrations ({datetime_now}).zip"
        integrations_zip_main_file_path = os.path.join(general_output_folder_path, integrations_zip_main_file_name)
        
        with zipfile.ZipFile(integrations_zip_main_file_path, mode="a") as integrations_zip_main_object: # Mode is create and/or append.
            print(f"\nCreated and opened file '{integrations_zip_main_file_path}' for writing.")
            
            for index, integration in enumerate(integrations_api_get_response_json):
                integration_number_current = index + 1
                integration_number_total = len(integrations_api_get_response_json)
                integration_name_original = integration["identifier"]
                
                print(f"\nProcessing integration {integration_number_current} of {integration_number_total}: '{integration_name_original}'...")
                
                integration_name_urlencoded = urllib.parse.quote(integration_name_original)
                api_exportpackage_url = general_api_url + "/api/external/v1/ide/ExportPackage/" + integration_name_urlencoded
                print(f"\tExporting integration from {api_exportpackage_url} ...")
                api_exportpackage_response = requests.get(url=api_exportpackage_url, headers=general_api_request_headers)

                if (api_exportpackage_response.status_code == 200):
                    zip_integration_file_name = f"{integration_name_original} ({datetime_now}).zip"
                    
                    try:
                        integrations_zip_main_object.writestr(zip_integration_file_name, api_exportpackage_response.content)
                    except Exception as error:
                        print(f"\tError adding file '{zip_integration_file_name}' to file '{integrations_zip_main_file_path}'. Details:")
                        print(error)
                        integrations_failed.append(integration_name_original) # append, not extend: extending with a string would add each character separately.
                    else:
                        print(f"\tSuccessfully added file '{zip_integration_file_name}' to main .ZIP.")
                    
                else:
                    print(f"\tAPI call to {api_exportpackage_url} failed with status code {api_exportpackage_response.status_code}.")
                    integrations_failed.append(integration_name_original)

            integrations_zip_main_object.writestr(f"Created by script {os.path.basename(__file__)}, run by user {os.environ['username']}", "")
            
        if (len(integrations_failed) != 0):  
            print("\nWARNING: Export failed for the following integrations:")
            print("\n".join(integrations_failed))
            print()
        
        print(f"\nSaved integrations' .ZIP files to main file '{integrations_zip_main_file_path}'.")
        
        print(f"\n------------------------------------------------------------------------------\n")
        
        # Playbooks
        
        playbooks_api_get_url = general_api_url + "/api/external/v1/playbooks/GetWorkflowMenuCardsWithEnvFilter"
        print(f"Getting details of all playbooks and blocks from {playbooks_api_get_url} ...")
        playbooks_api_get_response = requests.post(url=playbooks_api_get_url, headers=general_api_request_headers, data="[1,0]") # This API endpoint is entirely undocumented (at /swagger/index.html and via Google), so I had to reverse engineer it via Chromium → DevTools → Network. Changing the data to 0 or 1 does reduce the output, but in no way that seems to correlate with assigned environments or enabled state. Including higher numbers breaks it, so I've simply mirrored what the web UI sends.

        if (playbooks_api_get_response.status_code == 200):
            playbooks_api_get_response_json = playbooks_api_get_response.json()
        else:
            raise Exception(f"API call to {playbooks_api_get_url} failed with status code {playbooks_api_get_response.status_code}.")

        api_exportplaybooks_url = general_api_url + "/api/external/v1/playbooks/ExportDefinitions"
        # Some items can fail to export with HTTP status code 500, no given reason, and no information on which couldn't be exported, so we export one at a time to work around this and report the problematic ones.
        items_ids_and_names_failed = {}

        datetime_now = datetime.now().strftime(general_datetime_format_iso8601)
        playbooks_zip_main_file_name = f"Chronicle SOAR playbooks and blocks ({datetime_now}).zip"
        playbooks_zip_main_file_path = os.path.join(general_output_folder_path, playbooks_zip_main_file_name)
        
        with zipfile.ZipFile(playbooks_zip_main_file_path, mode="a") as playbooks_zip_main_object: # Mode is create and/or append.
            print(f"\nCreated and opened file '{playbooks_zip_main_file_path}' for writing.")
            
            item_number_total = len(playbooks_api_get_response_json) # Renamed from "playbook" to "item" since the list contains both playbooks and blocks.
            for index, item in enumerate(playbooks_api_get_response_json):
                item_number_current = index + 1
                item_id = item["identifier"]
                item_name = item["name"]
                item_folder = item["categoryName"]
                item_type = item["playbookType"]
                if item_type == 0:
                    item_type = "playbook"
                elif item_type == 1:
                    item_type = "block"
                else:
                    item_type = "unknown" # Fallback so the .title() call below doesn't fail on an unexpected type code.
                
                print(f"\nProcessing item {item_number_current} of {playbook_number_total}: '{item_name}'...")
                
                print(f"\tExporting {item_type}...")
                api_exportplaybooks_request_body = {
                    "identifiers": [item_id] # This needs to be a list / array.
                }
                api_exportplaybooks_response = requests.post(url=api_exportplaybooks_url, headers=general_api_request_headers, json=api_exportplaybooks_request_body)

                if (api_exportplaybooks_response.status_code == 200):
                    zip_item_file_name = f"{item_type.title()} - {item_name} ({item_id}).zip" # Chronicle SOAR supports multiple playbooks with the same name, but OSes don't support multiple files with the same name, so the GUID is included to resolve this.
                    zip_item_file_path_relative = os.path.join(item_folder, zip_item_file_name)

                    item_blob_decoded = base64.b64decode(api_exportplaybooks_response.json()["blob"]) # API response is '{"fileName": "<number>_playbook(s)_<date>", "blob": "<Base64-encoded binary>"}', so this extracts and decodes it.
                    
                    try:
                        # .ZIP files are added as-is, rather than adding their contents, because some contain multiple, related files like blocks.
                        playbooks_zip_main_object.writestr(zip_item_file_path_relative, item_blob_decoded)
                    except Exception as error:
                        print(f"\tError adding file '{zip_item_file_name}' to main .ZIP โ†’ folder '{item_folder}'. Details:")
                        print(error)
                        items_ids_and_names_failed.update({item_id: item_name})
                    else:
                        print(f"\tSuccessfully added file '{zip_item_file_name}' to main .ZIP โ†’ folder '{item_folder}'.")
                    
                else:
                    print(f"\tERROR: API call failed with status code {api_exportplaybooks_response.status_code}. Item: '{item_name}' ({item_id}).")
                    items_ids_and_names_failed.update({item_id: item_name})

            playbooks_zip_main_object.writestr(f"Created by script {os.path.basename(__file__)}, run by user {os.environ['username']}", "")
            
        if (len(items_ids_and_names_failed) != 0):  
            print("\nWARNING: Export failed for the following items (one possible cause is playbooks referencing missing blocks):")
            print(json.dumps(items_ids_and_names_failed, indent=4))
        
        print(f"\nSaved output to file '{playbooks_zip_main_file_path}'.")
        
        print(f"\n------------------------------------------------------------------------------\n")
        
        # Custom lists
        
        # The API returns the data in JSON format, but importing requires a .CSV file, so we'll convert it.
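        # For reference, the header row this produces (using the field names below, which are assumed to match the platform's import template) is:
        #     Identifier (Free Text),Category (Free Text),Environment (List of environments separated with |)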
        
        lists_api_get_url = general_api_url + "/api/external/v1/settings/GetTrackingListRecords"
        print(f"Getting details of all custom lists from {lists_api_get_url} ...")
        lists_api_get_response = requests.get(url=lists_api_get_url, headers=general_api_request_headers)
        
        if (lists_api_get_response.status_code == 200):
            lists_api_get_response_json = lists_api_get_response.json()
        else:
            raise Exception(f"API call to {lists_api_get_url} failed with status code {lists_api_get_response.status_code}.")
            
        datetime_now = datetime.now().strftime(general_datetime_format_iso8601)
        lists_csv_file_name = f"Chronicle SOAR custom lists ({datetime_now}).csv"
        lists_csv_file_path = os.path.join(general_output_folder_path, lists_csv_file_name)
        
        csv_fieldnames = ["Identifier (Free Text)", "Category (Free Text)", "Environment (List of environments separated with |)"]
        csv_rows = []
        
        for item in lists_api_get_response_json:
            csv_rows.append(
                {
                    "Identifier (Free Text)": item["entityIdentifier"],
                    "Category (Free Text)": item["category"],
                    "Environment (List of environments separated with |)": "|".join(item["environments"])
                }
            )
        
        with open(lists_csv_file_path, "w", newline="") as csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=csv_fieldnames) # Explicit field names avoid an IndexError when no custom lists exist.
            csv_writer.writeheader()
            csv_writer.writerows(csv_rows)
        
        print(f"\nSaved output to file '{lists_csv_file_path}'.")
        
        print(f"\n------------------------------------------------------------------------------\n")
        
        # Jobs
        
        # The API for adding jobs (/api/external/v1/jobs/SaveOrUpdateJobData) appears to use the same JSON structure as the export, so we'll just store as-is.
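        # A restore would then presumably POST each saved job back one at a time (untested sketch;
        # 'saved_jobs_json' stands for the parsed contents of the .JSON file written below, and it's
        # assumed the endpoint accepts each job object verbatim):
        #     for job in saved_jobs_json:
        #         requests.post(general_api_url + "/api/external/v1/jobs/SaveOrUpdateJobData", headers=general_api_request_headers, json=job)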
        
        jobs_api_get_url = general_api_url + "/api/external/v1/jobs/GetInstalledJobs"
        print(f"Getting details of all jobs from {jobs_api_get_url} ...")
        jobs_api_get_response = requests.get(url=jobs_api_get_url, headers=general_api_request_headers)
        
        if (jobs_api_get_response.status_code == 200):
            jobs_api_get_response_json = jobs_api_get_response.json()
        else:
            raise Exception(f"API call to {jobs_api_get_url} failed with status code {jobs_api_get_response.status_code}.")
            
        datetime_now = datetime.now().strftime(general_datetime_format_iso8601)
        jobs_json_file_name = f"Chronicle SOAR jobs ({datetime_now}).json"
        jobs_json_file_path = os.path.join(general_output_folder_path, jobs_json_file_name)
        
        with open(jobs_json_file_path, "w", newline="") as json_file:
            json_file.write(json.dumps(jobs_api_get_response_json))
        
        print(f"\nSaved output to file '{jobs_json_file_path}'.")
        
        print(f"\n------------------------------------------------------------------------------\n")
        
        # Connectors
        
        # The API for adding connectors (/api/external/v1/connectors) appears to use the same JSON structure as the export, so we'll just store as-is. However, it seems to only accept one connector at a time, so we'll store them all in a single list, letting us read the file once and loop through them.
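        # A re-import would then presumably loop and POST one connector at a time (untested sketch;
        # 'saved_connectors_json' stands for the parsed contents of the .JSON file written below, and
        # it's assumed the endpoint accepts each connector object verbatim):
        #     for connector in saved_connectors_json:
        #         requests.post(general_api_url + "/api/external/v1/connectors", headers=general_api_request_headers, json=connector)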
        
        connectors_api_getidentifiers_url = general_api_url + "/api/external/v1/connectors/cards"
        print(f"Getting identifiers of all connectors from {connectors_api_getidentifiers_url} ...")
        connectors_api_getidentifiers_response = requests.get(url=connectors_api_getidentifiers_url, headers=general_api_request_headers)
        
        if (connectors_api_getidentifiers_response.status_code == 200):
            connectors_api_getidentifiers_response_json = connectors_api_getidentifiers_response.json()
        else:
            raise Exception(f"API call to {connectors_api_getidentifiers_url} failed with status code {connectors_api_getidentifiers_response.status_code}.")
        
        connectors_identifiers = []
        
        # Each item from the cards endpoint appears to group one or more connector instances ("cards") under an integration, so flatten them into a single list of identifiers.
        for connector in connectors_api_getidentifiers_response_json:
            for card in connector["cards"]:
                connectors_identifiers.append(card["identifier"])
        
        connectors = []
        connectors_identifiers_count = len(connectors_identifiers)
        for index, connector_identifier in enumerate(connectors_identifiers):
            item_number_current = index + 1
            print(f"\nProcessing item {item_number_current} of {connectors_identifiers_count}: '{connector_identifier}'...")
            
            connectors_api_getdetails_url = general_api_url + "/api/external/v1/connectors/" + urllib.parse.quote(connector_identifier)
            print(f"\tExporting integration from {connectors_api_getdetails_url} ...")
            connectors_api_getdetails_response = requests.get(url=connectors_api_getdetails_url, headers=general_api_request_headers) # Renamed from the reused "getidentifiers" variable so each response has a distinct, accurate name.
            
            if (connectors_api_getdetails_response.status_code == 200):
                print(f"\tSuccess.")
                connectors.append(connectors_api_getdetails_response.json())
            else:
                print(f"\tERROR: API call failed with status code {connectors_api_getdetails_response.status_code}.")
            
        datetime_now = datetime.now().strftime(general_datetime_format_iso8601)
        connectors_json_file_name = f"Chronicle SOAR connectors ({datetime_now}).json"
        connectors_json_file_path = os.path.join(general_output_folder_path, connectors_json_file_name)
        
        with open(connectors_json_file_path, "w", newline="") as json_file:
            json_file.write(json.dumps(connectors))
        
        print(f"\nSaved output to file '{connectors_json_file_path}'.")

    except Exception as error:
        print("\nGeneral error running script. Details:")
        print(error)
        raise

if __name__ == "__main__":
    main()

Hi @bheu,

We suggest starting with the documentation we have on this topic:

Upgrade the Python version to 3.11

Test integrations in staging mode

We've already reviewed that documentation, but some of our questions remain:

1. There's no guidance on how to easily identify which playbooks use which actions and integrations.

2. We can test individual actions in staging mode with representative inputs, but we have no way to run full playbooks to give us a greater sense of assurance that the update won't break anything.

3. Is there any way for us to know, without asking Google Support, whether a recent update to an integration has introduced bugs that should make us hold off on upgrading until a fix is available?

Hi @donkos 

Great questions. I'll try to assist with some more context:

1. You are right, identifying this from the playbooks themselves isn't possible at the moment. I would highly recommend working from the IDE page, where the indicators next to each integration show which integrations need to be updated. In addition, you can get high-level context on the integrations requiring an update from the top bar in the IDE, Playbooks, and Marketplace pages.

2. That is also true. We may get to supporting playbooks in staging in the future, but it's not available at the moment. Our recommendation is similar to point 1: work at the integration level, check a few actions (preferably ones you know you are using) to get a sense of validity, and then upgrade the integration. You can also run the same test cases, on the same actions and with the same inputs, in both staging and production to compare the output across versions.

3. Generally speaking, the release notes on the Marketplace page indicate what actually changed. When jumping three or more versions in one update (e.g., upgrading an integration from version 23 to version 26), I would highly recommend testing it carefully in staging.
In the case you mentioned, where a newer version has introduced a bug, we do our best to release a fix to the integration as soon as we can (usually a matter of hours, or at most a few days) so that as few customers as possible upgrade to the broken version.

Hope the above helps.