167 lines
6.3 KiB
Python
167 lines
6.3 KiB
Python
import struct
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
class Command25:
|
|
def __init__(self):
|
|
self.command = 0x25 # 25H命令码
|
|
|
|
def parse_25h(self, data):
|
|
"""解析25H充电信息命令"""
|
|
try:
|
|
print("\n开始解析25H充电信息命令...")
|
|
print(f"接收数据: {data.hex().upper()}")
|
|
|
|
# 基础校验
|
|
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command:
|
|
logging.warning("25H命令帧格式不正确")
|
|
return None
|
|
|
|
# 基础信息解析
|
|
pile_id = data[3:11] # 桩号
|
|
encrypt_mode = data[11] # 加密方式
|
|
data_len = struct.unpack("<H", data[12:14])[0] # 数据长度
|
|
data_field = data[14:14 + data_len] # 数据域
|
|
|
|
def parse_time(time_bytes):
|
|
"""
|
|
解析时间字节
|
|
"""
|
|
try:
|
|
# 打印时间字节用于调试
|
|
print(f"Parsing time bytes: {[hex(b) for b in time_bytes]}")
|
|
|
|
year = time_bytes[0] + 2000 # 25 + 2000 = 2025
|
|
month = time_bytes[1] # 1
|
|
day = time_bytes[2] # 9
|
|
hour = time_bytes[3] # 12
|
|
minute = time_bytes[4] # 15/16
|
|
second = time_bytes[5] # 44/18
|
|
|
|
return f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
|
|
except Exception as e:
|
|
print(f"时间解析错误: {e}")
|
|
print(f"错误的时间字节: {[hex(b) for b in time_bytes]}")
|
|
return "Invalid time"
|
|
|
|
# 打印整个数据域的十六进制,用于调试
|
|
print("Full data field:")
|
|
print(' '.join(hex(b) for b in data_field))
|
|
|
|
# 找到订单号后的时间字段
|
|
order_no = data_field[32:64].decode('ascii').rstrip('\x00')
|
|
order_end_pos = 64
|
|
|
|
# 开始和结束时间应该在订单号之后
|
|
start_time_pos = order_end_pos + 1 # 跳过一个字节
|
|
start_time_bytes = data_field[start_time_pos:start_time_pos + 6]
|
|
end_time_bytes = data_field[start_time_pos + 6:start_time_pos + 12]
|
|
|
|
parsed_data = {
|
|
# 基础信息
|
|
"pile_id": pile_id.hex().upper(),
|
|
"timestamp": parse_time(data_field[0:6]),
|
|
"gun_no": data_field[6],
|
|
|
|
# 电压电流
|
|
"voltage": struct.unpack("<H", data_field[7:9])[0] * 0.1, # 分辨率0.1V
|
|
"current": struct.unpack("<H", data_field[9:11])[0] * 0.1, # 分辨率0.1A
|
|
|
|
# 电量
|
|
"charging_kwh": struct.unpack("<I", data_field[11:15])[0] * 0.01, # 分辨率0.01kWh
|
|
|
|
# 时长和费用
|
|
"charging_time": struct.unpack("<I", data_field[15:19])[0], # 秒
|
|
"total_amount": struct.unpack("<I", data_field[19:23])[0] * 0.01, # 分辨率0.01元
|
|
|
|
# 模块信息
|
|
"module_count": data_field[23], # 充电模块接入数量
|
|
|
|
# 费用明细
|
|
"power_amount": struct.unpack("<I", data_field[24:28])[0] * 0.01, # 电费金额
|
|
"service_amount": struct.unpack("<I", data_field[28:32])[0] * 0.01, # 服务费金额
|
|
|
|
# 订单信息
|
|
"order_no": order_no,
|
|
|
|
# 开始和结束时间
|
|
"start_time": parse_time(start_time_bytes),
|
|
"end_time": parse_time(end_time_bytes)
|
|
}
|
|
|
|
# 打印解析结果
|
|
print("\n=== 25H充电信息解析结果 ===")
|
|
print(f"基本信息:")
|
|
print(f" 桩号: {parsed_data['pile_id']}")
|
|
print(f" 时间标识: {parsed_data['timestamp']}")
|
|
print(f" 枪号: {parsed_data['gun_no']}")
|
|
|
|
print(f"\n充电参数:")
|
|
print(f" 充电电压: {parsed_data['voltage']:.1f}V")
|
|
print(f" 充电电流: {parsed_data['current']:.1f}A")
|
|
print(f" 充电电量: {parsed_data['charging_kwh']:.2f}kWh")
|
|
print(f" 充电时长: {parsed_data['charging_time']}秒")
|
|
|
|
print(f"\n费用信息:")
|
|
print(f" 总金额: {parsed_data['total_amount']:.2f}元")
|
|
print(f" 电费金额: {parsed_data['power_amount']:.2f}元")
|
|
print(f" 服务费金额: {parsed_data['service_amount']:.2f}元")
|
|
|
|
print(f"\n时间信息:")
|
|
print(f" 开始时间: {parsed_data['start_time']}")
|
|
print(f" 结束时间: {parsed_data['end_time']}")
|
|
|
|
print(f"\n订单信息:")
|
|
print(f" 订单号: {parsed_data['order_no']}")
|
|
|
|
return parsed_data
|
|
|
|
except Exception as e:
|
|
logging.error(f"解析25H充电信息命令失败: {str(e)}")
|
|
print(f"解析失败: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def process_25h(self, data):
|
|
"""处理25H充电信息命令"""
|
|
try:
|
|
print("\n处理充电信息命令...")
|
|
|
|
# 解析命令
|
|
result = self.parse_25h(data)
|
|
if not result:
|
|
return False
|
|
|
|
# 可以在这里添加额外处理逻辑
|
|
# 例如:保存到数据库、发送到其他系统等
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logging.error(f"处理25H充电信息命令失败: {str(e)}")
|
|
print(f"处理失败: {str(e)}")
|
|
return False
|
|
|
|
|
|
def test_charge_info():
|
|
"""测试充电信息命令处理"""
|
|
print("开始测试充电信息命令处理...")
|
|
|
|
# 创建处理器
|
|
handler = Command25()
|
|
|
|
# 测试数据
|
|
test_data = bytes.fromhex(
|
|
"4A582503176656113606370161001901090C161201350700000000000022000000000000000000000000000000003033313736363536313133363036333732353031303931323231343338313534011901090C152C1901090C1612320F0000AC0D0000000000000000000000000000DE")
|
|
|
|
print("\n测试数据:")
|
|
print(f"25H数据: {test_data.hex().upper()}")
|
|
|
|
# 测试处理流程
|
|
result = handler.process_25h(test_data)
|
|
print(f"\n最终处理结果: {'成功' if result else '失败'}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
test_charge_info() |