Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions can/io/asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
import logging
import re
from collections.abc import Generator
from datetime import datetime
from datetime import datetime, timezone, tzinfo
from typing import Any, Final, TextIO

from ..message import Message
from ..typechecking import StringPathLike
from ..util import channel2int, dlc2len, len2dlc
from .generic import TextIOMessageReader, TextIOMessageWriter

_LOCAL_TZ: Final = datetime.now(timezone.utc).astimezone().tzinfo

CAN_MSG_EXT = 0x80000000
CAN_ID_MASK = 0x1FFFFFFF
BASE_HEX = 16
Expand Down Expand Up @@ -44,24 +46,31 @@ def __init__(
file: StringPathLike | TextIO,
base: str = "hex",
relative_timestamp: bool = True,
tz: tzinfo | None = _LOCAL_TZ,
**kwargs: Any,
) -> None:
"""
:param file: a path-like object or as file-like object to read from
If this is a file-like object, is has to opened in text
read mode, not binary read mode.
:param base: Select the base(hex or dec) of id and data.
If the header of the asc file contains base information,
this value will be overwritten. Default "hex".
:param relative_timestamp: Select whether the timestamps are
`relative` (starting at 0.0) or `absolute` (starting at
the system time). Default `True = relative`.
:param file:
a path-like object or a file-like object to read from.
If this is a file-like object, it must be opened in text
read mode, not binary read mode.
:param base:
Select the base ('hex' or 'dec') for CAN IDs and data bytes.
If the header of the ASC file contains base information,
this value will be overwritten. Default is "hex".
:param relative_timestamp:
Select whether the timestamps are
`relative` (starting at 0.0) or `absolute` (starting at
the system time). Default is `True` (relative).
:param tz:
Timezone for absolute timestamps. Defaults to local timezone.
"""
super().__init__(file, mode="r")

if not self.file:
raise ValueError("The given file cannot be None")
self.base = base
self._timezone = tz
self._converted_base = self._check_base(base)
self.relative_timestamp = relative_timestamp
self.date: str | None = None
Expand Down Expand Up @@ -93,7 +102,7 @@ def _extract_header(self) -> None:
self.start_time = (
0.0
if self.relative_timestamp
else self._datetime_to_timestamp(self.date)
else self._datetime_to_timestamp(self.date, self._timezone)
)
continue

Expand All @@ -115,7 +124,7 @@ def _extract_header(self) -> None:
break

