2025-01-18 09:10:52 +08:00

279 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import struct
import logging
import time
from datetime import datetime
import binascii
class Command02:
def __init__(self):
self.command = 0x02 # 回复命令码02H
self.qr_fixed = "https://platform.enneagon.cn/ScanCharging?connectorCode="
def parse_pile_id(self, pile_id_bytes):
"""解析桩号"""
try:
vendor_id = struct.unpack("<H", pile_id_bytes[0:2])[0] # 运营商编号
gun_info = pile_id_bytes[2] # 枪数量信息
if gun_info <= 31:
gun_type = "交流"
gun_count = gun_info
elif 51 <= gun_info <= 81:
gun_type = "直流"
gun_count = gun_info - 50
else:
gun_type = "未知"
gun_count = gun_info
site_id = int.from_bytes(pile_id_bytes[3:6], 'little') # 站点编号
addr_in_site = struct.unpack("<H", pile_id_bytes[6:8])[0] # 站内桩地址
return {
"vendor_id": f"{vendor_id:04d}",
"gun_type": gun_type,
"gun_count": gun_count,
"site_id": f"{site_id:06d}",
"addr_in_site": f"{addr_in_site:04d}"
}
except Exception as e:
logging.error(f"Parse pile ID failed: {str(e)}")
print(f"解析桩号失败: {str(e)}")
return None
def validate_frame(self, data):
"""验证帧格式"""
try:
print(f"\n验证帧格式:")
print(f"数据内容: {data.hex()}")
print(f"数据长度: {len(data)}字节")
# 1. 基本长度检查
if len(data) < 14:
print("数据长度不足14字节无效")
return False
# 2. 检查帧起始标志
if data[0:2] != b'JX':
print("帧起始标志不是'JX',无效")
return False
# 3. 获取并检查数据域长度
data_len = struct.unpack("<H", data[12:14])[0]
print(f"数据域长度字段值: {data_len}")
# 4. 计算并检查总长度
expected_len = 16 + data_len # 固定部分(14) + 数据域 + 校验码(1)
print(f"期望总长度: {expected_len}")
print(f"实际长度: {len(data)}")
if len(data) != expected_len:
print("数据总长度不匹配")
return False
# 5. 验证校验码
check_data = data[2:-1] # 从命令字节到校验码前的数据
calculated_check = 0
for b in check_data:
calculated_check ^= b
received_check = data[-1]
print(f"计算得到的校验码: {calculated_check:02X}")
print(f"接收到的校验码: {received_check:02X}")
if calculated_check != received_check:
print("校验码不匹配")
return False
print("帧格式验证通过")
return True
except Exception as e:
print(f"帧格式验证出错: {str(e)}")
return False
def parse_01h(self, data):
"""解析01H命令数据"""
try:
print("\n开始解析01H命令...")
if not self.validate_frame(data):
raise ValueError("帧格式验证失败")
command = data[2]
pile_id = data[3:11]
encrypt_mode = data[11]
data_len = struct.unpack("<H", data[12:14])[0]
data_field = data[14:14 + data_len]
# 解析时间标识
time_bytes = data_field[0:6]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 解析密钥版本和校验密文
key_version = struct.unpack("<H", data_field[6:8])[0]
check_text = data_field[8:16]
# 解析桩号
pile_info = self.parse_pile_id(pile_id)
result = {
"command": command,
"pile_id": pile_id,
"pile_info": pile_info,
"encrypt_mode": encrypt_mode,
"timestamp": timestamp,
"key_version": key_version,
"check_text": check_text
}
print("\n解析结果:")
print(f"命令码: {command:02X}")
print(f"桩号信息: {pile_info}")
print(f"加密方式: {encrypt_mode:02X}")
print(f"时间标识: {timestamp}")
print(f"密钥版本: {key_version}")
print(f"校验密文: {check_text.hex()}")
return result
except Exception as e:
logging.error(f"解析01H命令失败: {str(e)}")
print(f"解析失败: {str(e)}")
return None
def build_02h_response(self, pile_id, allow=True, reject_reason=0):
"""构建02H响应命令"""
try:
print("\n构建02H响应...")
frame = bytearray()
frame.extend(b'JX') # 帧起始标志
frame.append(self.command) # 命令
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式
data = bytearray()
# 时间标识
now = datetime.now()
data.extend(struct.pack("<BBBBBB",
now.year - 2000, now.month, now.day,
now.hour, now.minute, now.second))
# 请求结果
data.append(0x01 if allow else 0x02)
# 拒绝原因
data.append(reject_reason)
if allow:
# 二维码固定段
data.extend(self.qr_fixed.ljust(100, '\x00').encode())
# 二维码枪号段数量
data.append(0x02)
# 二维码枪号段1和2
pile_id_str = ''.join([f"{b:02X}" for b in pile_id])
gun1 = f"{pile_id_str}001"
gun2 = f"{pile_id_str}002"
data.extend(gun1.ljust(20, '\x00').encode())
data.extend(gun2.ljust(20, '\x00').encode())
# 数据域长度
frame.extend(struct.pack("<H", len(data)))
# 数据域
frame.extend(data)
# 计算校验码
check = 0
for b in frame[2:]:
check ^= b
frame.append(check)
print("响应数据构建成功:")
print(f"数据内容: {frame.hex()}")
print(f"数据长度: {len(frame)}字节")
return bytes(frame)
except Exception as e:
logging.error(f"构建02H响应失败: {str(e)}")
print(f"构建响应失败: {str(e)}")
return None
def process_and_respond(self, received_data, sock):
"""处理收到的01H命令并回复02H"""
try:
print("\n处理01H命令并生成响应...")
# 解析收到的01H命令
parsed = self.parse_01h(received_data)
if not parsed:
return False
# 构建02H响应
allow = True # 这里可以根据业务逻辑判断是否允许连接
reject_reason = 0
response = self.build_02h_response(parsed["pile_id"], allow, reject_reason)
if not response:
return False
# 发送响应
if hasattr(sock, 'send'):
sock.send(response)
return True
except Exception as e:
logging.error(f"处理和响应失败: {str(e)}")
print(f"处理失败: {str(e)}")
return False
def test_command():
"""测试函数"""
print("开始测试01H/02H命令处理...")
# 配置日志
# logging.basicConfig(
# filename='command_response_02h.log',
# level=logging.INFO,
# format='%(asctime)s - %(levelname)s - %(message)s',
# encoding='utf-8'
# )
# 创建响应处理器
handler = Command02()
# 测试数据 - 使用实际收到的数据
test_data = bytes.fromhex("4A5801031767631136065701100019010909371501000000000000000000004D")
print("\n测试数据:")
print(f"十六进制: {test_data.hex()}")
print(f"长度: {len(test_data)}字节")
# 创建模拟socket
class MockSocket:
def send(self, data):
print(f"\n模拟发送响应数据:")
print(f"数据内容: {data.hex()}")
print(f"数据长度: {len(data)}字节")
mock_sock = MockSocket()
# 测试完整处理流程
result = handler.process_and_respond(test_data, mock_sock)
print(f"\n最终处理结果: {'成功' if result else '失败'}")
if __name__ == "__main__":
test_command()