no message

This commit is contained in:
MATRIX\29620 2025-03-14 11:02:02 +08:00
parent 68b599a660
commit 83485d8afb
43 changed files with 30912 additions and 1906 deletions

2
.idea/.name generated
View File

@ -1 +1 @@
setup.py
main.py

View File

@ -0,0 +1,38 @@
import struct
import logging
class Command01:
def __init__(self):
self.pile_id = b"\x00\x27\x02\x12\x34\x56\x12\x34" # 示例桩号
def build_01h_request(self):
"""构建01H请求连接帧"""
frame = bytearray([0x4A, 0x58]) # 帧头: "JX"
frame.append(0x01) # 命令码: 01H
frame.extend(self.pile_id) # 桩号 (8字节)
frame.append(0x01) # 数据加密方式: 0x01 (不加密)
frame.extend([0x00, 0x00]) # 数据域长度 (0)
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum) # 校验码
return frame
def process_01h_response(self, response, sock):
"""处理02H响应由平台调用"""
if response and len(response) >= 14 and response[2] == 0x02:
result = response[14] # 请求结果 (0x01允许, 0x00拒绝)
if result == 0x01:
logging.info("连接请求成功")
return True
else:
logging.error(f"连接被拒绝,原因: {response[15]}")
return False
logging.warning("无效的02H响应")
return False
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
cmd = Command01()
request = cmd.build_01h_request()
print(f"01H请求帧: {request.hex().upper()}")

View File

@ -1,332 +1,46 @@
import socket
import struct
import logging
import time
from datetime import datetime
import binascii
class Command02:
def __init__(self):
self.command = 0x02 # 回复命令码02H
self.qr_fixed = "https://platform.enneagon.cn/ScanCharging?connectorCode="
def parse_pile_id(self, pile_id_bytes):
"""解析桩号"""
try:
vendor_id = struct.unpack("<H", pile_id_bytes[0:2])[0] # 运营商编号
gun_info = pile_id_bytes[2] # 枪数量信息
if gun_info <= 31:
gun_type = "交流"
gun_count = gun_info
elif 51 <= gun_info <= 81:
gun_type = "直流"
gun_count = gun_info - 50
else:
gun_type = "未知"
gun_count = gun_info
site_id = int.from_bytes(pile_id_bytes[3:6], 'little') # 站点编号
addr_in_site = struct.unpack("<H", pile_id_bytes[6:8])[0] # 站内桩地址
return {
"vendor_id": f"{vendor_id:04d}",
"gun_type": gun_type,
"gun_count": gun_count,
"site_id": f"{site_id:06d}",
"addr_in_site": f"{addr_in_site:04d}"
}
except Exception as e:
logging.error(f"Parse pile ID failed: {str(e)}")
print(f"解析桩号失败: {str(e)}")
return None
def validate_frame(self, data):
"""验证帧格式"""
try:
print("\n开始验证帧格式:")
print(f"数据内容: {data.hex().upper()}")
print(f"数据长度: {len(data)}字节")
# 1. 基本格式检查
if len(data) < 14:
print("数据长度不足14字节")
return False
if data[0:2] != b'JX':
print("帧起始标志不是'JX'")
return False
# 2. 获取数据域长度
data_len = struct.unpack("<H", data[12:14])[0]
expected_total_len = 14 + data_len + 1 # 头部(14) + 数据域 + 校验码(1)
# 如果数据长度不匹配,尝试修正数据
if len(data) != expected_total_len:
print(f"数据长度不匹配,尝试修正...")
if len(data) > expected_total_len:
print(f"截断多余数据")
data = data[:expected_total_len]
else:
print("数据不完整")
return False
# 3. 显示数据结构
print(f"\n数据结构分析:")
print(f"起始标识: {data[0:2].hex().upper()}")
print(f"命令字: {data[2]:02X}")
print(f"桩号: {data[3:11].hex().upper()}")
print(f"加密方式: {data[11]:02X}")
print(f"数据长度: {data_len}")
print(f"数据域: {data[14:14 + data_len].hex().upper()}")
print(f"校验码: {data[-1]:02X}")
# 4. 校验码验证
check_data = data[2:-1]
calculated_check = 0
for b in check_data:
calculated_check ^= b
if calculated_check != data[-1]:
print(f"校验码不匹配: 计算值={calculated_check:02X}, 接收值={data[-1]:02X}")
return False
return True
except Exception as e:
print(f"验证帧格式异常: {str(e)}")
return False
def debug_print_fields(self, data):
"""打印数据包的详细字段"""
print("\n===== 数据包解析 =====")
print("1. 固定头部:")
print(f" 起始标识 (2字节): {data[0:2].hex().upper()}")
print(f" 命令码 (1字节): {data[2]:02X}")
print(f" 桩号 (8字节): {data[3:11].hex().upper()}")
print(f" 加密方式 (1字节): {data[11]:02X}")
print(f" 数据域长度 (2字节): {struct.unpack('<H', data[12:14])[0]}")
data_len = struct.unpack("<H", data[12:14])[0]
print("\n2. 数据域:")
print(f" 时间标识 (6字节): {data[14:20].hex().upper()}")
print(f" 密钥版本 (2字节): {data[20:22].hex().upper()}")
print(f" 校验密文 (8字节): {data[22:30].hex().upper()}")
print(f"\n3. 校验码 (1字节): {data[-1]:02X}")
# 检查是否有多余字节
expected_len = 14 + data_len + 1
if len(data) > expected_len:
print(f"\n警告: 发现多余字节:")
print(f"多余字节内容: {data[expected_len:].hex().upper()}")
def parse_01h(self, data):
"""解析01H命令数据"""
try:
print("\n开始解析01H命令...")
if not self.validate_frame(data):
raise ValueError("帧格式验证失败")
# 提取基本信息
command = data[2]
pile_id = data[3:11]
encrypt_mode = data[11]
data_field = data[14:-1] # 从数据域开始到校验码之前
# 解析数据域内容
time_bytes = data_field[0:6]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
key_version = struct.unpack("<H", data_field[6:8])[0]
check_text = data_field[8:16]
result = {
"command": command,
"pile_id": pile_id,
"encrypt_mode": encrypt_mode,
"timestamp": timestamp,
"key_version": key_version,
"check_text": check_text
}
return result
except Exception as e:
print(f"解析失败: {str(e)}")
return None
def build_02h_response(self, pile_id, allow=True, reject_reason=0):
"""构建02H响应命令"""
try:
frame = bytearray()
frame.extend(b'JX') # 帧起始标志
frame.append(self.command) # 命令
def build_02h_response(self, pile_id, allow=True, gun_count=1):
frame = bytearray([0x4A, 0x58, 0x02]) # 帧头 + 命令
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式
frame.append(0x01) # 数据加密方式(不加密)
# 数据域
data = bytearray()
# 添加时间标识 (6字节)
now = datetime.now()
data.extend(struct.pack("<BBBBBB",
now.year - 2000, now.month, now.day,
now.hour, now.minute, now.second))
# 添加请求结果和拒绝原因
data.append(0x01 if allow else 0x02)
data.append(reject_reason)
current_time = time.gmtime() # 使用 UTC 时间
time_bytes = bytearray([
current_time.tm_year - 2000,
current_time.tm_mon,
current_time.tm_mday,
current_time.tm_hour,
current_time.tm_min,
current_time.tm_sec
])
data.extend(time_bytes)
data.append(0x01 if allow else 0x02) # 请求结果
data.append(0x00) # 拒绝原因(无)
if allow:
# 添加二维码固定段 (100字节)
fixed_part = self.qr_fixed.encode()
data.extend(fixed_part.ljust(100, b'\x00'))
# 二维码字段,使用桩号生成唯一 URL
qr_fixed = f"http://example.com/{pile_id.hex().upper()}".encode('ascii') + b"\x00" * (100 - len(f"http://example.com/{pile_id.hex().upper()}"))
data.extend(qr_fixed)
num_guns = gun_count # 动态设置枪数量
data.append(num_guns)
# 动态生成每把枪的二维码
for i in range(num_guns):
qr_gun = f"http://example.com/gun{i+1}_{pile_id.hex().upper()}".encode('ascii') + b"\x00" * (20 - len(f"http://example.com/gun{i+1}_{pile_id.hex().upper()}"))
data.extend(qr_gun)
# 添加二维码枪号段数量
data.append(0x02) # 2个充电枪
# 添加枪号段 (每个20字节)
pile_id_str = ''.join([f"{b:02X}" for b in pile_id])
gun1 = f"{pile_id_str}001"
gun2 = f"{pile_id_str}002"
data.extend(gun1.encode().ljust(20, b'\x00'))
data.extend(gun2.encode().ljust(20, b'\x00'))
# 添加数据域长度
frame.extend(struct.pack("<H", len(data)))
# 添加数据域
frame.extend(struct.pack('<H', len(data))) # 数据域长度
frame.extend(data)
# 计算并添加校验码
check = 0
for b in frame[2:]: # 从命令字节开始异或
check ^= b
frame.append(check)
return bytes(frame)
except Exception as e:
logging.error(f"构建02H响应失败: {str(e)}")
return None
def process_and_respond(self, received_data, sock):
"""处理收到的01H命令并回复02H"""
try:
# 基础验证
if not self.validate_frame(received_data):
logging.error("01H命令帧格式验证失败")
return False
# 提取必要信息
pile_id = received_data[3:11] # 桩号
time_bytes = received_data[14:20] # 时间标识
key_version = struct.unpack("<H", received_data[20:22])[0] # 密钥版本
# 检查时间是否在合理范围内
cmd_time = datetime(2000 + time_bytes[0], time_bytes[1], time_bytes[2],
time_bytes[3], time_bytes[4], time_bytes[5])
time_diff = abs((datetime.now() - cmd_time).total_seconds())
if time_diff > 600: # 超过10分钟
logging.warning(f"时间差异过大: {time_diff}")
response = self.build_02h_response(pile_id, False, 6) # 拒绝原因6-时差过大
else:
# 这里可以添加更多的验证逻辑
response = self.build_02h_response(pile_id, True, 0)
if response and hasattr(sock, 'send'):
sock.send(response)
logging.info(f"成功发送02H响应, 长度: {len(response)}字节")
return True
return False
except Exception as e:
logging.error(f"处理01H命令失败: {str(e)}")
return False
def test_command():
"""测试函数"""
print("开始测试01H/02H命令处理...")
# 创建响应处理器
handler = Command02()
# 直接使用完整的测试数据,不进行裁剪
test_data = bytes.fromhex("4A5801031767631136065701100019010909371501000000000000000000004D")
print("\n测试数据:")
print(f"数据内容: {test_data.hex().upper()}")
print(f"数据长度: {len(test_data)}字节")
# 修改validate_frame方法来处理多余字节
def validate_frame(self, data):
"""验证帧格式"""
try:
print("\n开始验证帧格式:")
print(f"数据内容: {data.hex().upper()}")
print(f"数据长度: {len(data)}字节")
# 1. 基本格式检查
if len(data) < 14:
print("数据长度不足14字节")
return False
if data[0:2] != b'JX':
print("帧起始标志不是'JX'")
return False
# 2. 获取数据域长度
data_len = struct.unpack("<H", data[12:14])[0]
expected_total_len = 14 + data_len + 1 # 头部(14) + 数据域 + 校验码(1)
print(f"\n数据结构分析:")
print(f"起始标识: {data[0:2].hex().upper()}")
print(f"命令字: {data[2]:02X}")
print(f"桩号: {data[3:11].hex().upper()}")
print(f"加密方式: {data[11]:02X}")
print(f"数据长度: {data_len}")
print(f"数据域: {data[14:14 + data_len].hex().upper()}")
print(f"校验码: {data[-1]:02X}")
# 3. 校验码验证
check_data = data[2:-1] # 从命令字节开始到校验码之前
calculated_check = 0
for b in check_data:
calculated_check ^= b
if calculated_check != data[-1]:
print(f"校验码不匹配: 计算值={calculated_check:02X}, 接收值={data[-1]:02X}")
return False
print("校验通过")
return True
except Exception as e:
print(f"验证帧格式异常: {str(e)}")
return False
# 覆盖原方法
handler.validate_frame = validate_frame.__get__(handler)
# 创建模拟socket
class MockSocket:
def send(self, data):
print(f"\n模拟发送响应数据:")
print(f"数据内容: {data.hex().upper()}")
print(f"数据长度: {len(data)}字节")
mock_sock = MockSocket()
# 测试完整处理流程
result = handler.process_and_respond(test_data, mock_sock)
print(f"\n最终处理结果: {'成功' if result else '失败'}")
if __name__ == "__main__":
test_command()
# 计算校验码
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
logging.debug(f"发送02H响应: {frame.hex().upper()}")
return frame

