Hello,
We have a product that ships log messages to a file as a top-level JSON array - example:
[{"test": "testing","hostname":"hostname1"},{"test":"testing","hostname":"hostname2"}]
We need a parser that can break each element of the array out into its own event. The built-in JSON parser seems to only recognize individual JSON objects and does not handle a top-level array, even though a top-level array is valid JSON.
Please assist or point me in a good direction. In Logstash we can use the "split" filter for this.
You're facing a common challenge: ingesting logs that arrive as a top-level JSON array into Google Cloud SecOps. The SecOps built-in JSON parser expects individual JSON objects, not an array of them. Here's how you might tackle this:
1. Pre-processing Is Key (and Preferred)
The most efficient and recommended approach is to pre-process your logs before they reach SecOps SIEM. This reduces the processing load on SecOps and makes ingestion smoother. Here are a few ways to do that:
Modify the Source: If you have control over the product generating these logs, the ideal solution is to modify it to output each JSON object as a separate event - one object per line - so nothing downstream has to split the array.
Leverage Log Shippers: Use a log shipper like Fluentd or Filebeat to split the log entries before they are forwarded. These tools offer powerful parsing and transformation capabilities - for example, Fluentd's record_transformer plugin or an external jq filter can break the array apart (a standalone splitting sketch follows this list, and a full Fluentd example is in section 3).
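If adjusting the source or deploying a full shipper isn't practical, even a small standalone script can do the splitting before the logs are shipped. Here is a minimal Python sketch, assuming each log file holds a single top-level JSON array (the paths are hypothetical placeholders):

import json

SRC = "/path/to/your/logfile.json"    # hypothetical input: one top-level JSON array
DST = "/path/to/your/logfile.ndjson"  # hypothetical output: one object per line

with open(SRC) as f:
    events = json.load(f)  # parse the whole top-level array

with open(DST, "w") as out:
    for event in events:
        # write each element as its own compact JSON line, the
        # one-object-per-event shape the built-in parser expects
        out.write(json.dumps(event) + "\n")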
2. SecOps-Specific Options (Less Ideal)
While pre-processing is preferred, SecOps does provide some options for handling this, although they might be less efficient:
UDM Parser: You could potentially create a custom Unified Data Model (UDM) parser in SecOps to handle the array. However, this can be complex and may not be as performant as pre-processing: you would need to use repeated fields in your UDM schema and carefully craft the parser to extract the array elements into separate events. You might want to request this through the official Google Cloud support channels; it might take a while to have it built.
SecOps Normalization: SecOps's normalization process might help to some extent. If you can configure normalization to recognize the array structure and extract the individual objects, that could work. However, this depends heavily on the specific structure of your JSON and might not be reliable. Here too, you may want to ask Google Cloud Support what can be done.
3. Example using Fluentd and jq
Here's a basic sketch of how you can use Fluentd with jq to pre-process your logs. The jq program runs under Fluentd's exec_filter output, which re-emits every line the command prints as its own event:
<source>
  @type tail
  path /path/to/your/logfile.json
  tag mylogs
  <parse>
    # read each line raw into the "message" field; jq does the JSON work
    @type none
  </parse>
</source>

<match mylogs>
  # exec_filter pipes each event through an external command and
  # re-emits every line the command prints as a separate event
  @type exec_filter
  command jq -c --unbuffered '.message | fromjson | .[]'
  tag mylogs.split
  <format>
    @type json
  </format>
  <parse>
    @type json
  </parse>
</match>

<match mylogs.split>
  @type google_cloud
  # ... your Google Cloud output configuration ...
</match>
This configuration tails the log file, pipes each raw line through jq (which parses the top-level array and prints one compact JSON object per line), and then emits each object as a separate event to the Google Cloud output.
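For example, given the sample line from the question, jq would print two lines, and each would become its own event:

{"test":"testing","hostname":"hostname1"}
{"test":"testing","hostname":"hostname2"}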
In short, by pre-processing your logs with a tool like Fluentd and jq, you can efficiently split the top-level JSON array and send individual events to SecOps SIEM, ensuring optimal performance and easier analysis.