Skip to content
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.36.0] - 2026-03-10
### Added
- Raw log search functionality with `search_raw_logs()` method
- CLI command `secops search raw-logs` for searching raw logs

## [0.35.3] - 2026-03-03
### Updated
- Dashboard methods to use centralized `chronicle_request` helper function for improved code consistency and maintainability
Expand Down
15 changes: 15 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ Search ingested UDM field values that match a query:
secops search udm-field-values --query "source" --page-size 10
```

### Search Raw Logs

Search for raw logs in Chronicle using the query language:

```bash
secops search raw-logs \
  --query 'raw = "authentication"' \
--snapshot-query 'user != ""' \
--time-window 24 \
--case-sensitive \
--log-types "OKTA,AZURE_AD" \
--max-aggregations-per-field 100 \
--page-size 25
```

### Get Statistics

Run statistical analyses on your data:
Expand Down
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,27 @@ results = chronicle.find_udm_field_values(
}
```

### Raw Log Search

Search for raw logs in Chronicle using the query language:

```python
from datetime import datetime, timedelta, timezone

# Set time range for search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)

results = chronicle.search_raw_logs(
query='raw != "authentication"',
start_time=start_time,
end_time=end_time,
snapshot_query='status = "success"',
max_aggregations_per_field=100,
page_size=20
)
```

### Statistics Queries

Get statistics about network connections grouped by hostname:
Expand Down
2 changes: 1 addition & 1 deletion api_module_mapping.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/
|rules.retrohunts.list |v1alpha| | |
|rules.updateDeployment |v1alpha| | |
|searchEntities |v1alpha| | |
|searchRawLogs |v1alpha| | |
|searchRawLogs |v1alpha|chronicle.log_search.search_raw_logs |secops search raw-logs |
|summarizeEntitiesFromQuery |v1alpha|chronicle.entity.summarize_entity |secops entity |
|summarizeEntity |v1alpha|chronicle.entity.summarize_entity | |
|testFindingsRefinement |v1alpha| | |
Expand Down
79 changes: 79 additions & 0 deletions examples/log_search_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Example usage of raw log search functionality."""

import argparse
from datetime import datetime, timedelta, timezone
from pprint import pprint

from secops.chronicle import ChronicleClient
from secops.exceptions import APIError


def main():
    """Run the raw log search example against a Chronicle instance.

    Parses CLI arguments, builds a ChronicleClient, and performs a raw
    log search over the requested time window, printing the results.
    """
    parser = argparse.ArgumentParser(
        description="Chronicle Raw Log Search Example"
    )
    parser.add_argument("--project_id", required=True, help="GCP Project ID")
    parser.add_argument(
        "--customer_id", required=True, help="Chronicle Customer ID"
    )
    parser.add_argument("--region", default="us", help="Chronicle Region")
    parser.add_argument(
        "--query", default='user = "user"', help="Raw log search query"
    )
    parser.add_argument(
        "--days", type=int, default=1, help="Search time range in days"
    )

    args = parser.parse_args()

    client = ChronicleClient(
        project_id=args.project_id,
        customer_id=args.customer_id,
        region=args.region,
    )

    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated
    # since Python 3.12 and returns a naive datetime. This also matches
    # the README example for search_raw_logs.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=args.days)

    print(f"Searching raw logs from {start_time} to {end_time}")
    print(f"Query: {args.query}")

    try:
        # Example 1: Basic Search
        results = client.search_raw_logs(
            query=args.query,
            start_time=start_time,
            end_time=end_time,
            page_size=10,
        )

        print("\nResults:")
        pprint(results)

        # Example 2: Filtering by Log Type (if available)
        # Note: Replace 'OKTA' with a valid log type in your environment
        # print("\nSearching with Log Type filter:")
        # results_filtered = client.search_raw_logs(
        #     query=args.query,
        #     start_time=start_time,
        #     end_time=end_time,
        #     page_size=10,
        #     log_types=["OKTA"]
        # )
        # pprint(results_filtered)

    except APIError as e:
        print(f"API Error: {e}")
    except Exception as e:  # pylint: disable=broad-except  # example: keep it running
        print(f"Error: {e}")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "secops"
version = "0.35.3"
version = "0.36.0"
description = "Python SDK for wrapping the Google SecOps API for common use cases"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
2 changes: 2 additions & 0 deletions src/secops/chronicle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
RowLogFormat,
generate_udm_key_value_mappings,
)
from secops.chronicle.log_search import search_raw_logs
from secops.chronicle.udm_search import (
fetch_udm_search_csv,
fetch_udm_search_view,
Expand All @@ -210,6 +211,7 @@
"validate_query",
"get_stats",
"search_udm",
"search_raw_logs",
# Natural Language Search
"translate_nl_to_udm",
# Entity
Expand Down
43 changes: 43 additions & 0 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@
)
from secops.chronicle.rule_validation import validate_rule as _validate_rule
from secops.chronicle.search import search_udm as _search_udm
from secops.chronicle.log_search import search_raw_logs as _search_raw_logs
from secops.chronicle.stats import get_stats as _get_stats
from secops.chronicle.udm_mapping import RowLogFormat
from secops.chronicle.udm_mapping import (
Expand Down Expand Up @@ -910,6 +911,48 @@ def search_udm(
as_list,
)

def search_raw_logs(
self,
query: str,
start_time: datetime,
end_time: datetime,
snapshot_query: str | None = None,
case_sensitive: bool = False,
log_types: list[str] | None = None,
max_aggregations_per_field: int | None = None,
page_size: int | None = None,
) -> dict[str, Any]:
"""Search for raw logs in Chronicle.

