49 lines
1.7 KiB
Python
49 lines
1.7 KiB
Python
# command_08.py
|
||
import struct
|
||
import logging
|
||
|
||
class Command08:
|
||
def build_08h_request(self, pile_id):
|
||
frame = bytearray([0x4A, 0x58, 0x08]) # 帧头 + 命令
|
||
frame.extend(pile_id) # 桩号
|
||
frame.append(0x01) # 数据加密方式(不加密)
|
||
|
||
# 数据域:时间标识 + 查询类型
|
||
data = bytearray()
|
||
current_time = bytearray([0x19, 0x03, 0x0A, 0x13, 0x14, 0x00]) # 示例:2025-03-10 13:14:00
|
||
data.extend(current_time)
|
||
data.append(0x01) # 查询类型(0x01: 实时状态)
|
||
frame.extend(struct.pack('<H', len(data))) # 数据域长度
|
||
frame.extend(data)
|
||
|
||
# 计算校验码
|
||
checksum = 0
|
||
for b in frame[2:-1]:
|
||
checksum ^= b
|
||
frame.append(checksum)
|
||
return frame
|
||
|
||
def process_08h_response(self, data):
|
||
logging.debug(f"处理08H响应: {data.hex().upper()}")
|
||
if len(data) < 14:
|
||
logging.warning("08H数据长度不足")
|
||
return
|
||
|
||
# 解析数据域
|
||
data_start = 14
|
||
data_len = struct.unpack('<H', data[12:14])[0]
|
||
data_end = data_start + data_len
|
||
if len(data) < data_end:
|
||
logging.warning("08H数据域长度不匹配")
|
||
return
|
||
|
||
# 提取字段
|
||
timestamp = data[data_start:data_start + 6]
|
||
query_type = data[data_start + 6]
|
||
|
||
# 转换为键值对
|
||
parsed_data = {
|
||
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
|
||
"query_type": f"0x{query_type:02X}"
|
||
}
|
||
logging.info(f"08H解析结果: {parsed_data}") |