163 lines
5.1 KiB
Python
163 lines
5.1 KiB
Python
import struct
|
||
import logging
|
||
from datetime import datetime
|
||
|
||
|
||
class Command2627:
|
||
def __init__(self):
|
||
self.command_26 = 0x26 # 停止充电命令
|
||
self.command_27 = 0x27 # 停止充电回复
|
||
|
||
def build_26h_command(self, pile_id, gun_no=1):
|
||
"""构建26H停止充电命令"""
|
||
try:
|
||
print("\n构建26H停止充电命令...")
|
||
|
||
frame = bytearray()
|
||
frame.extend(b'JX') # 帧起始标志
|
||
frame.append(self.command_26) # 命令码26H
|
||
frame.extend(pile_id) # 桩号
|
||
frame.append(0x01) # 数据加密方式(不加密)
|
||
|
||
# 构建数据域
|
||
data = bytearray()
|
||
|
||
# 添加时间标识 (BCD格式)
|
||
now = datetime.now()
|
||
data.extend(bytes([
|
||
((now.year - 2000) // 10 << 4) + ((now.year - 2000) % 10),
|
||
(now.month // 10 << 4) + (now.month % 10),
|
||
(now.day // 10 << 4) + (now.day % 10),
|
||
(now.hour // 10 << 4) + (now.hour % 10),
|
||
(now.minute // 10 << 4) + (now.minute % 10),
|
||
(now.second // 10 << 4) + (now.second % 10)
|
||
]))
|
||
|
||
# 添加枪号
|
||
data.append(gun_no)
|
||
|
||
# 计算数据长度
|
||
frame.extend(struct.pack("<H", len(data))) # 数据长度
|
||
|
||
# 添加数据域
|
||
frame.extend(data)
|
||
|
||
# 计算校验码
|
||
check = 0
|
||
for b in frame[2:]:
|
||
check ^= b
|
||
frame.append(check)
|
||
|
||
response = bytes(frame)
|
||
print(f"停止充电命令数据: {response.hex().upper()}")
|
||
return response
|
||
|
||
except Exception as e:
|
||
logging.error(f"构建26H停止充电命令失败: {str(e)}")
|
||
print(f"构建命令失败: {str(e)}")
|
||
return None
|
||
|
||
def parse_27h(self, data):
|
||
"""解析27H停止充电回复"""
|
||
try:
|
||
print("\n开始解析27H停止充电回复...")
|
||
print(f"接收数据: {data.hex().upper()}")
|
||
|
||
# 基础验证
|
||
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command_27:
|
||
logging.warning("27H命令帧格式不正确")
|
||
return None
|
||
|
||
# 解析数据
|
||
pile_id = data[3:11] # 桩号
|
||
encrypt_mode = data[11] # 加密方式
|
||
data_len = struct.unpack("<H", data[12:14])[0] # 数据长度
|
||
|
||
# 解析数据域
|
||
data_field = data[14:14 + data_len]
|
||
|
||
def bcd_to_str(time_bytes):
|
||
# 直接使用字节值,不需要BCD解码
|
||
year = time_bytes[0] + 2000 # 25 + 2000 = 2025
|
||
month = time_bytes[1] # 直接使用字节值
|
||
day = time_bytes[2]
|
||
hour = time_bytes[3]
|
||
minute = time_bytes[4]
|
||
second = time_bytes[5]
|
||
return f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
|
||
|
||
# 解析时间标识
|
||
timestamp = bcd_to_str(data_field[0:6])
|
||
|
||
# 解析枪号
|
||
gun_no = data_field[6]
|
||
|
||
result = {
|
||
"pile_id": pile_id.hex().upper(),
|
||
"timestamp": timestamp,
|
||
"gun_no": gun_no
|
||
}
|
||
|
||
print("\n解析结果:")
|
||
print(f"桩号: {result['pile_id']}")
|
||
print(f"时间标识: {result['timestamp']}")
|
||
print(f"枪号: {result['gun_no']}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logging.error(f"解析27H停止充电回复失败: {str(e)}")
|
||
print(f"解析失败: {str(e)}")
|
||
return None
|
||
|
||
def process_and_send(self, sock, pile_id, gun_no=1):
|
||
"""发送停止充电命令并处理回复"""
|
||
try:
|
||
# 构建并发送26H命令
|
||
command = self.build_26h_command(pile_id, gun_no)
|
||
if not command:
|
||
return False
|
||
|
||
if sock and hasattr(sock, 'send'):
|
||
sock.send(command)
|
||
print("停止充电命令发送成功")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logging.error(f"处理停止充电命令失败: {str(e)}")
|
||
print(f"处理失败: {str(e)}")
|
||
return False
|
||
|
||
|
||
def test_stop_charge():
|
||
"""测试停止充电命令"""
|
||
print("开始测试停止充电命令处理...")
|
||
|
||
# 创建处理器
|
||
handler = Command2627()
|
||
|
||
# 测试桩号
|
||
pile_id = bytes.fromhex("0317665611360637")
|
||
|
||
# 创建模拟socket
|
||
class MockSocket:
|
||
def send(self, data):
|
||
print(f"\n模拟发送数据:")
|
||
print(f"26H数据: {data.hex().upper()}")
|
||
|
||
mock_sock = MockSocket()
|
||
|
||
# 测试26H命令发送
|
||
print("\n测试发送停止充电命令:")
|
||
result = handler.process_and_send(mock_sock, pile_id)
|
||
print(f"命令发送结果: {'成功' if result else '失败'}")
|
||
|
||
# 测试27H回复解析
|
||
print("\n测试解析停止充电回复:")
|
||
test_reply = bytes.fromhex("4A58270317665611360637010700190109 0B251A0137")
|
||
handler.parse_27h(test_reply)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
test_stop_charge() |