Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ repos:
- id: check-docstring-first
- id: detect-private-key

# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: 'v1.10.1'
# hooks:
# - id: mypy
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.10.1'
hooks:
- id: mypy
additional_dependencies:
- pytest

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.5.1'
Expand Down
18 changes: 9 additions & 9 deletions src/python_s7comm/async_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
from typing import List, Optional, Tuple

from .s7comm import AsyncS7Comm, enums
from .s7comm.packets.variable_address import VariableAddress
Expand Down Expand Up @@ -50,31 +49,32 @@ async def get_cpu_state(self) -> enums.CPUStatus:
async def read_area(self, address: str) -> bytes:
return await self.s7comm.read_area(VariableAddress.from_string(address))

async def write_area(self, address: str, data: bytes) -> int:
return await self.s7comm.write_area(address=VariableAddress.from_string(address), data=data)
async def write_area(self, address: str, data: bytes) -> None:
await self.s7comm.write_area(address=VariableAddress.from_string(address), data=data)

async def get_order_code(self) -> Optional[str]:
async def get_order_code(self) -> str | None:
response = await self.s7comm.read_szl(szl_id=0x0011, szl_index=0x0000)
szl_data = SZLResponseData.parse(response.data)
szl_data = SZLResponseData.parse(response.data.data)
for date_tree in szl_data.szl_data_tree_list:
module_identification = ModuleIdentificationDataTree.parse(date_tree)
if module_identification.index == ModuleIdentificationIndex.MODULE_IDENTIFICATION:
return module_identification.order_number
return None

async def read_szl(self, szl_id: int, szl_index: int = 0x0000) -> SZLResponseData:
return await self.s7comm.read_szl(szl_id=szl_id, szl_index=szl_index)
response_data = await self.s7comm.read_szl(szl_id=szl_id, szl_index=szl_index)
return SZLResponseData.parse(response_data.data.data)

async def read_szl_list(self) -> SZLResponseData:
response_data = await self.s7comm.read_szl(szl_id=0x0000, szl_index=0x0000)
return SZLResponseData.parse(response_data.data.data)

async def read_multi_vars(self, items: List[str]) -> List[bytes]:
async def read_multi_vars(self, items: list[str]) -> list[bytes]:
vars_ = [VariableAddress.from_string(item) for item in items]
response = await self.s7comm.read_multi_vars(items=vars_)
return response.values()

async def write_multi_vars(self, items: List[Tuple[str, bytes]]) -> bool:
async def write_multi_vars(self, items: list[tuple[str, bytes]]) -> bool:
vars_ = [(VariableAddress.from_string(address), data) for address, data in items]
response = await self.s7comm.write_multi_vars(items=vars_)
return response.check_result()
Expand All @@ -85,7 +85,7 @@ async def set_plc_system_datetime(self) -> int:
async def delete(self, block_type: str, block_num: int) -> int:
raise NotImplementedError

async def full_upload(self, _type: str, block_num: int) -> Tuple[bytearray, int]:
async def full_upload(self, _type: str, block_num: int) -> tuple[bytearray, int]:
raise NotImplementedError

async def upload(self, block_num: int) -> bytearray:
Expand Down
3 changes: 2 additions & 1 deletion src/python_s7comm/s7comm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .async_client import AsyncS7Comm
from .client import S7Comm
from .exceptions import PacketLostError, StalePacketError


__all__ = ["S7Comm", "AsyncS7Comm"]
__all__ = ["S7Comm", "AsyncS7Comm", "PacketLostError", "StalePacketError"]
10 changes: 7 additions & 3 deletions src/python_s7comm/s7comm/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from typing import cast