@staticmethod
def _datetime_to_timestamp(datetime_string: str) -> float:
def _datetime_to_timestamp(datetime_string: str, tz: tzinfo | None) -> float:
month_map = {
"jan": 1,
"feb": 2,
Expand Down Expand Up @@ -155,7 +164,11 @@ def _datetime_to_timestamp(datetime_string: str) -> float:

for format_str in datetime_formats:
try:
return datetime.strptime(datetime_string, format_str).timestamp()
return (
datetime.strptime(datetime_string, format_str)
.replace(tzinfo=tz)
.timestamp()
)
except ValueError:
continue

Expand Down Expand Up @@ -279,7 +292,7 @@ def __iter__(self) -> Generator[Message, None, None]:
self.start_time = (
0.0
if self.relative_timestamp
else self._datetime_to_timestamp(datetime_str)
else self._datetime_to_timestamp(datetime_str, self._timezone)
)
continue

Expand Down Expand Up @@ -358,14 +371,19 @@ def __init__(
self,
file: StringPathLike | TextIO,
channel: int = 1,
tz: tzinfo | None = _LOCAL_TZ,
**kwargs: Any,
) -> None:
"""
:param file: a path-like object or as file-like object to write to
If this is a file-like object, is has to opened in text
write mode, not binary write mode.
:param channel: a default channel to use when the message does not
have a channel set
:param file:
a path-like object or a file-like object to write to.
If this is a file-like object, it must be opened in text
write mode, not binary write mode.
:param channel:
a default channel to use when the message does not
have a channel set. Default is 1.
:param tz:
Timezone for timestamps in the log file. Defaults to local timezone.
"""
if kwargs.get("append", False):
raise ValueError(
Expand All @@ -374,10 +392,11 @@ def __init__(
)
super().__init__(file, mode="w")

self._timezone = tz
self.channel = channel

# write start of file header
start_time = self._format_header_datetime(datetime.now())
start_time = self._format_header_datetime(datetime.now(tz=self._timezone))
self.file.write(f"date {start_time}\n")
self.file.write("base hex timestamps absolute\n")
self.file.write("internal events logged\n")
Expand Down Expand Up @@ -417,7 +436,7 @@ def log_event(self, message: str, timestamp: float | None = None) -> None:
if not self.header_written:
self.started = self.last_timestamp = timestamp or 0.0

start_time = datetime.fromtimestamp(self.last_timestamp)
start_time = datetime.fromtimestamp(self.last_timestamp, tz=self._timezone)
formatted_date = self._format_header_datetime(start_time)

self.file.write(f"Begin Triggerblock {formatted_date}\n")
Expand Down
1 change: 1 addition & 0 deletions doc/changelog.d/2035.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add new timezone parameter `tz` to `can.io.asc.ASCReader` and `can.io.asc.ASCWriter`.
111 changes: 66 additions & 45 deletions test/logformats_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
import unittest
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from datetime import datetime
from datetime import datetime, timedelta, timezone
from itertools import zip_longest
from pathlib import Path
from unittest.mock import patch

from parameterized import parameterized

import can
from can.io import blf
import can.io
from can.io import asc, blf

from .data.example_data import (
TEST_COMMENTS,
TEST_MESSAGES_BASE,
Expand Down Expand Up @@ -427,9 +428,11 @@ def _read_log_file(self, filename, **kwargs):

def test_read_absolute_time(self):
time_from_file = "Sat Sep 30 10:06:13.191 PM 2017"
start_time = datetime.strptime(
time_from_file, self.FORMAT_START_OF_FILE_DATE
).timestamp()
start_time = (
datetime.strptime(time_from_file, self.FORMAT_START_OF_FILE_DATE)
.replace(tzinfo=asc._LOCAL_TZ)
.timestamp()
)

expected_messages = [
can.Message(
Expand Down Expand Up @@ -629,51 +632,55 @@ def test_read_error_frame_channel(self):
os.unlink(temp_file.name)

def test_write_millisecond_handling(self):
tz = asc._LOCAL_TZ
now = datetime(
year=2017, month=9, day=30, hour=15, minute=6, second=13, microsecond=191456
year=2017,
month=9,
day=30,
hour=15,
minute=6,
second=13,
microsecond=191456,
tzinfo=tz,
)

# We temporarily set the locale to C to ensure test reproducibility
with override_locale(category=locale.LC_TIME, locale_str="C"):
# We mock datetime.now during ASCWriter __init__ for reproducibility
# Unfortunately, now() is a readonly attribute, so we mock datetime
with patch("can.io.asc.datetime") as mock_datetime:
mock_datetime.now.return_value = now
writer = can.ASCWriter(self.test_file_name)

msg = can.Message(
timestamp=now.timestamp(), arbitration_id=0x123, data=b"h"
)
writer.on_message_received(msg)
with patch("can.io.asc.datetime") as mock_datetime:
mock_datetime.now.return_value = now
writer = can.ASCWriter(self.test_file_name, tz=tz)

writer.stop()
msg = can.Message(timestamp=now.timestamp(), arbitration_id=0x123, data=b"h")
writer.on_message_received(msg)
writer.stop()

actual_file = Path(self.test_file_name)
expected_file = self._get_logfile_location("single_frame_us_locale.asc")

self.assertEqual(expected_file.read_text(), actual_file.read_text())

def test_write(self):
now = datetime(
year=2017, month=9, day=30, hour=15, minute=6, second=13, microsecond=191456
)

# We temporarily set the locale to C to ensure test reproducibility
with override_locale(category=locale.LC_TIME, locale_str="C"):
# We mock datetime.now during ASCWriter __init__ for reproducibility
# Unfortunately, now() is a readonly attribute, so we mock datetime
with patch("can.io.asc.datetime") as mock_datetime:
mock_datetime.now.return_value = now
writer = can.ASCWriter(self.test_file_name)

msg = can.Message(
timestamp=now.timestamp(),
arbitration_id=0x123,
data=range(64),
tz = asc._LOCAL_TZ
with patch("can.io.asc.datetime") as mock_datetime:
now = datetime(
year=2017,
month=9,
day=30,
hour=15,
minute=6,
second=13,
microsecond=191456,
tzinfo=tz,
)
mock_datetime.now.return_value = now
writer = can.ASCWriter(self.test_file_name, tz=tz)

with writer:
writer.on_message_received(msg)
msg = can.Message(
timestamp=now.timestamp(),
arbitration_id=0x123,
data=range(64),
)

with writer:
writer.on_message_received(msg)

actual_file = Path(self.test_file_name)
expected_file = self._get_logfile_location("single_frame.asc")
Expand All @@ -684,34 +691,48 @@ def test_write(self):
[
(
"May 27 04:09:35.000 pm 2014",
datetime(2014, 5, 27, 16, 9, 35, 0).timestamp(),
datetime(
2014, 5, 27, 16, 9, 35, 0, tzinfo=timezone(timedelta(hours=5))
).timestamp(),
),
(
"Mai 27 04:09:35.000 pm 2014",
datetime(2014, 5, 27, 16, 9, 35, 0).timestamp(),
datetime(
2014, 5, 27, 16, 9, 35, 0, tzinfo=timezone(timedelta(hours=5))
).timestamp(),
),
(
"Apr 28 10:44:52.480 2022",
datetime(2022, 4, 28, 10, 44, 52, 480000).timestamp(),
datetime(
2022, 4, 28, 10, 44, 52, 480000, tzinfo=timezone(timedelta(hours=5))
).timestamp(),
),
(
"Sep 30 15:06:13.191 2017",
datetime(2017, 9, 30, 15, 6, 13, 191000).timestamp(),
datetime(
2017, 9, 30, 15, 6, 13, 191000, tzinfo=timezone(timedelta(hours=5))
).timestamp(),
),
(
"Sep 30 15:06:13.191 pm 2017",
datetime(2017, 9, 30, 15, 6, 13, 191000).timestamp(),
datetime(
2017, 9, 30, 15, 6, 13, 191000, tzinfo=timezone(timedelta(hours=5))
).timestamp(),
),
(
"Sep 30 15:06:13.191 am 2017",
datetime(2017, 9, 30, 15, 6, 13, 191000).timestamp(),
datetime(
2017, 9, 30, 15, 6, 13, 191000, tzinfo=timezone(timedelta(hours=5))
).timestamp(),
),
]
)
def test_datetime_to_timestamp(
self, datetime_string: str, expected_timestamp: float
):
timestamp = can.ASCReader._datetime_to_timestamp(datetime_string)
timestamp = can.ASCReader._datetime_to_timestamp(
datetime_string, tz=timezone(timedelta(hours=5))
)
self.assertAlmostEqual(timestamp, expected_timestamp)


Expand Down