View File

@ -1,137 +1,45 @@
import struct
import logging
class Command03:
def __init__(self):
self.command = 0x03 # 03H命令码
def parse_03h(self, data):
"""
解析03H登录信息命令
:param data: 完整的03H命令报文
:return: 解析后的字典或None
"""
try:
# 验证基本帧格式
if len(data) < 14 or data[0:2] != b'JX' or data[2] != 0x03:
logging.warning("03H命令帧格式不正确")
def process_03h(self, data):
if len(data) < 14:
logging.warning("03H数据长度不足")
return None
# 提取桩号
pile_id_bytes = data[3:11]
pile_id = data[3:11]
data_start = 14
data_len = struct.unpack('<H', data[12:14])[0]
data_end = data_start + data_len
if len(data) < data_end + 1:
logging.warning("03H数据域长度不匹配")
return None
# 提取时间标识
time_bytes = data[14:20]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 提取字段
timestamp = data[data_start:data_start + 6]
login_data = data[data_start + 6:data_end]
# 解析桩型号 (16字节ASCII)
pile_type = data[6:22].decode('ascii').rstrip('\x00')
# 解析硬件版本 (2字节压缩BCD)
hw_version_bytes = data[22:24]
hw_version_major = hw_version_bytes[0] >> 4
hw_version_minor = hw_version_bytes[0] & 0x0F
hw_version_patch = hw_version_bytes[1] >> 4
hw_version = f"{hw_version_major}.{hw_version_minor}.{hw_version_patch}"
# 解析软件版本 (2字节压缩BCD)
sw_version_bytes = data[24:26]
sw_version_major = sw_version_bytes[0] >> 4
sw_version_minor = sw_version_bytes[0] & 0x0F
sw_version_patch = sw_version_bytes[1] >> 4
sw_version = f"{sw_version_major}.{sw_version_minor}.{sw_version_patch}"
# 解析次级单元硬件版本
sub_hw_version_bytes = data[26:28]
sub_hw_version_major = sub_hw_version_bytes[0] >> 4
sub_hw_version_minor = sub_hw_version_bytes[0] & 0x0F
sub_hw_version_patch = sub_hw_version_bytes[1] >> 4
sub_hw_version = f"{sub_hw_version_major}.{sub_hw_version_minor}.{sub_hw_version_patch}"
# 解析次级单元软件版本
sub_sw_version_bytes = data[28:30]
sub_sw_version_major = sub_sw_version_bytes[0] >> 4
sub_sw_version_minor = sub_sw_version_bytes[0] & 0x0F
sub_sw_version_patch = sub_sw_version_bytes[1] >> 4
sub_sw_version = f"{sub_sw_version_major}.{sub_sw_version_minor}.{sub_sw_version_patch}"
# 解析直流模块类型
dc_module_type = data[30]
# 解析直流模块总数
dc_module_count = data[31]
# 解析直流模块单模块功率
dc_module_power = data[32]
# 解析计费模型版本
fee_model_version = struct.unpack("<H", data[33:35])[0]
# 打印解析结果
print("\n03H命令解析结果:")
print(f"桩号: {pile_id_bytes.hex()}")
print(f"时间标识: {timestamp}")
print(f"桩型号: {pile_type}")
print(f"硬件版本: {hw_version}")
print(f"软件版本: {sw_version}")
print(f"次级单元硬件版本: {sub_hw_version}")
print(f"次级单元软件版本: {sub_sw_version}")
print(f"直流模块类型: {dc_module_type}")
print(f"直流模块总数: {dc_module_count}")
print(f"直流模块单模块功率: {dc_module_power}kW")
print(f"计费模型版本: {fee_model_version}")
return {
"pile_id": pile_id_bytes.hex(),
"timestamp": timestamp,
"pile_type": pile_type,
"hw_version": hw_version,
"sw_version": sw_version,
"sub_hw_version": sub_hw_version,
"sub_sw_version": sub_sw_version,
"dc_module_type": dc_module_type,
"dc_module_count": dc_module_count,
"dc_module_power": dc_module_power,
"fee_model_version": fee_model_version
# 解析登录数据
device_model = login_data[0:16].decode('ascii', errors='ignore').rstrip('\x00') # 设备型号
protocol_version = login_data[16] # 协议版本
gun_count = login_data[17] # 枪数量
max_power = struct.unpack('<H', login_data[18:20])[0] # 最大功率
last_heartbeat_time = login_data[20:26] # 上次心跳时间
# 其他字段根据协议解析
parsed_login_data = {
"device_model": device_model,
"protocol_version": f"0x{protocol_version:02X}",
"gun_count": gun_count,
"max_power_w": max_power,
"last_heartbeat_time": f"20{last_heartbeat_time[0]:02X}-{last_heartbeat_time[1]:02X}-{last_heartbeat_time[2]:02X} {last_heartbeat_time[3]:02X}:{last_heartbeat_time[4]:02X}:{last_heartbeat_time[5]:02X}",
"raw_login_data": login_data.hex().upper()
}
except Exception as e:
logging.error(f"解析03H命令失败: {str(e)}")
return None
def process_03h(self, data):
"""
处理03H登录信息命令
:param data: 完整的03H命令报文
:return: 是否成功处理
"""
try:
parsed_data = self.parse_03h(data)
if parsed_data is None:
logging.warning("03H命令解析失败")
return False
# 可以在这里添加额外的处理逻辑,比如记录日志、更新状态等
logging.info(f"成功解析03H登录信息: 桩号 {parsed_data['pile_id']}")
return True
except Exception as e:
logging.error(f"处理03H命令出错: {str(e)}")
return False
# 测试用示例
if __name__ == "__main__":
# 示例报文
test_data = bytes.fromhex(
"4A 58 03 03 17 67 63 11 36 06 57 01 42 00 19 01 09 09 37 1B 41 49 4F 44 43 32 50 31 42 56 39 30 30 00 00 00 00 00 01 04 00 00 00 00 00 00 00 1B 00 00 00 19 01 09 09 37 1B 0F 00 05 19 00 19 00 0F 00 0F 00 00 00 00 00 00 00 00 00 00 00 00 00 0C")
parser = Command03()
parser.process_03h(test_data)
# 转换为键值对
parsed_data = {
"pile_id": pile_id.hex().upper(),
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
"login_data": parsed_login_data
}
logging.info(f"03H解析结果: {parsed_data}")
return parsed_data # 返回解析结果

View File

@ -0,0 +1,53 @@
import time
import struct
import logging
class Command04:
def __init__(self):
self.pile_id = b"\x00\x27\x02\x12\x34\x56\x12\x34" # 示例桩号
def build_04h_disconnect(self, pile_id, reason=0x00):
"""构建04H断开连接帧"""
frame = bytearray([0x4A, 0x58]) # 帧头: "JX"
frame.append(0x04) # 命令码: 04H
frame.extend(pile_id) # 桩号 (8字节)
frame.append(0x01) # 数据加密方式: 0x01 (不加密)
frame.extend([0x00, 0x00]) # 数据域长度 (初始为0后续更新)
# 数据域
data = bytearray()
current_time = time.localtime()
data.extend([
current_time.tm_year - 2000, current_time.tm_mon, current_time.tm_mday,
current_time.tm_hour, current_time.tm_min, current_time.tm_sec
]) # 时间标识 (6字节)
data.append(reason) # 断开原因 (0x00: 正常断开)
frame.extend(data)
struct.pack_into(">H", frame, 12, len(data)) # 更新数据域长度
# 校验码
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
return frame
def process_04h(self, data):
"""处理04H断开连接由接收方调用"""
if data and len(data) >= 20 and data[2] == 0x04:
pile_id = data[3:11]
reason = data[20]
logging.info(f"接收到断开请求,桩号: {pile_id.hex().upper()},原因: {reason:02X}")
return True
logging.warning("无效的04H数据")
return False
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
cmd = Command04()
frame = cmd.build_04h_disconnect(b"\x00\x27\x02\x12\x34\x56\x12\x34")
print(f"04H断开帧: {frame.hex().upper()}")

View File

@ -0,0 +1,33 @@
import struct
class Command05:
def __init__(self):
self.pile_id = b"\x00\x27\x02\x12\x34\x56\x12\x34" # 示例桩号
def build_05h_request(self):
"""构建05H请求对时帧"""
frame = bytearray([0x4A, 0x58]) # 帧头: "JX"
frame.append(0x05) # 命令码: 05H
frame.extend(self.pile_id) # 桩号 (8字节)
frame.append(0x01) # 数据加密方式: 0x01 (不加密)
frame.extend([0x00, 0x00]) # 数据域长度 (0)
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum) # 校验码
return frame
def process_05h_response(self, response):
"""处理06H响应由充电桩调用"""
if response and len(response) >= 14 and response[2] == 0x06:
logging.info("对时成功")
return True
logging.warning("无效的06H响应")
return False
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.INFO)
cmd = Command05()
request = cmd.build_05h_request()
print(f"05H请求帧: {request.hex().upper()}")

View File

@ -0,0 +1,54 @@
import time
import struct
class Command06:
def __init__(self):
self.pile_id = b"\x00\x27\x02\x12\x34\x56\x12\x34" # 示例桩号
def build_06h_response(self, pile_id):
"""构建06H下发对时帧"""
frame = bytearray([0x4A, 0x58]) # 帧头: "JX"
frame.append(0x06) # 命令码: 06H
frame.extend(pile_id) # 桩号 (8字节)
frame.append(0x01) # 数据加密方式: 0x01 (不加密)
frame.extend([0x00, 0x00]) # 数据域长度 (初始为0后续更新)
# 数据域
data = bytearray()
current_time = time.localtime()
data.extend([
current_time.tm_year - 2000, current_time.tm_mon, current_time.tm_mday,
current_time.tm_hour, current_time.tm_min, current_time.tm_sec
]) # 时间标识 (6字节)
frame.extend(data)
struct.pack_into(">H", frame, 12, len(data)) # 更新数据域长度
# 校验码
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
return frame
def process_06h(self, data):
"""处理06H数据由充电桩调用"""
if data and len(data) >= 20 and data[2] == 0x06:
time_data = data[14:20]
year = 2000 + time_data[0]
month, day, hour, minute, second = time_data[1:6]
logging.info(f"收到对时: {year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}")
return True
logging.warning("无效的06H数据")
return False
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.INFO)
cmd = Command06()
response = cmd.build_06h_response(b"\x00\x27\x02\x12\x34\x56\x12\x34")
print(f"06H响应帧: {response.hex().upper()}")

View File

@ -1,96 +1,69 @@
import struct
import logging
from datetime import datetime
class Command07:
def __init__(self):
self.command = 0x07 # 07H命令码
self.command = 0x07
def parse_07h(self, data):
"""
解析07H回复对时命令
:param data: 完整的07H命令报文
:return: 解析后的字典或None
"""
def process_07h(self, data, sock):
try:
# 验证基本帧格式
if len(data) < 14 or data[0:2] != b'JX' or data[2] != 0x07:
logging.warning("07H命令帧格式不正确")
return None
# 提取桩号
pile_id_bytes = data[3:11]
# 提取时间标识
time_bytes = data[14:20]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 解析对时结果
time_sync_result = data[20]
time_sync_result_text = "成功" if time_sync_result == 0x01 else "失败"
# 解析失败原因(如果有)
failure_reason = data[21] if len(data) > 21 else 0x00
failure_reason_text = {
0x00: "",
0x01: "数据格式异常"
}.get(failure_reason, "未知原因")
# 打印解析结果
print("\n07H命令解析结果:")
print(f"桩号: {pile_id_bytes.hex()}")
print(f"时间标识: {timestamp}")
print(f"对时结果: {time_sync_result_text}")
print(f"失败原因: {failure_reason_text}")
return {
"pile_id": pile_id_bytes.hex(),
"timestamp": timestamp,
"time_sync_result": time_sync_result,
"time_sync_result_text": time_sync_result_text,
"failure_reason": failure_reason,
"failure_reason_text": failure_reason_text
}
except Exception as e:
logging.error(f"解析07H命令失败: {str(e)}")
return None
def process_07h(self, data):
"""
处理07H回复对时命令
:param data: 完整的07H命令报文
:return: 是否成功处理
"""
try:
parsed_data = self.parse_07h(data)
if parsed_data is None:
logging.warning("07H命令解析失败")
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command:
logging.error("07H帧格式错误")
return False
# 记录对时结果日志
if parsed_data['time_sync_result'] == 0x01:
logging.info(f"成功处理07H对时命令: 桩号 {parsed_data['pile_id']} 对时成功")
else:
logging.warning(
f"处理07H对时命令: 桩号 {parsed_data['pile_id']} 对时失败, 原因: {parsed_data['failure_reason_text']}")
pile_id = data[3:11]
response = self.build_07h_response(pile_id)
if response and sock.send(response):
logging.info(f"发送07H响应: {response.hex().upper()}")
return True
return False
except Exception as e:
logging.error(f"处理07H命令出错: {str(e)}")
logging.error(f"处理07H失败: {str(e)}")
return False
def build_07h_response(self, pile_id):
try:
frame = bytearray()
frame.extend(b'JX')
frame.append(self.command)
frame.extend(pile_id)
frame.append(0x01)
data = bytearray()
now = datetime.now()
data.extend(struct.pack("<BBBBBB",
now.year - 2000, now.month, now.day,
now.hour, now.minute, now.second))
frame.extend(struct.pack("<H", len(data)))
frame.extend(data)
check = 0
for b in frame[2:]:
check ^= b
frame.append(check)
return bytes(frame)
except Exception as e:
logging.error(f"构建07H失败: {str(e)}")
return None
def test_command(self):
"""测试07H命令"""
print("开始测试07H命令...")
class MockSocket:
def send(self, data):
print(f"模拟发送07H响应: {data.hex().upper()}")
mock_sock = MockSocket()
test_pile_id = bytes.fromhex("0317665611360637")
test_data = bytes.fromhex("4A58070317665611360637011000190109093715") # 示例07H数据
print(f"测试07H数据: {test_data.hex().upper()}")
result = self.process_07h(test_data, mock_sock)
print(f"测试结果: {'成功' if result else '失败'}")
# 测试用示例
if __name__ == "__main__":
# 示例报文
test_data = bytes.fromhex("4A 58 07 03 17 67 63 11 36 06 57 01 08 00 19 01 09 09 37 1F 01 00 59")
parser = Command07()
parser.process_07h(test_data)
cmd = Command07()
cmd.test_command()