Args:
query: Query to search for raw logs.
start_time: Search start time (inclusive).
end_time: Search end time (exclusive).
snapshot_query: Optional. Query to filter results.
case_sensitive: Optional. Whether search is case-sensitive.
log_types: Optional. Limit results to specific log types
by display name (e.g. ["OKTA"]).
max_aggregations_per_field: Optional. Max values for a UDM field.
page_size: Optional. Maximum number of results to return.

Returns:
Dictionary containing search results.

Raises:
APIError: If the API request fails.
"""
return _search_raw_logs(
self,
query=query,
start_time=start_time,
end_time=end_time,
snapshot_query=snapshot_query,
case_sensitive=case_sensitive,
log_types=log_types,
max_aggregations_per_field=max_aggregations_per_field,
page_size=page_size,
)

def find_udm_field_values(
self, query: str, page_size: int | None = None
) -> dict[str, Any]:
Expand Down
86 changes: 86 additions & 0 deletions src/secops/chronicle/log_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Raw log search functionality for Chronicle."""

from datetime import datetime
from typing import TYPE_CHECKING, Any

from secops.chronicle.models import APIVersion
from secops.chronicle.utils.request_utils import chronicle_request

if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


def _to_utc_rfc3339(value: datetime) -> str:
    """Return *value* formatted as an RFC 3339 UTC timestamp ("...Z").

    Timezone-aware datetimes are converted to UTC before formatting so
    the trailing "Z" (UTC) designator is accurate. Naive datetimes are
    assumed to already be in UTC, preserving previous behavior.
    """
    offset = value.utcoffset()  # None for naive datetimes
    if offset is not None:
        # Shift to UTC wall-clock time and drop tzinfo so strftime
        # emits the UTC instant rather than the local wall-clock time.
        value = (value - offset).replace(tzinfo=None)
    return value.strftime("%Y-%m-%dT%H:%M:%S.%fZ")


def search_raw_logs(
    client: "ChronicleClient",
    query: str,
    start_time: datetime,
    end_time: datetime,
    snapshot_query: str | None = None,
    case_sensitive: bool = False,
    log_types: list[str] | None = None,
    max_aggregations_per_field: int | None = None,
    page_size: int | None = None,
) -> dict[str, Any]:
    """Search for raw logs in Chronicle.

    Args:
        client: The ChronicleClient instance.
        query: Query to search for raw logs.
        start_time: Search start time (inclusive). Aware datetimes are
            converted to UTC; naive datetimes are assumed to be UTC.
        end_time: Search end time (exclusive). Same timezone handling
            as ``start_time``.
        snapshot_query: Optional. Query to filter results.
        case_sensitive: Optional. Whether search is case-sensitive.
        log_types: Optional. Limit results to specific log types
            (e.g. ["OKTA"]).
        max_aggregations_per_field: Optional. Max values for a UDM field.
        page_size: Optional. Maximum number of results to return.

    Returns:
        Dictionary containing search results.

    Raises:
        APIError: If the API request fails.
    """
    search_query: dict[str, Any] = {
        "baselineQuery": query,
        "baselineTimeRange": {
            "startTime": _to_utc_rfc3339(start_time),
            "endTime": _to_utc_rfc3339(end_time),
        },
        "caseSensitive": case_sensitive,
    }

    if snapshot_query:
        search_query["snapshotQuery"] = snapshot_query

    if log_types:
        # The API expects a list of LogType objects, filtering by displayName
        search_query["logTypes"] = [{"displayName": lt} for lt in log_types]

    if max_aggregations_per_field is not None:
        search_query["maxAggregationsPerField"] = max_aggregations_per_field

    if page_size is not None:
        search_query["pageSize"] = page_size

    return chronicle_request(
        client,
        method="POST",
        endpoint_path=":searchRawLogs",
        api_version=APIVersion.V1ALPHA,
        json=search_query,
    )
Loading
Loading