import socket import struct import logging import time from datetime import datetime import binascii class Command02: def __init__(self): self.command = 0x02 # 回复命令码02H self.qr_fixed = "https://platform.enneagon.cn/ScanCharging?connectorCode=" def parse_pile_id(self, pile_id_bytes): """解析桩号""" try: vendor_id = struct.unpack(" expected_total_len: print(f"截断多余数据") data = data[:expected_total_len] else: print("数据不完整") return False # 3. 显示数据结构 print(f"\n数据结构分析:") print(f"起始标识: {data[0:2].hex().upper()}") print(f"命令字: {data[2]:02X}") print(f"桩号: {data[3:11].hex().upper()}") print(f"加密方式: {data[11]:02X}") print(f"数据长度: {data_len}") print(f"数据域: {data[14:14 + data_len].hex().upper()}") print(f"校验码: {data[-1]:02X}") # 4. 校验码验证 check_data = data[2:-1] calculated_check = 0 for b in check_data: calculated_check ^= b if calculated_check != data[-1]: print(f"校验码不匹配: 计算值={calculated_check:02X}, 接收值={data[-1]:02X}") return False return True except Exception as e: print(f"验证帧格式异常: {str(e)}") return False def debug_print_fields(self, data): """打印数据包的详细字段""" print("\n===== 数据包解析 =====") print("1. 固定头部:") print(f" 起始标识 (2字节): {data[0:2].hex().upper()}") print(f" 命令码 (1字节): {data[2]:02X}") print(f" 桩号 (8字节): {data[3:11].hex().upper()}") print(f" 加密方式 (1字节): {data[11]:02X}") print(f" 数据域长度 (2字节): {struct.unpack(' expected_len: print(f"\n警告: 发现多余字节:") print(f"多余字节内容: {data[expected_len:].hex().upper()}") def parse_01h(self, data): """解析01H命令数据""" try: print("\n开始解析01H命令...") if not self.validate_frame(data): raise ValueError("帧格式验证失败") # 提取基本信息 command = data[2] pile_id = data[3:11] encrypt_mode = data[11] data_field = data[14:-1] # 从数据域开始到校验码之前 # 解析数据域内容 time_bytes = data_field[0:6] year = time_bytes[0] + 2000 month, day, hour, minute, second = time_bytes[1:6] timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}" key_version = struct.unpack(" 600: # 超过10分钟 logging.warning(f"时间差异过大: {time_diff}秒") response = self.build_02h_response(pile_id, False, 6) # 拒绝原因6-时差过大 else: # 这里可以添加更多的验证逻辑 response = self.build_02h_response(pile_id, True, 0) if response and hasattr(sock, 'send'): sock.send(response) logging.info(f"成功发送02H响应, 长度: {len(response)}字节") return True return False except Exception as e: logging.error(f"处理01H命令失败: {str(e)}") return False def test_command(): """测试函数""" print("开始测试01H/02H命令处理...") # 创建响应处理器 handler = Command02() # 直接使用完整的测试数据,不进行裁剪 test_data = bytes.fromhex("4A5801031767631136065701100019010909371501000000000000000000004D") print("\n测试数据:") print(f"数据内容: {test_data.hex().upper()}") print(f"数据长度: {len(test_data)}字节") # 修改validate_frame方法来处理多余字节 def validate_frame(self, data): """验证帧格式""" try: print("\n开始验证帧格式:") print(f"数据内容: {data.hex().upper()}") print(f"数据长度: {len(data)}字节") # 1. 基本格式检查 if len(data) < 14: print("数据长度不足14字节") return False if data[0:2] != b'JX': print("帧起始标志不是'JX'") return False # 2. 获取数据域长度 data_len = struct.unpack("