2025-01-18 09:10:52 +08:00

214 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import struct
import logging
import binascii
class Command0B0CH:
def __init__(self):
self.command_0b = 0x0B # 平台心跳命令
self.command_0c = 0x0C # 桩心跳命令
def parse_0c_heartbeat(self, data):
"""
解析0CH桩心跳命令
:param data: 完整的0CH命令报文
:return: 解析后的字典或None
"""
try:
# 验证基本帧格式
if len(data) < 14 or data[0:2] != b'JX' or data[2] != 0x0C:
logging.warning(f"0CH命令帧格式不正确原始报文: {binascii.hexlify(data)}")
return None
# 打印完整的原始报文以便调试
print(f"完整原始报文: {binascii.hexlify(data)}")
# 提取桩号
pile_id_bytes = data[3:11]
# 提取时间标识
time_bytes = data[14:20]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 解析充电枪数量
gun_count = data[20]
# 解析充电枪状态
gun_states = []
current_index = 21
for i in range(gun_count):
try:
# 每个充电枪的状态信息占2个字节
if current_index + 1 < len(data):
gun_state = data[current_index]
gun_work_mode = data[current_index + 1]
gun_states.append({
"gun_index": i + 1,
"state": gun_state,
"state_text": self.get_gun_state_text(gun_state),
"work_mode": gun_work_mode,
"work_mode_text": self.get_work_mode_text(gun_work_mode)
})
current_index += 2
except Exception as gun_parse_error:
logging.warning(f"解析第 {i + 1} 个充电枪状态时出错: {gun_parse_error}")
# 打印解析结果
print("\n0CH桩心跳命令解析结果:")
print(f"桩号: {pile_id_bytes.hex()}")
print(f"时间标识: {timestamp}")
print(f"充电枪数量: {gun_count}")
for gun_state in gun_states:
print(f"{gun_state['gun_index']}:")
print(f" 状态: {gun_state['state_text']} (0x{gun_state['state']:02X})")
print(f" 工作模式: {gun_state['work_mode_text']} (0x{gun_state['work_mode']:02X})")
return {
"pile_id": pile_id_bytes.hex(),
"timestamp": timestamp,
"gun_count": gun_count,
"gun_states": gun_states
}
except Exception as e:
logging.error(f"解析0CH命令失败: {str(e)}")
logging.error(f"原始报文: {binascii.hexlify(data)}")
return None
def generate_0b_heartbeat_response(self, pile_id_bytes):
"""
生成0BH平台心跳响应
:param pile_id_bytes: 充电桩桩号字节
:return: 0BH心跳响应报文
"""
try:
# 构建帧
frame = bytearray()
frame.extend(b'JX') # 帧起始标志
frame.append(self.command_0b) # 命令码
frame.extend(pile_id_bytes) # 桩号
frame.append(0x01) # 数据加密方式
# 构建数据域
data = bytearray()
# 时间标识(当前时间)
from datetime import datetime
now = datetime.now()
data.extend(struct.pack("<BBBBBB",
now.year - 2000, now.month, now.day,
now.hour, now.minute, now.second))
# 心跳超时次数这里固定为0
data.append(0x00)
# 数据域长度
frame.extend(struct.pack("<H", len(data)))
# 加入数据域
frame.extend(data)
# 计算校验码
check = 0
for b in frame[2:]:
check ^= b
frame.append(check)
print("0BH心跳响应数据构建成功:")
print(f"数据内容: {frame.hex()}")
print(f"数据长度: {len(frame)}字节")
return bytes(frame)
except Exception as e:
logging.error(f"生成0BH心跳响应出错: {str(e)}")
return None
def process_0c_heartbeat(self, data):
"""
处理0CH桩心跳命令
:param data: 完整的0CH命令报文
:return: 是否成功处理
"""
try:
parsed_data = self.parse_0c_heartbeat(data)
if parsed_data is None:
logging.warning("0CH命令解析失败")
return False
# 记录心跳信息日志
logging.info(f"收到桩号 {parsed_data['pile_id']} 的心跳, 充电枪数量 {parsed_data['gun_count']}")
return True
except Exception as e:
logging.error(f"处理0CH命令出错: {str(e)}")
return False
def get_gun_state_text(self, state):
"""
解析充电枪状态
:param state: 充电枪状态字节
:return: 状态文本描述
"""
state_map = {
0x01: "待机",
0x02: "等待连接",
0x03: "启动中",
0x04: "充电中",
0x05: "停止中",
0x06: "预约中",
0x07: "占用中",
0x08: "测试中",
0x09: "故障中",
0x0A: "定时充电",
0x0B: "充电完成",
0x0C: "升级中"
}
return state_map.get(state, f"未知状态 (0x{state:02X})")
def get_work_mode_text(self, mode):
"""
解析工作模式
:param mode: 工作模式字节
:return: 工作模式文本描述
"""
mode_map = {
0x01: "普通充电",
0x02: "轮充",
0x03: "大功率",
0x04: "超级充",
0x05: "电池维护",
0x06: "柔性充"
}
return mode_map.get(mode, f"未知模式 (0x{mode:02X})")
# 测试用示例
if __name__ == "__main__":
# 0C命令测试报文
test_0c_data = bytes.fromhex("4A 58 0C 03 17 67 63 11 36 06 57 01 0C 00 19 01 09 09 37 3B 01 02 01 01 01 01 70")
# 0B命令测试报文
test_0b_data = bytes.fromhex("4A 58 0B 03 17 67 63 11 36 06 57 01 07 00 19 01 09 09 38 00 00 4B")
parser = Command0B0CH()
# 测试解析0C心跳
parser.process_0c_heartbeat(test_0c_data)
# 测试生成0B心跳响应
pile_id_bytes = bytes.fromhex("0317676311360657")
response = parser.generate_0b_heartbeat_response(pile_id_bytes)
print("\n0B心跳响应:")
print(response.hex())