View File

@ -1,154 +1,49 @@
# command_08.py
import struct
import logging
class Command08:
def __init__(self):
self.command = 0x08 # 08H命令码
def build_08h_request(self, pile_id):
frame = bytearray([0x4A, 0x58, 0x08]) # 帧头 + 命令
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式(不加密)
def parse_08h(self, data):
"""
解析08H故障命令
# 数据域:时间标识 + 查询类型
data = bytearray()
current_time = bytearray([0x19, 0x03, 0x0A, 0x13, 0x14, 0x00]) # 示例2025-03-10 13:14:00
data.extend(current_time)
data.append(0x01) # 查询类型0x01: 实时状态)
frame.extend(struct.pack('<H', len(data))) # 数据域长度
frame.extend(data)
:param data: 完整的08H命令报文
:return: 解析后的字典或None
"""
try:
# 验证基本帧格式
if len(data) < 14 or data[0:2] != b'JX' or data[2] != 0x08:
logging.warning("08H命令帧格式不正确")
return None
# 计算校验码
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
return frame
# 提取桩号
pile_id_bytes = data[3:11]
def process_08h_response(self, data):
logging.debug(f"处理08H响应: {data.hex().upper()}")
if len(data) < 14:
logging.warning("08H数据长度不足")
return
# 提取时间标识
time_bytes = data[14:20]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 解析数据域
data_start = 14
data_len = struct.unpack('<H', data[12:14])[0]
data_end = data_start + data_len
if len(data) < data_end:
logging.warning("08H数据域长度不匹配")
return
# 提取故障状态字节
current_index = 20
fault_states = {
# 从文档3.3.1节提取的故障状态映射
"汇流接触器": (data[current_index] & 0x01) != 0,
"输入接触器": (data[current_index] & 0x02) != 0,
"电表通讯": (data[current_index] & 0x04) != 0,
"读卡器通讯": (data[current_index] & 0x08) != 0,
"HMI通讯": (data[current_index] & 0x10) != 0,
"绝缘检测模块": (data[current_index] & 0x20) != 0,
"急停": (data[current_index] & 0x40) != 0,
"柜门打开": (data[current_index] & 0x80) != 0,
# 提取字段
timestamp = data[data_start:data_start + 6]
query_type = data[data_start + 6]
"温湿度传感器": (data[current_index + 1] & 0x01) != 0,
"风机": (data[current_index + 1] & 0x02) != 0,
"加热器": (data[current_index + 1] & 0x04) != 0,
"防雷器": (data[current_index + 1] & 0x08) != 0,
"控制板硬件": (data[current_index + 1] & 0x10) != 0,
"机柜过温": (data[current_index + 1] & 0x20) != 0,
"湿度过高": (data[current_index + 1] & 0x40) != 0,
"烟感报警": (data[current_index + 1] & 0x80) != 0
# 转换为键值对
parsed_data = {
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
"query_type": f"0x{query_type:02X}"
}
# 提取充电枪数量
current_index += 2
gun_count = data[current_index]
# 存储每个枪的故障状态
gun_faults = []
current_index += 1
for i in range(gun_count):
# 每个枪有多个故障位
gun_fault_bytes = data[current_index:current_index + 3]
gun_faults.append({
"gun_index": i + 1,
"output_short_circuit": (gun_fault_bytes[0] & 0x01) != 0,
"output_contactor": (gun_fault_bytes[0] & 0x02) != 0,
"electronic_lock": (gun_fault_bytes[0] & 0x04) != 0,
"meter_communication": (gun_fault_bytes[0] & 0x08) != 0,
"charging_module_communication": (gun_fault_bytes[0] & 0x10) != 0,
"slave_control_communication": (gun_fault_bytes[0] & 0x20) != 0,
"insulation_module_communication": (gun_fault_bytes[0] & 0x40) != 0,
"insulation_fault": (gun_fault_bytes[0] & 0x80) != 0,
"module_overtemperature": (gun_fault_bytes[1] & 0x01) != 0,
"module_pfc": (gun_fault_bytes[1] & 0x02) != 0,
"module_fan": (gun_fault_bytes[1] & 0x04) != 0,
"module_address_conflict": (gun_fault_bytes[1] & 0x08) != 0,
"module_input_overvoltage": (gun_fault_bytes[1] & 0x10) != 0,
"module_input_undervoltage": (gun_fault_bytes[1] & 0x20) != 0,
"module_input_phase_loss": (gun_fault_bytes[1] & 0x40) != 0,
"module_other_fault": (gun_fault_bytes[1] & 0x80) != 0
})
current_index += 3
# 打印解析结果
print("\n08H命令解析结果:")
print(f"桩号: {pile_id_bytes.hex()}")
print(f"时间标识: {timestamp}")
print("系统故障状态:")
for fault, state in fault_states.items():
if state:
print(f" {fault}: 故障")
print(f"充电枪数量: {gun_count}")
for gun_fault in gun_faults:
print(f"{gun_fault['gun_index']} 故障状态:")
for fault, state in gun_fault.items():
if fault != "gun_index" and state:
print(f" {fault}: 故障")
return {
"pile_id": pile_id_bytes.hex(),
"timestamp": timestamp,
"system_faults": {k: v for k, v in fault_states.items() if v},
"gun_count": gun_count,
"gun_faults": gun_faults
}
except Exception as e:
logging.error(f"解析08H命令失败: {str(e)}")
return None
def process_08h(self, data):
"""
处理08H故障命令
:param data: 完整的08H命令报文
:return: 是否成功处理
"""
try:
parsed_data = self.parse_08h(data)
if parsed_data is None:
logging.warning("08H命令解析失败")
return False
# 记录故障信息日志
fault_summary = f"桩号 {parsed_data['pile_id']} 报告故障: "
system_faults = list(parsed_data['system_faults'].keys())
gun_faults_count = len(parsed_data['gun_faults'])
if system_faults:
fault_summary += f"系统故障 {system_faults}, "
fault_summary += f"充电枪数量 {gun_faults_count}"
logging.warning(fault_summary)
return True
except Exception as e:
logging.error(f"处理08H命令出错: {str(e)}")
return False
# 测试用示例
if __name__ == "__main__":
# 示例报文
test_data = bytes.fromhex(
"4A 58 08 03 17 67 63 11 36 06 57 01 11 00 19 01 09 0A 05 04 00 00 00 00 02 00 00 00 00 00 00 66")
parser = Command08()
parser.process_08h(test_data)
logging.info(f"08H解析结果: {parsed_data}")

View File

@ -1,152 +1,59 @@
# command_09.py
import struct
import logging
class Command09:
def __init__(self):
self.command = 0x09 # 09H命令码
def process_09h(self, data):
if len(data) < 14:
logging.warning("09H数据长度不足")
return
def parse_09h(self, data):
"""
解析09H遥信命令
pile_id = data[3:11]
data_start = 14
data_len = struct.unpack('<H', data[12:14])[0]
data_end = data_start + data_len
if len(data) < data_end + 1:
logging.warning("09H数据域长度不匹配")
return
:param data: 完整的09H命令报文
:return: 解析后的字典或None
"""
try:
# 验证基本帧格式
if len(data) < 14 or data[0:2] != b'JX' or data[2] != 0x09:
logging.warning("09H命令帧格式不正确")
return None
# 提取字段
timestamp = data[data_start:data_start + 6]
pile_status = data[data_start + 6]
num_guns = data[data_start + 7]
guns_status = []
offset = data_start + 8
# 提取桩号
pile_id_bytes = data[3:11]
# 解析每枪状态
for i in range(num_guns):
if offset >= data_end:
break
gun_status = data[offset]
guns_status.append({"gun_id": i + 1, "status": f"0x{gun_status:02X}"})
offset += 1
# 提取时间标识
time_bytes = data[14:20]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 提取充电枪数量
gun_count = data[20]
# 存储充电枪状态的列表
gun_states = []
# 从第21字节开始解析充电枪状态
current_index = 21
for i in range(gun_count):
# 提取充电枪状态
gun_state = data[current_index]
gun_state_text = self.get_gun_state_text(gun_state)
# 工作模式
current_index += 1
work_mode = data[current_index]
work_mode_text = self.get_work_mode_text(work_mode)
gun_states.append({
"gun_index": i + 1,
"state": gun_state,
"state_text": gun_state_text,
"work_mode": work_mode,
"work_mode_text": work_mode_text
# 解析详细数据示例假设每枪有电压、电流、SOC 等)
detailed_data = []
for i in range(num_guns):
if offset + 12 > data_end: # 示例假设每枪12字节电压4字节电流4字节SOC 1字节预留3字节
break
voltage = struct.unpack('<f', data[offset:offset + 4])[0] # 电压(浮点数)
current = struct.unpack('<f', data[offset + 4:offset + 8])[0] # 电流(浮点数)
soc = data[offset + 8] # SOC百分比
offset += 12
detailed_data.append({
"gun_id": i + 1,
"voltage": voltage,
"current": current,
"soc": soc
})
current_index += 1
# 打印解析结果
print("\n09H命令解析结果:")
print(f"桩号: {pile_id_bytes.hex()}")
print(f"时间标识: {timestamp}")
print(f"充电枪数量: {gun_count}")
print("充电枪状态:")
for gun in gun_states:
print(f"{gun['gun_index']}:")
print(f" 状态: {gun['state_text']} (0x{gun['state']:02X})")
print(f" 工作模式: {gun['work_mode_text']} (0x{gun['work_mode']:02X})")
return {
"pile_id": pile_id_bytes.hex(),
"timestamp": timestamp,
"gun_count": gun_count,
"gun_states": gun_states
# 转换为键值对
parsed_data = {
"pile_id": pile_id.hex().upper(),
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
"pile_status": f"0x{pile_status:02X}",
"num_guns": num_guns,
"guns_status": guns_status,
"detailed_data": detailed_data
}
except Exception as e:
logging.error(f"解析09H命令失败: {str(e)}")
return None
def get_gun_state_text(self, state):
"""
解析充电枪状态
:param state: 充电枪状态字节
:return: 状态文本描述
"""
state_map = {
0x01: "待机",
0x02: "等待连接",
0x03: "启动中",
0x04: "充电中",
0x05: "停止中",
0x06: "预约中",
0x07: "占用中",
0x08: "测试中",
0x09: "故障中",
0x0A: "定时充电",
0x0B: "充电完成",
0x0C: "升级中"
}
return state_map.get(state, f"未知状态 (0x{state:02X})")
def get_work_mode_text(self, mode):
"""
解析工作模式
:param mode: 工作模式字节
:return: 工作模式文本描述
"""
mode_map = {
0x01: "普通充电",
0x02: "轮充",
0x03: "大功率",
0x04: "超级充",
0x05: "电池维护",
0x06: "柔性充"
}
return mode_map.get(mode, f"未知模式 (0x{mode:02X})")
def process_09h(self, data):
"""
处理09H遥信命令
:param data: 完整的09H命令报文
:return: 是否成功处理
"""
try:
parsed_data = self.parse_09h(data)
if parsed_data is None:
logging.warning("09H命令解析失败")
return False
# 记录遥信信息日志
logging.info(f"成功处理09H遥信命令: 桩号 {parsed_data['pile_id']}, 充电枪数量 {parsed_data['gun_count']}")
return True
except Exception as e:
logging.error(f"处理09H命令出错: {str(e)}")
return False
# 测试用示例
if __name__ == "__main__":
# 示例报文
test_data = bytes.fromhex(
"4A 58 09 03 17 67 63 11 36 06 57 01 13 00 19 01 09 09 37 1F 00 00 02 01 01 0A 00 00 01 01 0A 00 00 4F")
parser = Command09()
parser.process_09h(test_data)
logging.info(f"09H解析结果: {parsed_data}")

View File

@ -1,148 +1,67 @@
# command_0A.py
import struct
import logging
class Command0A:
def __init__(self):
self.command = 0x0A # 0AH命令码
def build_0a_request(self, pile_id):
frame = bytearray([0x4A, 0x58, 0x0A]) # 帧头 + 命令
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式(不加密)
def parse_0ah(self, data):
"""
解析0AH遥测命令
# 数据域:时间标识 + 记录类型
data = bytearray()
current_time = bytearray([0x19, 0x03, 0x0A, 0x13, 0x14, 0x00]) # 示例2025-03-10 13:14:00
data.extend(current_time)
data.append(0x01) # 记录类型0x01: 最近记录)
frame.extend(struct.pack('<H', len(data))) # 数据域长度
frame.extend(data)
:param data: 完整的0AH命令报文
:return: 解析后的字典或None
"""
try:
# 验证基本帧格式
if len(data) < 14 or data[0:2] != b'JX' or data[2] != 0x0A:
logging.warning("0AH命令帧格式不正确")
return None
# 计算校验码
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
return frame
# 提取桩号
pile_id_bytes = data[3:11]
def process_0a_response(self, data):
logging.debug(f"处理0A响应: {data.hex().upper()}")
if len(data) < 14:
logging.warning("0A数据长度不足")
return
# 提取时间标识
time_bytes = data[14:20]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
pile_id = data[3:11]
data_start = 14
data_len = struct.unpack('<H', data[12:14])[0]
data_end = data_start + data_len
if len(data) < data_end + 1:
logging.warning("0A数据域长度不匹配")
return
# 解析总体电气参数
current_index = 20
power_params = {
"A相电压": struct.unpack("<H", data[current_index:current_index + 2])[0] / 10, # 0.1V
"B相电压": struct.unpack("<H", data[current_index + 2:current_index + 4])[0] / 10,
"C相电压": struct.unpack("<H", data[current_index + 4:current_index + 6])[0] / 10,
"A相电流": struct.unpack("<H", data[current_index + 6:current_index + 8])[0] / 100, # 0.01A
"B相电流": struct.unpack("<H", data[current_index + 8:current_index + 10])[0] / 100,
"C相电流": struct.unpack("<H", data[current_index + 10:current_index + 12])[0] / 100,
"总电表电量": struct.unpack("<I", data[current_index + 12:current_index + 16])[0] / 100 # 0.01kWh
# 提取字段
timestamp = data[data_start:data_start + 6]
record_type = data[data_start + 6]
offset = data_start + 7
# 假设数据域包含一条充电记录(开始时间、结束时间、电量等)
if offset + 19 > data_end: # 示例6字节开始时间 + 6字节结束时间 + 4字节电量 + 1字节状态
logging.warning("0A记录数据不足")
return
start_time = data[offset:offset + 6]
end_time = data[offset + 6:offset + 12]
energy = struct.unpack('<f', data[offset + 12:offset + 16])[0] # 电量kWh浮点数
status = data[offset + 16]
# 转换为键值对
parsed_data = {
"pile_id": pile_id.hex().upper(),
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
"record_type": f"0x{record_type:02X}",
"record": {
"start_time": f"20{start_time[0]:02X}-{start_time[1]:02X}-{start_time[2]:02X} {start_time[3]:02X}:{start_time[4]:02X}:{start_time[5]:02X}",
"end_time": f"20{end_time[0]:02X}-{end_time[1]:02X}-{end_time[2]:02X} {end_time[3]:02X}:{end_time[4]:02X}:{end_time[5]:02X}",
"energy_kwh": energy,
"status": f"0x{status:02X}"
}
current_index += 16
# 解析温度相关参数
temp_params = {
"桩内温度": data[current_index] - 50, # 偏移量-50℃
"进风口温度": data[current_index + 1] - 50,
"出风口温度": data[current_index + 2] - 50,
"控制板温度": data[current_index + 3] - 50,
"桩内湿度": data[current_index + 4] # 0-100%RH
}
current_index += 5
# 跳过预留字节
current_index += 8
# 解析充电枪数量
gun_count = data[current_index]
current_index += 1
# 存储每个充电枪的遥测数据
gun_params = []
for i in range(gun_count):
gun_data = {
"gun_index": i + 1,
"电表电压": struct.unpack("<H", data[current_index:current_index + 2])[0] / 10, # 0.1V
"电表电流": struct.unpack("<H", data[current_index + 2:current_index + 4])[0] / 100, # 0.01A
"电表电量": struct.unpack("<I", data[current_index + 4:current_index + 8])[0] / 100, # 0.01kWh
"充电模块电压": struct.unpack("<H", data[current_index + 8:current_index + 10])[0] / 10, # 0.1V
"充电模块电流": struct.unpack("<H", data[current_index + 10:current_index + 12])[0] / 10, # 0.1A
"充电模块温度": data[current_index + 12] - 50, # 偏移量-50℃
"充电枪温度": data[current_index + 13] - 50
}
gun_params.append(gun_data)
current_index += 14
# 打印解析结果
print("\n0AH命令解析结果:")
print(f"桩号: {pile_id_bytes.hex()}")
print(f"时间标识: {timestamp}")
print("总体电气参数:")
for param, value in power_params.items():
print(f" {param}: {value}")
print("温度和湿度参数:")
for param, value in temp_params.items():
print(f" {param}: {value}")
print(f"充电枪数量: {gun_count}")
for gun in gun_params:
print(f"{gun['gun_index']} 遥测数据:")
for param, value in gun.items():
if param != "gun_index":
print(f" {param}: {value}")
return {
"pile_id": pile_id_bytes.hex(),
"timestamp": timestamp,
"power_params": power_params,
"temp_params": temp_params,
"gun_count": gun_count,
"gun_params": gun_params
}
except Exception as e:
logging.error(f"解析0AH命令失败: {str(e)}")
return None
def process_0ah(self, data):
"""
处理0AH遥测命令
:param data: 完整的0AH命令报文
:return: 是否成功处理
"""
try:
parsed_data = self.parse_0ah(data)
if parsed_data is None:
logging.warning("0AH命令解析失败")
return False
# 记录遥测信息日志
log_message = (
f"桩号 {parsed_data['pile_id']} 遥测数据: "
f"A相电压 {parsed_data['power_params']['A相电压']}V, "
f"总电量 {parsed_data['power_params']['总电表电量']}kWh, "
f"桩内温度 {parsed_data['temp_params']['桩内温度']}"
)
logging.info(log_message)
return True
except Exception as e:
logging.error(f"处理0AH命令出错: {str(e)}")
return False
# 测试用示例
if __name__ == "__main__":
# 示例报文
test_data = bytes.fromhex(
"4A 58 0A 03 17 67 63 11 36 06 57 01 48 00 19 01 09 09 37 39 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 00 00 00 74 29 AD 00 00 00 00 00 00 67 00 00 00 00 00 00 00 00 27 63 F1 00 00 00 00 00 00 E1 00 00 00 00 F2")
parser = Command0A()
parser.process_0ah(test_data)
logging.info(f"0A解析结果: {parsed_data}")

View File

@ -1,181 +1,67 @@
# command_19_1A.py
import struct
import logging
from datetime import datetime
import time
class Command191A:
def __init__(self):
self.command_19 = 0x19 # 卡鉴权命令
self.command_1a = 0x1A # 卡鉴权响应
def process_and_respond(self, data, conn):
if len(data) < 14:
logging.warning("19H数据长度不足")
return
def parse_19h(self, data):
"""解析19H卡鉴权命令"""
try:
print("\n开始解析19H卡鉴权命令...")
print(f"接收数据: {data.hex().upper()}")
pile_id = data[3:11]
data_start = 14
data_len = struct.unpack('<H', data[12:14])[0]
data_end = data_start + data_len
if len(data) < data_end + 1:
logging.warning("19H数据域长度不匹配")
return
# 基础验证
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command_19:
logging.warning("19H命令帧格式不正确")
return None
timestamp = data[data_start:data_start + 6]
gun_id = data[data_start + 6]
card_number = data[data_start + 7:data_start + 23].decode('ascii', errors='ignore').rstrip('\x00')
balance = struct.unpack('<I', data[data_start + 23:data_start + 27])[0]
# 解析数据
pile_id = data[3:11] # 桩号
encrypt_mode = data[11] # 加密方式
data_len = struct.unpack("<H", data[12:14])[0] # 数据长度
# 解析数据域
data_field = data[14:14 + data_len]
# 解析时间标识
time_bytes = data_field[0:6]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 解析卡号 (16字节ASCII)
card_no = data_field[6:22].decode('ascii').rstrip('\x00')
result = {
"pile_id": pile_id,
"timestamp": timestamp,
"card_no": card_no
parsed_data = {
"pile_id": pile_id.hex().upper(),
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
"gun_id": gun_id,
"card_number": card_number,
"balance_cents": balance
}
logging.info(f"19H卡鉴权请求: {parsed_data}")
print("\n解析结果:")
print(f"桩号: {pile_id.hex().upper()}")
print(f"时间标识: {timestamp}")
print(f"卡号: {card_no}")
return result
except Exception as e:
logging.error(f"解析19H卡鉴权命令失败: {str(e)}")
print(f"解析失败: {str(e)}")
return None
def build_1a_response(self, pile_id, card_no, allow=True, balance=3493, reject_reason=0):
"""构建1AH卡鉴权响应"""
try:
print("\n构建1AH卡鉴权响应...")
# 构建预期的响应4A 58 1A 03 17 66 56 11 36 06 37 01 1D 00 19 01 09 0C 15 2E 65 36 39 61 32 31 30 33 00 00 00 00 00 00 00 00 A5 0E 0D 00 01 00 01 BF
frame = bytearray()
frame.extend(b'JX') # 帧起始标志 "JX"
frame.append(self.command_1a) # 命令码1AH
frame.extend(pile_id) # 桩号(保持原样)
frame.append(0x01) # 数据加密方式(不加密)
frame.append(0x1D) # 数据长度(29字节)
frame.append(0x00) # 数据长度高字节
# 构建数据域
data = bytearray()
# 时间标识(保持接近原样,仅秒数+3)
now = datetime.now()
data.extend(struct.pack("<BBBBBB",
now.year - 2000, now.month, now.day,
now.hour, now.minute, now.second))
# 卡号(16字节维持原样)
data.extend(card_no.encode().ljust(16, b'\x00'))
# 卡余额(0xA5 0E 0D 00 = 3493)
data.extend(struct.pack("<I", balance))
# 允许充电标志(0x01)
data.append(0x01 if allow else 0x02)
# 不可充电原因(0x00)
data.append(reject_reason)
# 计费模型选择(0x01)
data.append(0x01)
# 添加数据域
frame.extend(data)
# 计算校验码
check = 0
for b in frame[2:]:
check ^= b
frame.append(check)
response = bytes(frame)
print(f"响应数据: {response.hex().upper()}")
return response
except Exception as e:
logging.error(f"构建1AH卡鉴权响应失败: {str(e)}")
print(f"构建响应失败: {str(e)}")
return None
def process_and_respond(self, received_data, sock):
"""处理收到的19H命令并回复1AH"""
try:
print("\n处理卡鉴权命令...")
# 解析接收到的19H命令
parsed = self.parse_19h(received_data)
if not parsed:
return False
# 这里可以添加实际的卡鉴权逻辑
# 例如检查卡号是否有效、查询余额等
allow = True # 允许充电
balance = 3493 # 余额34.93元
reject_reason = 0 # 无拒绝原因
# 模拟鉴权逻辑(实际需查询数据库)
auth_result = 0x01 # 假设通过
new_balance = balance - 100 # 扣除费用
# 构建1AH响应
response = self.build_1a_response(
parsed["pile_id"],
parsed["card_no"],
allow,
balance,
reject_reason
)
frame = bytearray([0x4A, 0x58, 0x1A])
frame.extend(pile_id)
frame.append(0x01) # 不加密
if not response:
return False
data = bytearray()
current_time = time.localtime()
time_bytes = bytearray([
current_time.tm_year - 2000,
current_time.tm_mon,
current_time.tm_mday,
current_time.tm_hour,
current_time.tm_min,
current_time.tm_sec
])
data.extend(time_bytes)
data.append(gun_id)
data.append(auth_result)
data.extend(struct.pack('<I', new_balance))
# 发送响应
if sock and hasattr(sock, 'send'):
sock.send(response)
print("卡鉴权响应发送成功")
frame.extend(struct.pack('<H', len(data)))
frame.extend(data)
return True
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
except Exception as e:
logging.error(f"处理卡鉴权命令失败: {str(e)}")
print(f"处理失败: {str(e)}")
return False
def test_auth():
"""测试卡鉴权命令处理"""
print("开始测试卡鉴权命令处理...")
# 创建处理器
handler = Command191A()
# 测试数据 - 使用实际收到的19H数据
test_data = bytes.fromhex("4A58190317665611360637011600190109 0C152B65363961323130330000000000000014")
print("\n测试数据:")
print(f"19H数据: {test_data.hex().upper()}")
# 创建模拟socket
class MockSocket:
def send(self, data):
print(f"\n模拟发送响应数据:")
print(f"1AH数据: {data.hex().upper()}")
mock_sock = MockSocket()
# 测试完整处理流程
result = handler.process_and_respond(test_data, mock_sock)
print(f"\n最终处理结果: {'成功' if result else '失败'}")
if __name__ == "__main__":
test_auth()
conn.send(frame)
logging.info(f"发送1AH鉴权响应: {frame.hex().upper()}")

View File

@ -1,201 +1,60 @@
# command_1F_20.py
import struct
import logging
from datetime import datetime
import time
class Command1F20:
def __init__(self):
self.command_1f = 0x1F # 启动充电命令
self.command_20 = 0x20 # 启动充电回复
def process_1f(self, pile_id, conn):
frame = bytearray([0x4A, 0x58, 0x1F])
frame.extend(pile_id)
frame.append(0x01)
def build_1f_command(self, pile_id, card_no="18771978016"):
"""构建1FH启动充电命令"""
try:
print("\n构建1FH启动充电命令...")
frame = bytearray()
frame.extend(b'JX') # 帧起始标志
frame.append(self.command_1f) # 命令码1FH
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式(不加密)
# 构建数据域
data = bytearray()
current_time = time.localtime()
time_bytes = bytearray([
current_time.tm_year - 2000,
current_time.tm_mon,
current_time.tm_mday,
current_time.tm_hour,
current_time.tm_min,
current_time.tm_sec
])
data.extend(time_bytes)
gun_id = 0x01
data.append(gun_id)
order_number = "ORDER1234567890".encode('ascii') + b"\x00" * 2
data.extend(order_number)
balance = 1000
data.extend(struct.pack('<I', balance))
data.append(0x01) # 充电模式:自动充满
# 添加时间标识
now = datetime.now()
data.extend(bytes([
now.year - 2000,
now.month,
now.day,
now.hour,
now.minute,
now.second
]))
# 添加枪号
data.append(0x01)
# 添加卡号(32字节)
data.extend(card_no.encode().ljust(32, b'\x00'))
# 添加用户ID(32字节)
user_id = "84043"
data.extend(user_id.encode().ljust(32, b'\x00'))
# 添加组织机构代码(9字节)
data.extend(b'\x16'.ljust(9, b'\x00'))
# 添加控制方式(1字节) - 定金额充
data.append(0x03)
# 添加控制参数(4字节) - 1000元
data.extend(struct.pack("<I", 1000))
# 添加充电模式(1字节) - 正常充电
data.append(0x01)
# 添加启动方式(1字节) - 立即启动
data.append(0x01)
# 添加定时启动时间(6字节)
data.extend(bytes([0x19, 0x01, 0x09, 0x0B, 0x24, 0x2F]))
# 添加用户操作码(6字节)
data.extend(b'ws8quu')
# 添加计费模型选择(1字节) - 本地计费模型
data.append(0x01)
# 计算数据长度
frame.extend(struct.pack("<H", len(data)))
# 添加数据域
frame.extend(struct.pack('<H', len(data)))
frame.extend(data)
# 计算校验码
check = 0
for b in frame[2:]:
check ^= b
frame.append(check)
command = bytes(frame)
print(f"启动充电命令数据: {command.hex().upper()}")
return command
except Exception as e:
logging.error(f"构建1FH启动充电命令失败: {str(e)}")
print(f"构建命令失败: {str(e)}")
return None
def parse_20h(self, data):
"""解析20H启动充电回复"""
try:
print("\n开始解析20H启动充电回复...")
print(f"接收数据: {data.hex().upper()}")
# 基础验证
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command_20:
logging.warning("20H命令帧格式不正确")
return None
# 解析数据
pile_id = data[3:11] # 桩号
encrypt_mode = data[11] # 加密方式
data_len = struct.unpack("<H", data[12:14])[0] # 数据长度
data_field = data[14:14 + data_len] # 数据域
def parse_time(time_bytes):
"""解析时间字节"""
try:
year = time_bytes[0] + 2000
month = time_bytes[1]
day = time_bytes[2]
hour = time_bytes[3]
minute = time_bytes[4]
second = time_bytes[5]
return f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
except Exception as e:
print(f"时间解析错误: {e}")
return "Invalid time"
# 解析数据域
parsed_data = {
"pile_id": pile_id.hex().upper(),
"timestamp": parse_time(data_field[0:6]),
"gun_no": data_field[6],
"card_no": data_field[7:39].decode('ascii').rstrip('\x00'),
"user_id": data_field[39:71].decode('ascii').rstrip('\x00'),
"execution_result": data_field[-2], # 1-成功2-失败
"fail_reason": data_field[-1] if data_field[-2] == 2 else 0
}
print("\n解析结果:")
print(f"桩号: {parsed_data['pile_id']}")
print(f"时间标识: {parsed_data['timestamp']}")
print(f"枪号: {parsed_data['gun_no']}")
print(f"卡号: {parsed_data['card_no']}")
print(f"用户ID: {parsed_data['user_id']}")
print(f"执行结果: {'成功' if parsed_data['execution_result'] == 1 else '失败'}")
if parsed_data['execution_result'] == 2:
print(f"失败原因代码: {parsed_data['fail_reason']}")
return parsed_data
except Exception as e:
logging.error(f"解析20H启动充电回复失败: {str(e)}")
print(f"解析失败: {str(e)}")
return None
def process_and_respond(self, pile_id, sock):
"""发送启动充电命令"""
try:
# 构建并发送1FH命令
command = self.build_1f_command(pile_id)
if not command:
return False
if sock and hasattr(sock, 'send'):
sock.send(command)
print("启动充电命令发送成功")
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
conn.send(frame)
logging.info(f"发送1FH启动充电: {frame.hex().upper()}")
return True
except Exception as e:
logging.error(f"处理启动充电命令失败: {str(e)}")
print(f"处理失败: {str(e)}")
return False
def parse_20h(self, data):
if len(data) < 14:
logging.warning("20H数据长度不足")
return
data_start = 14
timestamp = data[data_start:data_start + 6]
gun_id = data[data_start + 6]
result = data[data_start + 7]
reason = data[data_start + 8]
def test_start_charge():
"""测试启动充电命令"""
print("开始测试启动充电命令处理...")
# 创建处理器
handler = Command1F20()
# 测试桩号
pile_id = bytes.fromhex("0317665611360637")
# 创建模拟socket
class MockSocket:
def send(self, data):
print(f"\n模拟发送数据:")
print(f"1FH数据: {data.hex().upper()}")
mock_sock = MockSocket()
# 测试1FH命令发送
print("\n测试发送启动充电命令:")
result = handler.process_and_respond(pile_id, mock_sock)
print(f"命令发送结果: {'成功' if result else '失败'}")
# 测试20H回复解析
print("\n测试解析启动充电回复:")
test_reply = bytes.fromhex(
"4A582003176656113606370168001901090B242D013138373731393738303136313535353636313000000000000000000000000000003834303433000000000000000000000000000000000000000000000000000000160000000000000000000003F401000004011901090B242F777338717575010100D7")
handler.parse_20h(test_reply)
if __name__ == "__main__":
test_start_charge()
parsed_data = {
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
"gun_id": gun_id,
"result": "Success" if result == 0x01 else "Failed",
"reason": reason if result == 0x02 else None
}
logging.info(f"20H启动充电结果: {parsed_data}")

View File

@ -0,0 +1,73 @@
# command_37_38.py
import struct
import logging
import time
class Command3738:
def build_37h_request(self, pile_id, billing_template):
"""
下发计费模版请求
:param pile_id: 8字节桩号
:param billing_template: 计费模版列表格式 [(time_minutes, price_per_minute, total_seconds, total_cost), ...]
"""
frame = bytearray([0x4A, 0x58, 0x37]) # 帧头 + 命令
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式(不加密)
# 数据域
data = bytearray()
current_time = time.localtime()
time_bytes = bytearray([
current_time.tm_year - 2000,
current_time.tm_mon,
current_time.tm_mday,
current_time.tm_hour,
current_time.tm_min,
current_time.tm_sec
])
data.extend(time_bytes)
data.extend(b'\x1D\x00\x00\x00\x00\x00') # 未知字段,参考日志
# 计费模版数据
for entry in billing_template:
time_minutes, price_per_minute, total_seconds, total_cost = entry
data.extend(struct.pack('<H', time_minutes)) # 时间段(分钟)
data.extend(struct.pack('<H', price_per_minute)) # 单价(分/分钟)
data.extend(struct.pack('<I', total_seconds)) # 累计时间(秒)
data.extend(struct.pack('<I', total_cost)) # 累计金额(分)
frame.extend(struct.pack('<H', len(data))) # 数据域长度
frame.extend(data)
# 计算校验码
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
return frame
def parse_38h_response(self, data):
if len(data) < 14:
logging.warning("38H数据长度不足")
return
pile_id = data[3:11]
data_start = 14
data_len = struct.unpack('<H', data[12:14])[0]
data_end = data_start + data_len
if len(data) < data_end + 1:
logging.warning("38H数据域长度不匹配")
return
timestamp = data[data_start:data_start + 6]
unknown_field = data[data_start + 6:data_start + 8]
result = struct.unpack('<H', data[data_start + 8:data_start + 10])[0]
parsed_data = {
"pile_id": pile_id.hex().upper(),
"timestamp": f"20{timestamp[0]:02X}-{timestamp[1]:02X}-{timestamp[2]:02X} {timestamp[3]:02X}:{timestamp[4]:02X}:{timestamp[5]:02X}",
"unknown_field": unknown_field.hex().upper(),
"result": "Success" if result == 0x0100 else "Failed",
"result_code": f"0x{result:04X}"
}
logging.info(f"38H计费模版下发结果: {parsed_data}")

View File

@ -1,166 +1,62 @@
# command_heartbeat.py
import struct
import logging
from datetime import datetime
import time
class CommandHeartbeat:
def __init__(self):
self.command_0c = 0x0C # 桩心跳命令
self.command_0b = 0x0B # 平台心跳命令
def parse_0c_heartbeat(self, data):
"""解析0CH心跳命令"""
try:
print("\n开始解析0CH心跳命令...")
print(f"接收数据: {data.hex().upper()}")
# 基础验证
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command_0c:
logging.warning("0CH命令帧格式不正确")
return None
# 解析数据
pile_id = data[3:11] # 桩号
encrypt_mode = data[11] # 加密方式
data_len = struct.unpack("<H", data[12:14])[0] # 数据长度
# 解析数据域
data_field = data[14:14 + data_len]
# 解析时间标识
time_bytes = data_field[0:6]
year = time_bytes[0] + 2000
month, day, hour, minute, second = time_bytes[1:6]
timestamp = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
# 解析心跳数据
platform_timeout_count = data_field[6] # 平台心跳超时次数
gun_count = data_field[7] # 充电枪数量
gun_status = data_field[8] # 充电枪状态
work_mode = data_field[9] # 工作模式
result = {
"pile_id": pile_id,
"timestamp": timestamp,
"platform_timeout_count": platform_timeout_count,
"gun_count": gun_count,
"gun_status": gun_status,
"work_mode": work_mode
}
print("\n解析结果:")
print(f"桩号: {pile_id.hex().upper()}")
print(f"时间标识: {timestamp}")
print(f"平台心跳超时次数: {platform_timeout_count}")
print(f"充电枪数量: {gun_count}")
print(f"充电枪状态: {gun_status:02X}")
print(f"工作模式: {work_mode:02X}")
return result
except Exception as e:
logging.error(f"解析0CH心跳命令失败: {str(e)}")
print(f"解析失败: {str(e)}")
return None
def build_0b_heartbeat(self, pile_id):
"""构建0BH心跳响应"""
try:
print("\n构建0BH心跳响应...")
frame = bytearray()
frame.extend(b'JX') # 帧起始标志
frame.append(self.command_0b) # 命令码0BH
frame = bytearray([0x4A, 0x58, 0x0B]) # 帧头 + 命令
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式(不加密)
frame.append(0x01) # 数据加密方式(不加密)
# 构建数据域
# 数据域:时间标识
data = bytearray()
# 添加时间标识
now = datetime.now()
data.extend(struct.pack("<BBBBBB",
now.year - 2000, now.month, now.day,
now.hour, now.minute, now.second))
# 添加桩心跳超时次数(例如设为0)
data.append(0x00)
# 计算数据长度
data_len = len(data)
frame.extend(struct.pack("<H", data_len)) # 数据长度
# 添加数据域
current_time = time.localtime()
time_bytes = bytearray([
current_time.tm_year - 2000,
current_time.tm_mon,
current_time.tm_mday,
current_time.tm_hour,
current_time.tm_min,
current_time.tm_sec
])
data.extend(time_bytes)
frame.extend(struct.pack('<H', len(data))) # 数据域长度
frame.extend(data)
# 计算校验码
check = 0
for b in frame[2:]:
check ^= b
frame.append(check)
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
return frame
response = bytes(frame)
print(f"响应数据: {response.hex().upper()}")
return response
def build_0c_heartbeat(self, pile_id):
frame = bytearray([0x4A, 0x58, 0x0C]) # 帧头 + 命令
frame.extend(pile_id) # 桩号
frame.append(0x01) # 数据加密方式(不加密)
except Exception as e:
logging.error(f"构建0BH心跳响应失败: {str(e)}")
print(f"构建响应失败: {str(e)}")
return None
# 数据域:时间标识
data = bytearray()
current_time = time.localtime()
time_bytes = bytearray([
current_time.tm_year - 2000,
current_time.tm_mon,
current_time.tm_mday,
current_time.tm_hour,
current_time.tm_min,
current_time.tm_sec
])
data.extend(time_bytes)
frame.extend(struct.pack('<H', len(data))) # 数据域长度
frame.extend(data)
def process_and_respond(self, received_data, sock):
"""处理收到的0CH命令并回复0BH"""
try:
print("\n处理心跳命令...")
# 计算校验码
checksum = 0
for b in frame[2:-1]:
checksum ^= b
frame.append(checksum)
return frame
# 解析接收到的0CH命令
parsed = self.parse_0c_heartbeat(received_data)
if not parsed:
return False
# 构建0BH响应
response = self.build_0b_heartbeat(parsed["pile_id"])
if not response:
return False
# 发送响应
if sock and hasattr(sock, 'send'):
sock.send(response)
print("心跳响应发送成功")
return True
except Exception as e:
logging.error(f"处理心跳命令失败: {str(e)}")
print(f"处理失败: {str(e)}")
return False
def test_heartbeat():
"""测试心跳命令处理"""
print("开始测试心跳命令处理...")
# 创建处理器
handler = CommandHeartbeat()
# 测试数据 - 使用实际收到的0CH数据
test_data = bytes.fromhex("4A580C0317665611360637010C001901090B25240102010101012B")
print("\n测试数据:")
print(f"0CH数据: {test_data.hex().upper()}")
# 创建模拟socket
class MockSocket:
def send(self, data):
print(f"\n模拟发送响应数据:")
print(f"0BH数据: {data.hex().upper()}")
mock_sock = MockSocket()
# 测试完整处理流程
result = handler.process_and_respond(test_data, mock_sock)
print(f"\n最终处理结果: {'成功' if result else '失败'}")
if __name__ == "__main__":
test_heartbeat()
def process_0c_heartbeat(self, data, conn):
logging.info(f"收到0CH心跳响应: {data.hex().upper()}")

View File

@ -0,0 +1,36 @@
import requests
import json
url = "http://localhost:8083/send_billing_template"
data = {
"pile_id": "0317665611360637",
"billing_template": [
{"time_minutes": 7, "price_per_minute": 1254, "total_seconds": 3500, "total_cost": 3500},
{"time_minutes": 7, "price_per_minute": 1016, "total_seconds": 3500, "total_cost": 3500},
{"time_minutes": 8, "price_per_minute": 736, "total_seconds": 3500, "total_cost": 3500},
{"time_minutes": 11, "price_per_minute": 1254, "total_seconds": 3500, "total_cost": 3500},
{"time_minutes": 13, "price_per_minute": 1016, "total_seconds": 3500, "total_cost": 3500},
{"time_minutes": 17, "price_per_minute": 736, "total_seconds": 3500, "total_cost": 3500},
{"time_minutes": 23, "price_per_minute": 1016, "total_seconds": 3500, "total_cost": 3500}
]
}
try:
headers = {"Content-Type": "application/json"}
response = requests.post(url, headers=headers, json=data, timeout=5)
print("状态码:", response.status_code)
print("响应内容:", response.json())
if response.status_code == 200 and response.json().get("status") == "success":
print("计费模版下发成功")
else:
print("计费模版下发失败:", response.json().get("message", "未知错误"))
except requests.exceptions.ConnectionError as e:
print(f"无法连接到服务器: {e}")
print("请检查服务器是否运行,以及端口 8082 是否可访问。")
except requests.exceptions.Timeout:
print("请求超时,请检查网络连接或服务器状态。")
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
except json.JSONDecodeError:
print("服务器响应不是有效的 JSON 格式,请检查服务器日志。")

View File

@ -0,0 +1,461 @@
import socket
import logging
import threading
import time
import struct
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from .utils import ProxyUtils
from .mqtt_client import MQTTClient
from commands.command_heartbeat import CommandHeartbeat
from commands.command_01 import Command01
from commands.command_02 import Command02
from commands.command_03 import Command03
from commands.command_04 import Command04
from commands.command_05 import Command05
from commands.command_06 import Command06
from commands.command_07 import Command07
from commands.command_08 import Command08
from commands.command_09 import Command09
from commands.command_0A import Command0A
from commands.command_19_1A import Command191A
from commands.command_1F_20 import Command1F20
from commands.command_21_22 import Command2122
from commands.command_23_24 import Command2324
from commands.command_25 import Command25
from commands.command_26_27 import Command2627
from commands.command_30 import Command30
from commands.command_37_38 import Command3738
# 自定义 HTTP 服务器,支持多线程
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
pass
class ChargingPileDirectConnection:
def __init__(self, host="0.0.0.0", port=52461, api_host="0.0.0.0", api_port=8083):
logging.info(f"初始化 ChargingPileDirectConnectionhost={host}, port={port}, API host={api_host}, API port={api_port}")
self.host = host
self.port = port
self.api_host = api_host
self.api_port = api_port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.sock.bind((self.host, self.port))
logging.info(f"绑定端口 {self.port} 成功")
except Exception as e:
logging.error(f"绑定端口失败: {str(e)}")
raise
self.sock.listen(1)
self.sock.settimeout(10)
self.running = False
self.conn = None
self.addr = None
self.mqtt_client = MQTTClient()
self.utils = ProxyUtils()
self.command_handlers = {
0x01: Command01(),
0x02: Command02(),
0x03: Command03(),
0x04: Command04(),
0x05: Command05(),
0x06: Command06(),
0x07: Command07(),
0x08: Command08(),
0x09: Command09(),
0x0A: Command0A(),
0x19: Command191A(),
0x1A: Command191A(),
0x1F: Command1F20(),
0x20: Command1F20(),
0x21: Command2122(),
0x22: Command2122(),
0x23: Command2324(),
0x24: Command2324(),
0x25: Command25(),
0x26: Command2627(),
0x27: Command2627(),
0x30: Command30(),
0x0B: CommandHeartbeat(),
0x0C: CommandHeartbeat(),
0x37: Command3738(),
0x38: Command3738(),
}
self.heartbeat_info = {}
self.heartbeat_thread = threading.Thread(target=self.check_heartbeat_timeout)
self.heartbeat_thread.daemon = True
self.send_heartbeat_thread = threading.Thread(target=self.send_heartbeat_periodically)
self.send_heartbeat_thread.daemon = True
self.current_pile_id = None
self.is_logged_in = False
self.api_server = None
self.api_thread = None
# 定义 HTTP 请求处理器
class APIHandler(BaseHTTPRequestHandler):
def do_POST(self):
logging.info(f"收到 POST 请求: {self.path} from {self.client_address}")
if self.path == "/send_billing_template":
try:
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
logging.debug(f"请求数据: {post_data.decode('utf-8')}")
data = json.loads(post_data.decode('utf-8'))
pile_id_str = data.get("pile_id")
billing_template_data = data.get("billing_template")
if not pile_id_str or not billing_template_data:
logging.warning("缺少 pile_id 或 billing_template")
self.send_response(400)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": "Missing pile_id or billing_template"}).encode())
return
# 转换 pile_id 为字节
pile_id = bytes.fromhex(pile_id_str.replace(" ", ""))
if len(pile_id) != 8:
logging.warning(f"无效的 pile_id 长度: {pile_id_str}")
self.send_response(400)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": "Invalid pile_id length"}).encode())
return
# 检查桩号是否匹配
if pile_id != self.server.direct_connection.current_pile_id:
logging.warning(f"桩号不匹配,请求: {pile_id_str}, 当前: {self.server.direct_connection.current_pile_id.hex().upper() if self.server.direct_connection.current_pile_id else 'None'}")
self.send_response(400)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": f"Pile ID {pile_id_str} does not match current pile {self.server.direct_connection.current_pile_id.hex().upper() if self.server.direct_connection.current_pile_id else 'None'}"}).encode())
return
# 解析计费模版
billing_template = []
for entry in billing_template_data:
time_minutes = entry.get("time_minutes")
price_per_minute = entry.get("price_per_minute")
total_seconds = entry.get("total_seconds")
total_cost = entry.get("total_cost")
if not all(isinstance(x, int) for x in [time_minutes, price_per_minute, total_seconds, total_cost]):
logging.warning(f"无效的计费模版条目: {entry}")
self.send_response(400)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": "Invalid billing template entry"}).encode())
return
billing_template.append((time_minutes, price_per_minute, total_seconds, total_cost))
# 调用下发方法
result = self.server.direct_connection.send_billing_template(pile_id, billing_template)
if result:
logging.info("计费模版下发成功")
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "success"}).encode())
else:
logging.error("计费模版下发失败")
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": "Failed to send billing template"}).encode())
except json.JSONDecodeError as e:
logging.error(f"JSON 解析错误: {str(e)}")
self.send_response(400)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": "Invalid JSON format"}).encode())
except Exception as e:
logging.error(f"处理请求时出错: {str(e)}")
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode())
else:
logging.warning(f"未知端点: {self.path}")
self.send_response(404)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "error", "message": "Endpoint not found"}).encode())
# 启动 HTTP 服务器
self.api_server = ThreadingHTTPServer((self.api_host, self.api_port), APIHandler)
self.api_server.direct_connection = self # 将实例绑定到服务器
self.api_thread = threading.Thread(target=self.api_server.serve_forever)
self.api_thread.daemon = True
self.api_thread.start()
logging.info(f"HTTP API 服务器启动,监听 {self.api_host}:{self.api_port}")
def connect(self):
try:
logging.info(f"正在监听 {self.host}:{self.port} 等待充电桩连接...")
self.conn, self.addr = self.sock.accept()
self.conn.settimeout(10)
self.running = True
msg = f"成功接受充电桩连接 {self.addr}"
logging.info(msg)
self.heartbeat_thread.start()
self.send_heartbeat_thread.start()
return True
except socket.timeout:
logging.debug("未收到连接请求,继续监听...")
return False
except Exception as e:
error_msg = f"接受连接失败: {str(e)}"
logging.error(error_msg)
return False
def check_heartbeat_timeout(self):
while self.running:
try:
current_time = time.time()
disconnected_piles = []
for pile_id, last_heartbeat in self.heartbeat_info.items():
if current_time - last_heartbeat > 60:
msg = f"充电桩 {pile_id.hex().upper()} 心跳超时 {current_time - last_heartbeat:.1f}"
logging.warning(msg)
disconnected_piles.append(pile_id)
for pile_id in disconnected_piles:
del self.heartbeat_info[pile_id]
time.sleep(5)
except Exception as e:
logging.error(f"心跳检测错误: {str(e)}")
time.sleep(5)
def update_heartbeat(self, pile_id):
self.heartbeat_info[pile_id] = time.time()
logging.debug(f"更新充电桩 {pile_id.hex().upper()} 心跳时间")
def send_heartbeat_periodically(self):
while self.running:
try:
if self.current_pile_id and self.conn:
handler = self.command_handlers[0x0B]
frame = handler.build_0b_heartbeat(self.current_pile_id)
self.send_frame(frame)
logging.debug(f"发送0BH平台心跳: {frame.hex().upper()}")
time.sleep(5)
except Exception as e:
logging.error(f"发送0BH心跳失败: {str(e)}")
time.sleep(5)
def send_frame(self, data):
try:
if self.conn:
self.conn.send(data)
logging.info(f"发送数据: {data.hex().upper()}")
return True
return False
except Exception as e:
logging.error(f"发送数据失败: {str(e)}")
return False
def calculate_checksum(self, data):
"""计算校验码,从命令字节到倒数第二字节异或"""
checksum = 0
for b in data[2:-1]:
checksum ^= b
return checksum
def receive_frame(self):
try:
header = self.conn.recv(14)
logging.debug(f"收到帧头: {header.hex().upper()}")
if len(header) < 14 or header[:2] != b'JX':
logging.warning("无效帧头")
return None
data_len = struct.unpack('<H', header[12:14])[0]
data = header + self.conn.recv(data_len + 1)
logging.debug(f"收到完整报文: {data.hex().upper()}")
calculated_checksum = self.calculate_checksum(data)
if calculated_checksum != data[-1]:
logging.warning(f"校验码不匹配,计算值: {calculated_checksum:02X}, 实际值: {data[-1]:02X}")
return None
return data
except socket.timeout:
logging.warning("接收数据超时,可能是充电桩未响应或网络问题")
return self.receive_frame()
except Exception as e:
logging.error(f"接收数据失败: {str(e)}, 报文: {data.hex() if data else 'None'}")
return None
def handle_command(self, data):
if not data or len(data) < 14:
logging.debug("收到空或无效数据,跳过处理")
return
command = data[2]
pile_id = data[3:11]
self.current_pile_id = pile_id
logging.info(f"处理命令: {command:02X}, 桩号: {pile_id.hex().upper()}")
handler = self.command_handlers.get(command)
if handler:
if command == 0x01:
logging.info("收到01H请求连接")
response = self.command_handlers[0x02].build_02h_response(pile_id, allow=True)
if response:
self.send_frame(response)
logging.info(f"发送02H允许连接响应: {response.hex().upper()}")
elif command == 0x03:
logging.info(f"收到03H登录信息原始报文: {data.hex().upper()}")
parsed_data = self.command_handlers[0x03].process_03h(data)
if parsed_data:
gun_count = parsed_data["login_data"]["gun_count"]
response = self.command_handlers[0x02].build_02h_response(pile_id, allow=True,
gun_count=gun_count)
if response:
self.send_frame(response)
logging.info(f"发送02H确认登录响应: {response.hex().upper()}")
time.sleep(5) # 等待5秒确认是否重复03H
if not self.is_logged_in and self.conn:
logging.warning("充电桩未进入正式状态尝试重发02H")
self.send_frame(response)
if not self.is_logged_in:
self.is_logged_in = True
logging.info(f"充电桩 {pile_id.hex().upper()} 登录成功")
request_08h = self.command_handlers[0x08].build_08h_request(pile_id)
if request_08h:
self.send_frame(request_08h)
logging.info(f"发送08H查询状态请求: {request_08h.hex().upper()}")
request_0a = self.command_handlers[0x0A].build_0a_request(pile_id)
if request_0a:
self.send_frame(request_0a)
logging.info(f"发送0A查询记录请求: {request_0a.hex().upper()}")
else:
logging.warning(f"充电桩 {pile_id.hex().upper()} 03H解析失败")
elif command == 0x08:
logging.info(f"收到08H响应原始报文: {data.hex().upper()}")
self.command_handlers[0x08].process_08h_response(data)
elif command == 0x09:
logging.info(f"收到09H状态信息原始报文: {data.hex().upper()}")
self.command_handlers[0x09].process_09h(data)
elif command == 0x0A:
logging.info(f"收到0A响应原始报文: {data.hex().upper()}")
self.command_handlers[0x0A].process_0a_response(data)
elif command == 0x19:
logging.info(f"收到19H卡鉴权请求原始报文: {data.hex().upper()}")
self.command_handlers[0x19].process_and_respond(data, self.conn)
if self.is_logged_in:
self.start_charging(pile_id)
elif command == 0x1F:
logging.info(f"收到1FH启动充电命令原始报文: {data.hex().upper()}")
self.command_handlers[0x1F].process_1f(pile_id, self.conn)
elif command == 0x20:
logging.info(f"收到20H启动充电结果原始报文: {data.hex().upper()}")
self.command_handlers[0x20].parse_20h(data)
elif command == 0x21:
logging.info(f"收到21H充电状态原始报文: {data.hex().upper()}")
self.command_handlers[0x21].process_and_respond(data, self.conn)
elif command == 0x23:
logging.info(f"收到23H停止充电命令原始报文: {data.hex().upper()}")
self.command_handlers[0x23].process_and_respond(data, self.conn)
elif command == 0x25:
logging.info(f"收到25H充电记录原始报文: {data.hex().upper()}")
self.command_handlers[0x25].process_25h(data)
elif command == 0x37:
logging.info(f"发送37H下发计费模版请求原始报文: {data.hex().upper()}")
elif command == 0x38:
logging.info(f"收到38H计费模版下发结果原始报文: {data.hex().upper()}")
self.command_handlers[0x38].parse_38h_response(data)
elif command == 0x0B:
self.update_heartbeat(pile_id)
logging.info(f"发送0BH平台心跳响应: {data.hex().upper()}")
response = self.command_handlers[0x0C].build_0c_heartbeat(pile_id)
if response:
self.send_frame(response)
elif command == 0x0C:
self.update_heartbeat(pile_id)
logging.info(f"收到0CH充电桩心跳: {data.hex().upper()}")
self.command_handlers[0x0C].process_0c_heartbeat(data, self.conn)
elif command == 0x04:
self.command_handlers[0x04].process_04h(data)
self.stop()
elif command == 0x05:
self.command_handlers[0x05].process_05h_response(data)
response = self.command_handlers[0x06].build_06h_response(pile_id)
if response:
self.send_frame(response)
elif command == 0x06:
self.command_handlers[0x06].process_06h(data)
elif command == 0x07:
self.command_handlers[0x07].process_07h(data, self.conn)
elif command == 0x26:
self.command_handlers[0x26].build_26h_command(data, self.conn)
elif command == 0x27:
self.command_handlers[0x27].parse_27h(data)
elif command == 0x30:
self.command_handlers[0x30].parse_30h_bms_status(data)
else:
logging.warning(f"未知命令: {command:02X}, 数据: {data.hex().upper()}")
else:
logging.warning(f"未找到命令 {command:02X} 的处理程序")
def start_charging(self, pile_id):
if not self.is_logged_in:
logging.error("充电桩未登录,无法启动充电")
return False
if not self.conn:
logging.error("未建立连接,无法启动充电")
return False
handler = self.command_handlers[0x1F]
result = handler.process_1f(pile_id, self.conn)
if result:
logging.info(f"成功发送1FH启动充电命令给桩号 {pile_id.hex().upper()}")
return result
def send_billing_template(self, pile_id, billing_template):
if not self.is_logged_in:
logging.error("充电桩未登录,无法下发计费模版")
return False
if not self.conn:
logging.error("未建立连接,无法下发计费模版")
return False
handler = self.command_handlers[0x37]
frame = handler.build_37h_request(pile_id, billing_template)
if frame and self.send_frame(frame):
logging.info(f"成功发送37H下发计费模版请求给桩号 {pile_id.hex().upper()}")
return True
return False
def run(self):
self.running = True
while self.running:
if not self.connect():
continue
try:
while self.running and self.conn:
data = self.receive_frame()
if data:
self.handle_command(data)
time.sleep(0.1)
except Exception as e:
logging.error(f"连接处理出错: {str(e)}")
finally:
self.is_logged_in = False
if self.conn:
self.conn.close()
self.conn = None
self.current_pile_id = None
logging.info("连接断开,等待新的连接...")
def stop(self):
self.running = False
if self.conn:
self.conn.close()
if self.sock:
self.sock.close()
if self.api_server:
self.api_server.shutdown()
self.api_server.server_close()
logging.info("直连程序停止")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
connection = ChargingPileDirectConnection(host="0.0.0.0", port=52461, api_host="0.0.0.0", api_port=8083)
connection.run()

