2025-01-18 13:41:57 +08:00

168 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import struct
import logging
from datetime import datetime
class Command25:
def __init__(self):
self.command = 0x25 # 25H命令码
def parse_25h(self, data):
"""解析25H充电信息命令"""
try:
print("\n开始解析25H充电信息命令...")
print(f"接收数据: {data.hex().upper()}")
# 基础校验
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command:
logging.warning("25H命令帧格式不正确")
return None
# 基础信息解析
pile_id = data[3:11] # 桩号
encrypt_mode = data[11] # 加密方式
data_len = struct.unpack("<H", data[12:14])[0] # 数据长度
data_field = data[14:14 + data_len] # 数据域
# 正确的时间解析函数
def parse_time(time_bytes):
"""
解析BCD格式的时间
示例: [0x19, 0x01, 0x09, 0x0C, 0x15, 0x2C] -> 2025-01-09 12:15:44
"""
def bcd_to_int(byte):
"""BCD转换为整数"""
return ((byte >> 4) * 10) + (byte & 0x0F)
try:
# 解析时间字段
year = bcd_to_int(time_bytes[0]) + 2000 # BCD年转换 (0x19 -> 25 -> 2025年)
month = bcd_to_int(time_bytes[1]) # BCD月 (0x01 -> 1月)
day = bcd_to_int(time_bytes[2]) # BCD日 (0x09 -> 9日)
hour = bcd_to_int(time_bytes[3]) # BCD时 (0x0C -> 12时)
minute = bcd_to_int(time_bytes[4]) # BCD分 (0x16 -> 22分)
second = bcd_to_int(time_bytes[5]) # BCD秒 (0x12 -> 18秒)
# 调试输出查看BCD解码后的结果
print(f"Debug - Raw bytes: {[hex(b) for b in time_bytes]}")
print(f"Debug - Decoded: {year}-{month}-{day} {hour}:{minute}:{second}")
# 返回格式化时间字符串
return f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
except Exception as e:
print(f"时间解析错误: {e}")
print(f"错误的时间字节: {[hex(b) for b in time_bytes]}")
return "Invalid time"
# 解析每个字段
parsed_data = {
# 基础信息
"pile_id": pile_id.hex().upper(),
"timestamp": parse_time(data_field[0:6]),
"gun_no": data_field[6],
# 电压电流
"voltage": struct.unpack("<H", data_field[7:9])[0] * 0.1, # 分辨率0.1V
"current": struct.unpack("<H", data_field[9:11])[0] * 0.1, # 分辨率0.1A
# 电量
"charging_kwh": struct.unpack("<I", data_field[11:15])[0] * 0.01, # 分辨率0.01kWh
# 时长和费用
"charging_time": struct.unpack("<I", data_field[15:19])[0], # 秒
"total_amount": struct.unpack("<I", data_field[19:23])[0] * 0.01, # 分辨率0.01元
# 模块信息
"module_count": data_field[23], # 充电模块接入数量
# 费用明细
"power_amount": struct.unpack("<I", data_field[24:28])[0] * 0.01, # 电费金额
"service_amount": struct.unpack("<I", data_field[28:32])[0] * 0.01, # 服务费金额
# 订单信息
"order_no": data_field[32:64].decode('ascii').rstrip('\x00'), # 订单号
# 开始和结束时间
"start_time": parse_time(data_field[64:70]), # 应该解析为 2025-01-09 12:21:44
"end_time": parse_time(data_field[70:76]) # 应该解析为 2025-01-09 12:22:18
}
# 打印解析结果
print("\n=== 25H充电信息解析结果 ===")
print(f"基本信息:")
print(f" 桩号: {parsed_data['pile_id']}")
print(f" 时间标识: {parsed_data['timestamp']}")
print(f" 枪号: {parsed_data['gun_no']}")
print(f"\n充电参数:")
print(f" 充电电压: {parsed_data['voltage']:.1f}V")
print(f" 充电电流: {parsed_data['current']:.1f}A")
print(f" 充电电量: {parsed_data['charging_kwh']:.2f}kWh")
print(f" 充电时长: {parsed_data['charging_time']}")
print(f"\n费用信息:")
print(f" 总金额: {parsed_data['total_amount']:.2f}")
print(f" 电费金额: {parsed_data['power_amount']:.2f}")
print(f" 服务费金额: {parsed_data['service_amount']:.2f}")
print(f"\n时间信息:")
print(f" 开始时间: {parsed_data['start_time']}")
print(f" 结束时间: {parsed_data['end_time']}")
print(f"\n订单信息:")
print(f" 订单号: {parsed_data['order_no']}")
return parsed_data
except Exception as e:
logging.error(f"解析25H充电信息命令失败: {str(e)}")
print(f"解析失败: {str(e)}")
import traceback
traceback.print_exc()
return None
def process_25h(self, data):
"""处理25H充电信息命令"""
try:
print("\n处理充电信息命令...")
# 解析命令
result = self.parse_25h(data)
if not result:
return False
# 可以在这里添加额外处理逻辑
# 例如:保存到数据库、发送到其他系统等
return True
except Exception as e:
logging.error(f"处理25H充电信息命令失败: {str(e)}")
print(f"处理失败: {str(e)}")
return False
def test_charge_info():
"""测试充电信息命令处理"""
print("开始测试充电信息命令处理...")
# 创建处理器
handler = Command25()
# 测试数据
test_data = bytes.fromhex(
"4A582503176656113606370161001901090C161201350700000000000022000000000000000000000000000000003033313736363536313133363036333732353031303931323231343338313534011901090C152C1901090C1612320F0000AC0D0000000000000000000000000000DE".replace(
" ", ""))
print("\n测试数据:")
print(f"25H数据: {test_data.hex().upper()}")
# 测试处理流程
result = handler.process_25h(test_data)
print(f"\n最终处理结果: {'成功' if result else '失败'}")
if __name__ == "__main__":
test_charge_info()