from .base import BaseS7Comm
from .enums import HeaderError, ItemReturnCode, MessageType, SubfunctionCode, UserdataFunction
from .enums import HeaderError, ItemReturnCode, MessageType, SubfunctionCode, UserdataFunction, UserdataLastPDU
from .error_messages import ERROR_MESSAGES
from .exceptions import PacketLostError, StalePacketError
from .packets import (
RequestPLCStop,
S7AckDataHeader,
S7Packet,
SetupCommunicationRequest,
SZLResponseData,
UserDataContinuationRequest,
UserDataRequest,
UserDataResponse,
Expand All @@ -38,6 +39,7 @@ def __init__(
transport: AsyncTransport | None = None,
):
super().__init__(pdu_length=pdu_length)
self.transport: AsyncTransport
if transport is None:
self.transport = AsyncCOTP(tpdu_size=tpdu_size, source_tsap=source_tsap, dest_tsap=dest_tsap)
else:
Expand Down Expand Up @@ -132,7 +134,7 @@ async def read_area(self, address: VariableAddress) -> bytes:
result += response_item.data
return result

async def write_area(self, address: VariableAddress, data: bytes) -> int:
async def write_area(self, address: VariableAddress, data: bytes) -> WriteVariableResponse:
"""
Writes data to PLC memory area with automatic splitting into multiple requests
if data exceeds PDU size.
Expand Down Expand Up @@ -177,7 +179,7 @@ async def write_multi_vars(self, items: list[tuple[VariableAddress, bytes]]) ->
raise ValueError("Invalid response class")
return response

async def read_szl(self, szl_id: int, szl_index: int) -> S7Packet:
async def read_szl(self, szl_id: int, szl_index: int) -> UserDataResponse:
data = struct.pack("!HH", szl_id, szl_index)
request = UserDataRequest.create(
function_group=UserdataFunction.CPU_FUNCTION,
Expand All @@ -186,6 +188,8 @@ async def read_szl(self, szl_id: int, szl_index: int) -> S7Packet:
data=data,
)
response = await self.send(request=request)
if not isinstance(response, UserDataResponse):
raise ValueError("Invalid response class")
return response

async def plc_stop(self) -> S7Packet:
Expand Down
1 change: 1 addition & 0 deletions src/python_s7comm/s7comm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def _create_packet(request: S7Packet, pdu_reference: int) -> bytes:

def _validate_pdu_reference(self, response: S7Packet) -> None:
"""Raises if PDU reference is invalid."""
assert response.header is not None
if response.header.pdu_reference > self._pdu_reference:
raise PacketLostError(f"Expected {self._pdu_reference}, got {response.header.pdu_reference}")
elif response.header.pdu_reference < self._pdu_reference:
Expand Down
1 change: 1 addition & 0 deletions src/python_s7comm/s7comm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(
transport: Transport | None = None,
) -> None:
super().__init__(pdu_length=pdu_length)
self.transport: Transport
if transport is None:
self.transport = COTP(tpdu_size=tpdu_size, source_tsap=source_tsap, dest_tsap=dest_tsap)
else:
Expand Down
4 changes: 4 additions & 0 deletions src/python_s7comm/s7comm/packets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
from .data_item import DataItem
from .headers import S7AckDataHeader, S7Header
from .packet import S7Packet
from .plc_command import RequestPLCStop
from .rw_variable import VariableReadRequest, VariableWriteRequest
from .setup_communication import SetupCommunicationParameter, SetupCommunicationRequest
from .szl import SZLResponseData
from .user_data import UserDataContinuationRequest, UserDataRequest, UserDataResponse
from .variable_address import VariableAddress


__all__ = [
"DataItem",
"S7AckDataHeader",
"S7Parameter",
"RequestPLCStop",
"SetupCommunicationRequest",
"SetupCommunicationParameter",
"S7Packet",
"SZLResponseData",
"UserDataContinuationRequest",
"UserDataRequest",
"UserDataResponse",
Expand Down
4 changes: 4 additions & 0 deletions src/python_s7comm/s7comm/packets/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ def serialize_parameter(self) -> bytes:

def serialize_data(self) -> bytes:
return b""

@classmethod
def parse(cls, packet: bytes) -> "S7Error":
raise NotImplementedError("S7Error is created from header, not parsed from bytes")
13 changes: 10 additions & 3 deletions src/python_s7comm/s7comm/packets/packet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, ClassVar, Protocol
from typing import Any, ClassVar, Protocol, Self

from .headers import S7AckDataHeader, S7Header

Expand All @@ -7,16 +7,23 @@ class S7Data(Protocol):
def serialize(self) -> bytes: ...


class S7Parameter(Protocol):
def serialize(self) -> bytes: ...


class S7Packet(Protocol):
MESSAGE_TYPE: ClassVar
header: S7Header | None
parameter: Any # Optional[S7Parameter]
data: Any # Optional[S7Data] # Union[DataItem, List[DataItem], List[enums.ItemReturnCode], None]
parameter: Any # S7Parameter | None
data: Any # S7Data | list[S7Data] | None

def serialize_parameter(self) -> bytes: ...

def serialize_data(self) -> bytes: ...

@classmethod
def parse(cls, packet: bytes) -> "S7Packet": ...


class S7Response(S7Packet):
def create_packet(self, pdu_reference: int) -> bytes:
Expand Down
8 changes: 4 additions & 4 deletions src/python_s7comm/s7comm/packets/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
from .user_data import UserDataResponse


response_code_to_packet: dict[FunctionCode, S7Packet] = {
response_code_to_packet: dict[FunctionCode, type[S7Packet]] = {
FunctionCode.SetupCommunication: SetupCommunicationRequest,
FunctionCode.ReadVariable: ReadVariableResponse,
FunctionCode.WriteVariable: WriteVariableResponse,
}

request_code_to_packet: dict[FunctionCode, S7Packet] = {
request_code_to_packet: dict[FunctionCode, type[S7Packet]] = {
FunctionCode.SetupCommunication: SetupCommunicationRequest,
FunctionCode.ReadVariable: VariableReadRequest,
FunctionCode.WriteVariable: VariableWriteRequest,
Expand All @@ -47,12 +47,12 @@ def parse(packet: bytes) -> S7Packet:

# parse parameter
parameter = packet[header.LENGTH :]
response = response_code_to_packet.get(function_code).parse(parameter)
response = response_code_to_packet[function_code].parse(parameter)
elif message_type == MessageType.JobRequest:
header = S7Header.parse(packet)
function_code = struct.unpack_from("!B", packet, header.LENGTH)[0]
parameter = packet[header.LENGTH :]
response = request_code_to_packet.get(function_code).parse(parameter)
response = request_code_to_packet[function_code].parse(parameter)
elif message_type == MessageType.Userdata:
header = S7Header.parse(packet)
response = UserDataResponse.parse(packet[header.LENGTH :])
Expand Down
4 changes: 4 additions & 0 deletions src/python_s7comm/s7comm/packets/plc_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ def serialize_parameter(self) -> bytes:
def serialize_data(self) -> bytes:
return b""

@classmethod
def parse(cls, packet: bytes) -> "RequestPLCStop":
raise NotImplementedError("RequestPLCStop is created from constructor, not parsed from bytes")


class PIServiceRequest:
UNKNOWN_BYTES = b"\x00\x00\x00\x00\x00\x00\xfd"
Expand Down
32 changes: 20 additions & 12 deletions src/python_s7comm/s7comm/packets/rw_variable.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import struct
from dataclasses import dataclass
from typing import AsyncGenerator, ClassVar, Generator, List, Tuple
from typing import AsyncGenerator, ClassVar, Generator

from ..enums import (
Area,
Expand Down Expand Up @@ -105,9 +105,9 @@ def parse(cls, packet: bytes) -> "RequestParameterItem":
class VariableRequestParameter:
LENGTH = 2

def __init__(self, function_code: FunctionCode, items: List[RequestParameterItem]):
def __init__(self, function_code: FunctionCode, items: list[RequestParameterItem]):
self.function_code = function_code
self.items: List[RequestParameterItem] = items
self.items: list[RequestParameterItem] = items

def serialize(self) -> bytes:
parameter = struct.pack("!BB", self.function_code, len(self.items))
Expand All @@ -127,13 +127,13 @@ def __init__(self, parameter: VariableRequestParameter) -> None:
self.data = None

@classmethod
def create(cls, items: List[VariableAddress]) -> "VariableReadRequest":
def create(cls, items: list[VariableAddress]) -> "VariableReadRequest":
parameter_items = cls._create_parameter_item_list(items=items)
parameter = VariableRequestParameter(function_code=cls.FUNCTION_CODE, items=parameter_items)
return cls(parameter=parameter)

@staticmethod
def _create_parameter_item_list(items: List[VariableAddress]) -> List[RequestParameterItem]:
def _create_parameter_item_list(items: list[VariableAddress]) -> list[RequestParameterItem]:
parameter_items = []
for item in items:
parameter_item = RequestParameterItem(address=item)
Expand Down Expand Up @@ -194,13 +194,13 @@ class VariableWriteRequest(S7Packet):
MESSAGE_TYPE = MessageType.JobRequest
FUNCTION_CODE = FunctionCode.WriteVariable

def __init__(self, parameter: VariableRequestParameter, data: List[DataItem]) -> None:
def __init__(self, parameter: VariableRequestParameter, data: list[DataItem]) -> None:
self.header = None
self.parameter = parameter
self.data = data

@classmethod
def create(cls, items: List[Tuple[VariableAddress, bytes]]) -> "VariableWriteRequest":
def create(cls, items: list[tuple[VariableAddress, bytes]]) -> "VariableWriteRequest":
parameter_items = []
data_items = []
for item, data in items:
Expand Down Expand Up @@ -314,7 +314,8 @@ def serialize(self) -> bytes:
class ReadVariableResponse(S7Packet):
MESSAGE_TYPE = MessageType.Response

def __init__(self, parameter: VariableParameter, data: List[DataItem]) -> None:
def __init__(self, parameter: VariableParameter, data: list[DataItem]) -> None:
self.header = None
self.parameter = parameter
self.data = data

Expand Down Expand Up @@ -346,7 +347,7 @@ def parse(cls, packet: bytes) -> "ReadVariableResponse":
parameter = VariableParameter(function_code=function_code, item_count=item_count)
return cls(parameter=parameter, data=items)

def values(self) -> List[bytes]:
def values(self) -> list[bytes]:
"""Возвращает только значения в виде списка, после проверки на результат ответа"""
result = []
for item in self.data:
Expand All @@ -355,20 +356,27 @@ def values(self) -> List[bytes]:
result.append(item.data)
return result

def serialize_parameter(self) -> bytes:
return self.parameter.serialize()

def serialize_data(self) -> bytes:
return b"".join(item.serialize() for item in self.data)


class WriteVariableResponse(S7Packet):
MESSAGE_TYPE = MessageType.Response

def __init__(self, parameter: VariableParameter, data: List[ItemReturnCode]) -> None:
def __init__(self, parameter: VariableParameter, data: list[ItemReturnCode]) -> None:
self.header = None
self.parameter = parameter
self.data: List[ItemReturnCode] = data
self.data: list[ItemReturnCode] = data

@classmethod
def parse(cls, packet: bytes) -> "WriteVariableResponse":
function_code, item_count = struct.unpack_from("!BB", packet)
offset = 2
parameter = VariableParameter(function_code=function_code, item_count=item_count)
data: List[ItemReturnCode] = [ItemReturnCode(item) for item in packet[offset:item_count]]
data: list[ItemReturnCode] = [ItemReturnCode(item) for item in packet[offset:item_count]]
return cls(parameter=parameter, data=data)

def serialize_parameter(self) -> bytes:
Expand Down
3 changes: 1 addition & 2 deletions src/python_s7comm/s7comm/packets/szl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import struct
from enum import IntEnum
from typing import List

from ..enums import CPUStatus

Expand Down Expand Up @@ -81,7 +80,7 @@ def parse(cls, packet: bytes) -> "ModuleIdentificationDataTree":
class SZLResponseData:
DATA_TREE_OFFSET = 8

def __init__(self, szl_id: int, szl_index: int, szl_data_tree_list: List[bytes]) -> None:
def __init__(self, szl_id: int, szl_index: int, szl_data_tree_list: list[bytes]) -> None:
self.szl_id = szl_id
self.szl_index = szl_index
self.szl_data_tree_list = szl_data_tree_list
Expand Down
4 changes: 2 additions & 2 deletions src/python_s7comm/s7comm/packets/user_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(
self.subfunction = subfunction
self.sequence_number = sequence_number
self.data_unit_reference = 0
self.last_data_unit = 0
self.last_data_unit = UserdataLastPDU.YES
self.error_code = 0

def serialize(self) -> bytes:
Expand All @@ -82,7 +82,7 @@ class UserDataResponseParameter(UserDataRequestParameter):
def __init__(
self,
data_unit_reference: int,
last_data_unit: bool,
last_data_unit: UserdataLastPDU,
error_code: int,
method: UserdataMethod,
type: TransmissionType,
Expand Down
Loading