2025-01-18 14:57:32 +08:00

201 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import struct
import logging
from datetime import datetime
class Command1F20:
def __init__(self):
self.command_1f = 0x1F # 启动充电命令
self.command_20 = 0x20 # 启动充电回复
def build_1f_command(self, pile_id, card_no="18771978016"):
"""构建1FH启动充电命令"""
try:
print("\n构建1FH启动充电命令...")
frame = bytearray()
frame.extend(b'JX') # 帧起始标志
frame.append(self.command_1f) # 命令码1FH
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式(不加密)
# 构建数据域
data = bytearray()
# 添加时间标识
now = datetime.now()
data.extend(bytes([
now.year - 2000,
now.month,
now.day,
now.hour,
now.minute,
now.second
]))
# 添加枪号
data.append(0x01)
# 添加卡号(32字节)
data.extend(card_no.encode().ljust(32, b'\x00'))
# 添加用户ID(32字节)
user_id = "84043"
data.extend(user_id.encode().ljust(32, b'\x00'))
# 添加组织机构代码(9字节)
data.extend(b'\x16'.ljust(9, b'\x00'))
# 添加控制方式(1字节) - 定金额充
data.append(0x03)
# 添加控制参数(4字节) - 1000元
data.extend(struct.pack("<I", 1000))
# 添加充电模式(1字节) - 正常充电
data.append(0x01)
# 添加启动方式(1字节) - 立即启动
data.append(0x01)
# 添加定时启动时间(6字节)
data.extend(bytes([0x19, 0x01, 0x09, 0x0B, 0x24, 0x2F]))
# 添加用户操作码(6字节)
data.extend(b'ws8quu')
# 添加计费模型选择(1字节) - 本地计费模型
data.append(0x01)
# 计算数据长度
frame.extend(struct.pack("<H", len(data)))
# 添加数据域
frame.extend(data)
# 计算校验码
check = 0
for b in frame[2:]:
check ^= b
frame.append(check)
command = bytes(frame)
print(f"启动充电命令数据: {command.hex().upper()}")
return command
except Exception as e:
logging.error(f"构建1FH启动充电命令失败: {str(e)}")
print(f"构建命令失败: {str(e)}")
return None
def parse_20h(self, data):
"""解析20H启动充电回复"""
try:
print("\n开始解析20H启动充电回复...")
print(f"接收数据: {data.hex().upper()}")
# 基础验证
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command_20:
logging.warning("20H命令帧格式不正确")
return None
# 解析数据
pile_id = data[3:11] # 桩号
encrypt_mode = data[11] # 加密方式
data_len = struct.unpack("<H", data[12:14])[0] # 数据长度
data_field = data[14:14 + data_len] # 数据域
def parse_time(time_bytes):
"""解析时间字节"""
try:
year = time_bytes[0] + 2000
month = time_bytes[1]
day = time_bytes[2]
hour = time_bytes[3]
minute = time_bytes[4]
second = time_bytes[5]
return f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
except Exception as e:
print(f"时间解析错误: {e}")
return "Invalid time"
# 解析数据域
parsed_data = {
"pile_id": pile_id.hex().upper(),
"timestamp": parse_time(data_field[0:6]),
"gun_no": data_field[6],
"card_no": data_field[7:39].decode('ascii').rstrip('\x00'),
"user_id": data_field[39:71].decode('ascii').rstrip('\x00'),
"execution_result": data_field[-2], # 1-成功2-失败
"fail_reason": data_field[-1] if data_field[-2] == 2 else 0
}
print("\n解析结果:")
print(f"桩号: {parsed_data['pile_id']}")
print(f"时间标识: {parsed_data['timestamp']}")
print(f"枪号: {parsed_data['gun_no']}")
print(f"卡号: {parsed_data['card_no']}")
print(f"用户ID: {parsed_data['user_id']}")
print(f"执行结果: {'成功' if parsed_data['execution_result'] == 1 else '失败'}")
if parsed_data['execution_result'] == 2:
print(f"失败原因代码: {parsed_data['fail_reason']}")
return parsed_data
except Exception as e:
logging.error(f"解析20H启动充电回复失败: {str(e)}")
print(f"解析失败: {str(e)}")
return None
def process_and_respond(self, pile_id, sock):
"""发送启动充电命令"""
try:
# 构建并发送1FH命令
command = self.build_1f_command(pile_id)
if not command:
return False
if sock and hasattr(sock, 'send'):
sock.send(command)
print("启动充电命令发送成功")
return True
except Exception as e:
logging.error(f"处理启动充电命令失败: {str(e)}")
print(f"处理失败: {str(e)}")
return False
def test_start_charge():
"""测试启动充电命令"""
print("开始测试启动充电命令处理...")
# 创建处理器
handler = Command1F20()
# 测试桩号
pile_id = bytes.fromhex("0317665611360637")
# 创建模拟socket
class MockSocket:
def send(self, data):
print(f"\n模拟发送数据:")
print(f"1FH数据: {data.hex().upper()}")
mock_sock = MockSocket()
# 测试1FH命令发送
print("\n测试发送启动充电命令:")
result = handler.process_and_respond(pile_id, mock_sock)
print(f"命令发送结果: {'成功' if result else '失败'}")
# 测试20H回复解析
print("\n测试解析启动充电回复:")
test_reply = bytes.fromhex(
"4A5820031766561136063701680019010 90B242D013138373731393738303136313535353636313000000000000000000000000000003834303433000000000000000000000000000000000000000000000000000000160000000000000000000003F4010000040119010 90B242F7773387175750101 00D7")
handler.parse_20h(test_reply)
if __name__ == "__main__":
test_start_charge()