Parse and process raw sensor data files from GEMS devices, converting JSON message formats into structured data.
# List all available packet type parsers
rtgs data-parser list-parsers
# Basic parsing - parse all packet types
rtgs data-parser parse --input-file raw_data.csv --output-file parsed_data.csv
# Basic parsing - parse all packet types with verbose output
rtgs data-parser parse --input-file raw_data.csv --output-file parsed_data.csv --verbose
# Parse specific packet types
rtgs data-parser parse --input-file raw_data.csv --output-file parsed_data.csv --packet-types "data/v2,diagnostic/v2"
# Parse with output format selection
rtgs data-parser parse --input-file raw_data.csv --output-file parsed_data --output-format parquet
# Parse with custom output directory
rtgs data-parser parse --input-file raw_data.csv --output-dir ./processed_data/
- --input-file TEXT: Input CSV file with raw sensor data (required)
- --output-file TEXT: Output filename (without extension)
- --output-dir TEXT: Output directory (default: ./data/parsed)
- --output-format [csv|parquet]: Output format (default: csv)
- --packet-types TEXT: Comma-separated packet types to parse (default: "all")
- --verbose: Enable verbose output for debugging
- --skip-confirmation: Skip interactive confirmation prompts
from rtgs_lab_tools.data_parser import parse_file, list_available_parsers
# List available parsers
parsers = list_available_parsers()
for parser_name, description in parsers.items():
print(f"{parser_name}: {description}")
# Parse a data file
result = parse_file(
input_file="raw_sensor_data.csv",
output_file="parsed_data.csv",
packet_types=["data", "diagnostic"]
)
print(f"Parsed {result['records_processed']} records")
print(f"Output: {result['output_file']}")
from rtgs_lab_tools.data_parser.parsers import DataParser, DiagnosticParser
from rtgs_lab_tools.data_parser.core import MessageProcessor
import pandas as pd
# Load raw data
df = pd.read_csv("raw_data.csv")
# Initialize message processor
processor = MessageProcessor()
# Register specific parsers
processor.register_parser("data", DataParser())
processor.register_parser("diagnostic", DiagnosticParser())
# Process messages
parsed_data = []
for _, row in df.iterrows():
try:
parsed = processor.process_message(row['message'], row.get('packet_type'))
parsed_data.append(parsed)
except Exception as e:
print(f"Failed to parse message: {e}")
# Convert to DataFrame
parsed_df = pd.DataFrame(parsed_data)
from rtgs_lab_tools.data_parser.parsers.base import BaseParser
import json
class CustomSensorParser(BaseParser):
"""Custom parser for specific sensor type."""
def get_packet_type(self) -> str:
return "custom_sensor"
def parse_message(self, message: str) -> dict:
"""Parse custom sensor message format."""
try:
data = json.loads(message)
return {
'timestamp': data.get('ts'),
'node_id': data.get('node'),
'temperature': data.get('temp'),
'humidity': data.get('hum'),
'battery_voltage': data.get('batt')
}
except Exception as e:
raise ValueError(f"Failed to parse custom sensor message: {e}")
# Register and use custom parser
from rtgs_lab_tools.data_parser.core import MessageProcessor
processor = MessageProcessor()
processor.register_parser("custom_sensor", CustomSensorParser())
# Process with custom parser
result = processor.process_message(message_string, "custom_sensor")
from rtgs_lab_tools.data_parser import parse_file
import os
# Process multiple files
input_dir = "./raw_data/"
output_dir = "./processed_data/"
for filename in os.listdir(input_dir):
if filename.endswith('.csv'):
input_path = os.path.join(input_dir, filename)
output_name = filename.replace('.csv', '_parsed')
result = parse_file(
input_file=input_path,
output_file=output_name,
output_dir=output_dir,
packet_types=["data"]
)
print(f"Processed {filename}: {result['records_processed']} records")
- Packet Type: data
- Description: Parses main sensor measurement messages
- Output Fields: Timestamp, node_id, measurements, environmental data
- Packet Type: diagnostic
- Description: Parses device diagnostic and status messages
- Output Fields: System status, error codes, battery levels, signal strength
- Packet Type: error
- Description: Parses error and fault condition messages
- Output Fields: Error codes, error descriptions, timestamps, affected components
- Packet Type: metadata
- Description: Parses device configuration and metadata messages
- Output Fields: Device settings, firmware versions, configuration parameters
- Packet Type: json
- Description: Generic JSON message parser for structured data
- Output Fields: Flattened JSON structure with dot notation keys
- Packet Type: csv
- Description: Handles CSV-formatted message payloads
- Output Fields: Column-based data extraction
{
"timestamp": "2023-06-15T14:30:00Z",
"node_id": "LCCMR_01",
"packet_type": "data",
"message": {
"Data": {
"Devices": [
{
"Temperature": 23.5,
"Humidity": 65.2,
"PORT_V": [3.3, 5.0, 12.0]
}
]
}
}
}
{
"timestamp": "2023-06-15T14:30:00Z",
"node_id": "LCCMR_01",
"packet_type": "diagnostic",
"message": {
"Diagnostic": {
"BatteryVoltage": 3.7,
"SignalStrength": -65,
"ErrorCodes": ["0x00000000"],
"SystemStatus": "normal"
}
}
}
- Structure: Tabular data with flattened JSON paths
- Columns: timestamp, node_id, measurement_name, value, units
- Benefits: Excel-compatible, human-readable
- Structure: Columnar binary format
- Benefits: Efficient storage, faster I/O, preserves data types
- Use Case: Large datasets, data analysis workflows
- Numeric values converted to appropriate types
- Timestamps parsed to datetime objects
- Arrays handled as separate columns or JSON arrays
- Malformed JSON messages logged but don't stop processing
- Partial parsing continues when possible
- Detailed error reporting with line numbers and context
- Streaming processing for large files
- Configurable batch sizes
- Efficient memory usage for massive datasets
- Schema validation for known message types
- Range checking for sensor values
- Duplicate detection and handling
from rtgs_lab_tools.data_parser.core import MessageProcessor
# Configure processor options
processor = MessageProcessor(
batch_size=1000, # Process in batches for memory efficiency
validate_json=True, # Validate JSON schema
strict_mode=False, # Continue on errors vs. strict validation
include_raw_message=False # Include original message in output
)
# CSV output options
csv_options = {
'index': False,
'encoding': 'utf-8',
'float_format': '%.6f'
}
# Parquet output options
parquet_options = {
'compression': 'snappy',
'engine': 'pyarrow'
}
from rtgs_lab_tools import sensing_data, data_parser, visualization
# 1. Extract raw data
raw_results = sensing_data.extract_data(
project="Sensor Network Study",
start_date="2023-06-01",
end_date="2023-06-30"
)
# 2. Parse the raw sensor messages
parsed_results = data_parser.parse_file(
input_file=raw_results['output_file'],
packet_types=["data", "diagnostic"],
output_format="parquet"
)
# 3. Create visualizations from parsed data
plot_path = visualization.create_time_series_plot(
df=parsed_results['data'],
measurement_name="Temperature",
title="Sensor Network Temperature Analysis"
)
print(f"Analysis complete: {plot_path}")
from rtgs_lab_tools.data_parser import parse_file
import pandas as pd
# Parse with quality control
result = parse_file(
input_file="field_data.csv",
packet_types=["data", "diagnostic"]
)
# Load parsed data for analysis
df = pd.read_csv(result['output_file'])
# Quality control checks
print(f"Total records: {len(df)}")
print(f"Date range: {df['timestamp'].min()} to {df['timestamp'].max()}")
print(f"Unique nodes: {df['node_id'].nunique()}")
print(f"Missing values: {df.isnull().sum().sum()}")
# Identify potential data quality issues
temp_data = df[df['measurement_name'] == 'Temperature']
outliers = temp_data[(temp_data['value'] < -40) | (temp_data['value'] > 60)]
print(f"Temperature outliers: {len(outliers)}")
from rtgs_lab_tools import sensing_data, data_parser
# Extract and parse in one workflow
raw_data = sensing_data.extract_data(project="My Project")
parsed_data = data_parser.parse_file(raw_data['output_file'])
from rtgs_lab_tools import data_parser, visualization
# Parse and visualize
parsed_data = data_parser.parse_file("raw_data.csv")
plot = visualization.create_time_series_plot(
df=parsed_data['data'],
measurement_name="Temperature"
)
- Malformed JSON: Invalid JSON in message field
- Unknown packet types: Unregistered packet type encountered
- Schema mismatches: Message doesn't match expected format
- Large file processing: Memory issues with very large files
# Enable detailed logging
import logging
logging.basicConfig(level=logging.DEBUG)
# Parse with error details
result = data_parser.parse_file(
input_file="problematic_data.csv",
packet_types=["data"]
)
# Check parsing statistics
print(f"Success rate: {result['success_rate']:.2%}")
print(f"Errors: {result['error_count']}")
print(f"Error details: {result['error_summary']}")