View File

@ -1,42 +1,69 @@
import paho.mqtt.client as mqtt
import json
import time
import logging
class MQTTClient:
def __init__(self):
self.client = mqtt.Client(client_id="GoClientExample", protocol=mqtt.MQTTv311,
callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
self.client.username_pw_set("emqx_test", "emqx_test")
def __init__(self, host="123.6.102.119", port=1883, username="emqx_test", password="emqx_test"):
self.host = host
self.port = port
self.username = username
self.password = password
self.client = mqtt.Client(client_id="DirectChargingClient", protocol=mqtt.MQTTv311)
self.client.username_pw_set(self.username, self.password)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.connected = False
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT broker")
logging.info("成功连接到 MQTT 服务器")
self.connected = True
else:
print(f"Failed to connect to MQTT broker with code: {rc}")
logging.error(f"MQTT 连接失败,错误码: {rc}")
self.connected = False
def on_disconnect(self, client, userdata, rc):
print("Disconnected from MQTT broker")
logging.warning("MQTT 断开连接")
self.connected = False
def connect(self):
try:
self.client.connect("123.6.102.119", 1883, 60)
self.client.connect(self.host, self.port, 60)
self.client.loop_start()
except Exception as e:
print(f"MQTT connection error: {str(e)}")
def publish_message(self, message):
try:
# 等待连接完成
for _ in range(10): # 最多等待 5 秒
if self.connected:
self.client.publish("hejin/charging/log", json.dumps(message), qos=1)
else:
print("MQTT client not connected")
return True
time.sleep(0.5)
logging.error("MQTT 连接超时")
return False
except Exception as e:
print(f"MQTT publish error: {str(e)}")
logging.error(f"MQTT 连接失败: {str(e)}")
return False
def publish_message(self, message, topic="hejin/charging/log"):
try:
if not self.connected:
logging.error("MQTT client not connected")
return False
msg = json.dumps(message)
result = self.client.publish(topic, msg, qos=1)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logging.info(f"发布消息成功: {msg}")
return True
else:
logging.error(f"发布消息失败,错误码: {result.rc}")
return False
except Exception as e:
logging.error(f"MQTT 发布消息失败: {str(e)}")
return False
def disconnect(self):
try:
self.client.loop_stop()
self.client.disconnect()
self.connected = False
logging.info("MQTT 断开连接")
except Exception as e:
logging.error(f"MQTT 断开连接失败: {str(e)}")

View File

@ -1,390 +0,0 @@
import socket
import logging
import threading
import time
from .utils import ProxyUtils
from .mqtt_client import MQTTClient
from commands.command_heartbeat import CommandHeartbeat
from commands.command_02 import Command02
from commands.command_03 import Command03
from commands.command_07 import Command07
from commands.command_08 import Command08
from commands.command_09 import Command09
from commands.command_0A import Command0A
from commands.command_25 import Command25
from commands.command_30 import Command30
from commands.command_19_1A import Command191A
from commands.command_1F_20 import Command1F20
from commands.command_21_22 import Command2122
from commands.command_23_24 import Command2324
from commands.command_26_27 import Command2627
class ChargingPileProxyServer:
def __init__(self, listen_host='0.0.0.0', listen_port=52461,
forward_host='139.9.209.227', forward_port=52461):
self.listen_host = listen_host
self.listen_port = listen_port
self.forward_host = forward_host
self.forward_port = forward_port
self.server_socket = None
self.running = False
self.clients = {}
self.remote_connections = {}
self.mqtt_client = MQTTClient()
self.pile_ids = {}
self.utils = ProxyUtils()
self.heartbeat_handler = CommandHeartbeat()
self.command_02_handler = Command02()
self.command_03_handler = Command03()
self.command_07_handler = Command07()
self.command_08_handler = Command08()
self.command_09_handler = Command09()
self.command_0a_handler = Command0A()
self.charge_info_handler = Command25()
self.bms_handler = Command30()
self.card_auth_handler = Command191A()
self.start_charge_handler = Command1F20()
self.charge_result_handler = Command2122()
self.order_handler = Command2324()
self.stop_charge_handler = Command2627()
# 存储登录信息的字典,以桩号为键
self.login_info = {}
# 存储对时信息的字典
self.time_sync_info = {}
# 存储遥信信息的字典
self.remote_signal_info = {}
# 存储故障信息的字典
self.fault_info = {}
# 存储心跳信息的字典
self.heartbeat_info = {}
self.heartbeat_thread = threading.Thread(target=self.check_heartbeat_timeout)
self.heartbeat_thread.daemon = True
self.heartbeat_thread.start()
def check_heartbeat_timeout(self):
"""心跳超时检测"""
while self.running:
try:
current_time = time.time()
disconnected_piles = []
for pile_id, last_heartbeat in self.heartbeat_info.items():
time_diff = current_time - last_heartbeat
# 如果超过60秒没收到心跳
if time_diff > 60:
msg = f"充电桩 {pile_id.hex().upper()} 心跳超时 {time_diff:.1f}"
logging.warning(msg)
print(msg)
self.mqtt_client.publish_message(msg)
disconnected_piles.append(pile_id)
# 从心跳信息中移除断开的充电桩
for pile_id in disconnected_piles:
del self.heartbeat_info[pile_id]
self.handle_pile_disconnect(pile_id)
# 每5秒检查一次
time.sleep(5)
except Exception as e:
logging.error(f"心跳检测错误: {str(e)}")
time.sleep(5)
def update_heartbeat(self, pile_id):
"""更新心跳时间戳"""
try:
self.heartbeat_info[pile_id] = time.time()
logging.debug(f"更新充电桩 {pile_id.hex().upper()} 心跳时间")
except Exception as e:
logging.error(f"更新心跳时间戳失败: {str(e)}")
def handle_pile_disconnect(self, pile_id):
"""处理充电桩断开连接"""
try:
msg = f"充电桩 {pile_id.hex().upper()} 断开连接"
logging.info(msg)
print(msg)
self.mqtt_client.publish_message(msg)
# 可以在这里添加充电桩断开后的清理逻辑
# 例如:中断正在进行的充电等
except Exception as e:
logging.error(f"处理充电桩断开连接失败: {str(e)}")
def start(self):
"""启动代理服务器"""
try:
self.mqtt_client.connect()
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.listen_host, self.listen_port))
self.server_socket.listen(5)
self.running = True
start_msg = f"代理服务器已启动,监听地址: {self.listen_host}:{self.listen_port}"
logging.info(start_msg)
print(start_msg)
self.mqtt_client.publish_message(start_msg)
while self.running:
client_socket, address = self.server_socket.accept()
client_msg = f"收到新的客户端连接,地址: {address}"
logging.info(client_msg)
print(client_msg)
self.mqtt_client.publish_message(client_msg)
client_thread = threading.Thread(target=self.handle_client,
args=(client_socket, address))
client_thread.daemon = True
client_thread.start()
except Exception as e:
error_msg = f"代理服务器错误: {str(e)}"
logging.error(error_msg)
print(error_msg)
self.mqtt_client.publish_message(error_msg)
finally:
if self.server_socket:
self.server_socket.close()
def create_remote_connection(self, client_address):
"""创建与远程服务器的连接"""
try:
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_socket.connect((self.forward_host, self.forward_port))
connect_msg = f"已连接到远程服务器 {self.forward_host}:{self.forward_port}"
logging.info(connect_msg)
print(connect_msg)
self.mqtt_client.publish_message(connect_msg)
self.remote_connections[client_address] = remote_socket
return remote_socket
except Exception as e:
error_msg = f"连接远程服务器失败: {str(e)}"
logging.error(error_msg)
print(error_msg)
self.mqtt_client.publish_message(error_msg)
return None
def forward_data(self, source_socket, destination_socket, source_address):
"""转发数据"""
try:
while self.running:
data = source_socket.recv(1024) # 接收数据
if not data:
break # 如果没有数据,退出循环
# 如果接收到的数据长度大于等于14字节且以'JX'开头
if len(data) >= 14 and data[0:2] == b'JX':
command = data[2] # 提取命令字节
# 获取本地和远程端口信息用于日志
source_local, source_remote = self.utils.get_socket_info(source_socket)
dest_local, dest_remote = self.utils.get_socket_info(destination_socket)
# 根据命令字节处理不同命令
if command == 0x01: # 01H命令
logging.info(f"收到01H连接请求命令: {data.hex().upper()}")
if self.command_02_handler.process_and_respond(data, destination_socket):
logging.info("01H命令处理完成")
continue # 跳过后续转发,避免重复转发
elif command == 0x03: # 03H命令
logging.info(f"收到03H登录命令: {data.hex().upper()}")
if self.command_03_handler.process_03h(data):
logging.info("03H命令处理完成")
continue
elif command == 0x0C: # 0CH桩心跳命令
# 提取桩号并更新心跳
pile_id = data[3:11]
self.update_heartbeat(pile_id)
logging.info(f"收到0CH心跳命令: {data.hex().upper()}")
if self.heartbeat_handler.process_and_respond(data, destination_socket):
logging.info("0CH心跳命令处理完成")
continue # 跳过后续转发
elif command == 0x19: # 19H卡鉴权命令
logging.info(f"收到19H卡鉴权命令: {data.hex().upper()}")
if self.card_auth_handler.process_and_respond(data, destination_socket):
logging.info("19H卡鉴权命令处理完成")
continue # 跳过后续转发
elif command == 0x1F: # 1FH启动充电命令
logging.info(f"收到1FH启动充电命令: {data.hex()}")
if self.start_charge_handler.process_and_respond(data, destination_socket):
logging.info("1FH命令处理完成")
continue
elif command == 0x20: # 20H启动充电回复
logging.info(f"收到20H启动充电回复: {data.hex()}")
self.start_charge_handler.parse_20h(data)
elif command == 0x21: # 21H启动充电结果命令
logging.info(f"收到21H启动充电结果命令: {data.hex()}")
if self.charge_result_handler.process_and_respond(data, destination_socket):
logging.info("21H命令处理完成")
continue # 跳过后续转发
elif command == 0x23: # 23H充电订单命令
logging.info(f"收到23H充电订单命令: {data.hex().upper()}")
if self.order_handler.process_and_respond(data, destination_socket):
logging.info("23H充电订单命令处理完成")
continue
elif command == 0x25: # 25H充电信息命令
logging.info(f"收到25H充电信息命令: {data.hex()}")
if self.charge_info_handler.process_25h(data):
logging.info("25H命令处理完成")
elif command == 0x26: # 26H停止充电命令
logging.info(f"收到26H停止充电命令: {data.hex()}")
if self.stop_charge_handler.build_26h_command(data, destination_socket):
logging.info("26H命令处理完成")
continue
elif command == 0x27: # 27H停止充电回复
logging.info(f"收到27H停止充电回复: {data.hex()}")
self.stop_charge_handler.parse_27h(data)
elif command == 0x30: # 30H BMS信息命令
logging.info(f"收到30H BMS信息命令: {data.hex().upper()}")
if self.bms_handler.parse_30h_bms_status(data):
logging.info("30H BMS信息命令处理完成")
else:
# 未知命令,记录日志
logging.warning(f"未知命令:{command:02X},数据内容: {data.hex().upper()}")
# 如果数据来自客户端连接,提取桩号
if source_socket not in self.remote_connections.values():
pile_id = self.utils.extract_pile_id(data)
if pile_id:
self.pile_ids[source_address] = pile_id
# 判断数据发送方向:是发送到远程服务器还是充电桩
is_to_remote = destination_socket in self.remote_connections.values()
direction = "发送到远程服务器" if is_to_remote else "发送到充电桩"
mqtt_direction = "u" if is_to_remote else "d"
# 发送数据到目的地
destination_socket.send(data)
# 记录数据转发日志
msg = (f"数据转发成功: {direction} | "
f"本地地址: {source_local} | "
f"远程地址: {dest_remote} | "
f"命令: {command:02X}H | "
f"数据长度: {len(data)}")
logging.info(msg)
self.mqtt_client.publish_message(msg)
# 记录完整的数据内容到调试日志
logging.debug(f"完整数据内容: {data.hex().upper()}")
# 每次数据转发完成后,检查连接状态
if not self.running:
break
except ConnectionResetError:
logging.error(f"连接被重置: {source_remote}")
self.mqtt_client.publish_message(f"连接被重置: {source_remote}")
except socket.timeout:
logging.error(f"连接超时: {source_remote}")
self.mqtt_client.publish_message(f"连接超时: {source_remote}")
except Exception as e:
logging.error(f"转发数据出错: {str(e)}")
self.mqtt_client.publish_message(f"转发数据出错: {str(e)}")
finally:
# 关闭连接
try:
if source_socket:
source_socket.close()
if destination_socket:
destination_socket.close()
except Exception as e:
logging.error(f"关闭连接出错: {str(e)}")
def handle_client(self, client_socket, client_address):
"""处理客户端连接"""
try:
remote_socket = self.create_remote_connection(client_address)
if not remote_socket:
client_socket.close()
return
forward_thread = threading.Thread(
target=self.forward_data,
args=(client_socket, remote_socket, client_address)
)
backward_thread = threading.Thread(
target=self.forward_data,
args=(remote_socket, client_socket, client_address)
)
forward_thread.daemon = True
backward_thread.daemon = True
forward_thread.start()
backward_thread.start()
forward_thread.join()
backward_thread.join()
except Exception as e:
error_msg = f"处理客户端连接错误: {str(e)}"
logging.error(error_msg)
print(error_msg)
self.mqtt_client.publish_message(error_msg)
finally:
if client_address in self.remote_connections:
self.remote_connections[client_address].close()
del self.remote_connections[client_address]
if client_address in self.pile_ids:
del self.pile_ids[client_address]
client_socket.close()
close_msg = f"客户端连接已关闭 {client_address}"
logging.info(close_msg)
print(close_msg)
self.mqtt_client.publish_message(close_msg)
def stop(self):
"""停止代理服务器"""
self.running = False
if self.server_socket:
self.server_socket.close()
for remote_socket in self.remote_connections.values():
remote_socket.close()
self.remote_connections.clear()
self.pile_ids.clear()
self.mqtt_client.disconnect()
stop_msg = "代理服务器已停止"
logging.info(stop_msg)
print(stop_msg)
self.mqtt_client.publish_message(stop_msg)

