import struct import logging class Command09: def __init__(self): self.command = 0x09 # 09H命令码 def parse_09h(self, data): """ 解析09H遥信命令 :param data: 完整的09H命令报文 :return: 解析后的字典或None """ try: # 验证基本帧格式 if len(data) < 14 or data[0:2] != b'JX' or data[2] != 0x09: logging.warning("09H命令帧格式不正确") return None # 提取桩号 pile_id_bytes = data[3:11] # 提取时间标识 time_bytes = data[14:20] year = time_bytes[0] + 2000 month, day, hour, minute, second = time_bytes[1:6] timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}" # 提取充电枪数量 gun_count = data[20] # 存储充电枪状态的列表 gun_states = [] # 从第21字节开始解析充电枪状态 current_index = 21 for i in range(gun_count): # 提取充电枪状态 gun_state = data[current_index] gun_state_text = self.get_gun_state_text(gun_state) # 工作模式 current_index += 1 work_mode = data[current_index] work_mode_text = self.get_work_mode_text(work_mode) gun_states.append({ "gun_index": i + 1, "state": gun_state, "state_text": gun_state_text, "work_mode": work_mode, "work_mode_text": work_mode_text }) current_index += 1 # 打印解析结果 print("\n09H命令解析结果:") print(f"桩号: {pile_id_bytes.hex()}") print(f"时间标识: {timestamp}") print(f"充电枪数量: {gun_count}") print("充电枪状态:") for gun in gun_states: print(f" 枪 {gun['gun_index']}:") print(f" 状态: {gun['state_text']} (0x{gun['state']:02X})") print(f" 工作模式: {gun['work_mode_text']} (0x{gun['work_mode']:02X})") return { "pile_id": pile_id_bytes.hex(), "timestamp": timestamp, "gun_count": gun_count, "gun_states": gun_states } except Exception as e: logging.error(f"解析09H命令失败: {str(e)}") return None def get_gun_state_text(self, state): """ 解析充电枪状态 :param state: 充电枪状态字节 :return: 状态文本描述 """ state_map = { 0x01: "待机", 0x02: "等待连接", 0x03: "启动中", 0x04: "充电中", 0x05: "停止中", 0x06: "预约中", 0x07: "占用中", 0x08: "测试中", 0x09: "故障中", 0x0A: "定时充电", 0x0B: "充电完成", 0x0C: "升级中" } return state_map.get(state, f"未知状态 (0x{state:02X})") def get_work_mode_text(self, mode): """ 解析工作模式 :param mode: 工作模式字节 :return: 工作模式文本描述 """ mode_map = { 0x01: "普通充电", 0x02: "轮充", 0x03: "大功率", 0x04: "超级充", 0x05: "电池维护", 0x06: "柔性充" } return mode_map.get(mode, f"未知模式 (0x{mode:02X})") def process_09h(self, data): """ 处理09H遥信命令 :param data: 完整的09H命令报文 :return: 是否成功处理 """ try: parsed_data = self.parse_09h(data) if parsed_data is None: logging.warning("09H命令解析失败") return False # 记录遥信信息日志 logging.info(f"成功处理09H遥信命令: 桩号 {parsed_data['pile_id']}, 充电枪数量 {parsed_data['gun_count']}") return True except Exception as e: logging.error(f"处理09H命令出错: {str(e)}") return False # 测试用示例 if __name__ == "__main__": # 示例报文 test_data = bytes.fromhex( "4A 58 09 03 17 67 63 11 36 06 57 01 13 00 19 01 09 09 37 1F 00 00 02 01 01 0A 00 00 01 01 0A 00 00 4F") parser = Command09() parser.process_09h(test_data)