import socket import struct import logging import time from datetime import datetime import binascii class Command02: def __init__(self): self.command = 0x02 # 回复命令码02H self.qr_fixed = "https://platform.enneagon.cn/ScanCharging?connectorCode=" def parse_pile_id(self, pile_id_bytes): """解析桩号""" try: vendor_id = struct.unpack(" expected_total_len: print(f"截断多余数据") data = data[:expected_total_len] else: print("数据不完整") return False # 3. 显示数据结构 print(f"\n数据结构分析:") print(f"起始标识: {data[0:2].hex().upper()}") print(f"命令字: {data[2]:02X}") print(f"桩号: {data[3:11].hex().upper()}") print(f"加密方式: {data[11]:02X}") print(f"数据长度: {data_len}") print(f"数据域: {data[14:14 + data_len].hex().upper()}") print(f"校验码: {data[-1]:02X}") # 4. 校验码验证 check_data = data[2:-1] calculated_check = 0 for b in check_data: calculated_check ^= b if calculated_check != data[-1]: print(f"校验码不匹配: 计算值={calculated_check:02X}, 接收值={data[-1]:02X}") return False return True except Exception as e: print(f"验证帧格式异常: {str(e)}") return False def debug_print_fields(self, data): """打印数据包的详细字段""" print("\n===== 数据包解析 =====") print("1. 固定头部:") print(f" 起始标识 (2字节): {data[0:2].hex().upper()}") print(f" 命令码 (1字节): {data[2]:02X}") print(f" 桩号 (8字节): {data[3:11].hex().upper()}") print(f" 加密方式 (1字节): {data[11]:02X}") print(f" 数据域长度 (2字节): {struct.unpack(' expected_len: print(f"\n警告: 发现多余字节:") print(f"多余字节内容: {data[expected_len:].hex().upper()}") def parse_01h(self, data): """解析01H命令数据""" try: print("\n开始解析01H命令...") if not self.validate_frame(data): raise ValueError("帧格式验证失败") # 提取基本信息 command = data[2] pile_id = data[3:11] encrypt_mode = data[11] data_field = data[14:-1] # 从数据域开始到校验码之前 # 解析数据域内容 time_bytes = data_field[0:6] year = time_bytes[0] + 2000 month, day, hour, minute, second = time_bytes[1:6] timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}" key_version = struct.unpack(" 600: # 超过10分钟 logging.warning(f"时间差异过大: {time_diff}秒") response = self.build_02h_response(pile_id, False, 6) # 拒绝原因6-时差过大 else: # 这里可以添加更多的验证逻辑 response = self.build_02h_response(pile_id, True, 0) if response and hasattr(sock, 'send'): sock.send(response) logging.info(f"成功发送02H响应, 长度: {len(response)}字节") return True return False except Exception as e: logging.error(f"处理01H命令失败: {str(e)}") return False def test_command(): """测试函数""" print("开始测试01H/02H命令处理...") # 创建响应处理器 handler = Command02() # 原始测试数据 hex_string = "4A5801031767631136065701100019010909371501000000000000000000004D" # 解析数据长度字段以确定正确的数据包长度 expected_len = 14 + 16 + 1 # 头部(14字节) + 数据域(16字节) + 校验码(1字节) # 确保只取需要的字节 test_data = bytes.fromhex(hex_string[:expected_len * 2]) # *2是因为hex字符串中一个字节用两个字符表示 print("\n测试数据:") print(f"原始数据: {hex_string}") print(f"处理后数据: {test_data.hex().upper()}") print(f"数据长度: {len(test_data)}字节") # 打印详细字段解析 handler.debug_print_fields(test_data) # 创建模拟socket class MockSocket: def send(self, data): print(f"\n模拟发送响应数据:") print(f"数据内容: {data.hex().upper()}") print(f"数据长度: {len(data)}字节") mock_sock = MockSocket() # 测试完整处理流程 result = handler.process_and_respond(test_data, mock_sock) print(f"\n最终处理结果: {'成功' if result else '失败'}") if __name__ == "__main__": test_command()