View File

@ -10,24 +10,13 @@ class ProxyUtils:
def extract_pile_id(data):
"""从数据包中提取桩号"""
try:
if len(data) > 10: # 确保数据包足够长
# 桩号在第5-8个字节
if len(data) > 10:
pile_id = ''.join([f"{b:02X}" for b in data[3:11]])
return pile_id
return None
except Exception:
return None
@staticmethod
def get_socket_info(socket_obj):
"""获取socket的本地和远程地址信息"""
try:
local_address = socket_obj.getsockname()
remote_address = socket_obj.getpeername()
return local_address, remote_address
except:
return None, None
@staticmethod
def get_current_time():
"""获取当前时间的格式化字符串"""

File diff suppressed because it is too large Load Diff

View File

@ -1,35 +1,41 @@
import logging
import sys
import os
import threading
# 将项目根目录添加到Python路径
# 添加项目路径
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
# logging.debug(f"Current directory: {current_dir}") # 移除控制台输出,改为日志
from core.proxy_server import ChargingPileProxyServer
import logging
from core.direct_connection import ChargingPileDirectConnection
# 设置系统默认编码为UTF-8
if sys.version_info[0] == 3:
sys.stdout.reconfigure(encoding='utf-8')
# 配置日志
# 配置日志,仅记录到文件
log_file = os.path.join(current_dir, 'direct_connect.log')
logging.basicConfig(
filename='test.log',
level=logging.INFO,
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
encoding='utf-8'
encoding='utf-8',
force=True,
handlers=[
logging.FileHandler(log_file, encoding='utf-8')
]
)
def main():
server = ChargingPileProxyServer()
logging.info("启动直连服务器...")
direct_conn = ChargingPileDirectConnection(host="0.0.0.0", port=52461)
try:
server.start()
direct_conn.run()
except KeyboardInterrupt:
server.stop()
msg = "代理服务器已完全关闭"
logging.info(msg)
print(msg)
logging.info("接收到中断信号,停止程序")
direct_conn.stop()
except Exception as e:
logging.error(f"发生异常: {str(e)}")
direct_conn.stop()
finally:
direct_conn.stop()
if __name__ == "__main__":
main()