import socket import struct import logging import threading import time from datetime import datetime import binascii import sys import json import paho.mqtt.client as mqtt # 设置系统默认编码为UTF-8 if sys.version_info[0] == 3: sys.stdout.reconfigure(encoding='utf-8') # 配置日志 logging.basicConfig( filename='test.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', encoding='utf-8' ) class MQTTClient: def __init__(self): self.client = mqtt.Client(client_id="GoClientExample", protocol=mqtt.MQTTv311, callback_api_version=mqtt.CallbackAPIVersion.VERSION1) self.client.username_pw_set("emqx_test", "emqx_test") self.client.on_connect = self.on_connect self.client.on_disconnect = self.on_disconnect self.connected = False def on_connect(self, client, userdata, flags, rc): if rc == 0: print("Connected to MQTT broker") self.connected = True else: print(f"Failed to connect to MQTT broker with code: {rc}") def on_disconnect(self, client, userdata, rc): print("Disconnected from MQTT broker") self.connected = False def connect(self): try: self.client.connect("123.6.102.119", 1883, 60) self.client.loop_start() except Exception as e: print(f"MQTT connection error: {str(e)}") def publish_message(self, message): try: if self.connected: # 直接发布消息值,无论是列表还是字符串 self.client.publish("hejin/charging/log", json.dumps(message), qos=1) else: print("MQTT client not connected") except Exception as e: print(f"MQTT publish error: {str(e)}") def disconnect(self): self.client.loop_stop() self.client.disconnect() class ChargingPileProxyServer: def __init__(self, listen_host='0.0.0.0', listen_port=52461, forward_host='139.9.209.227', forward_port=52461): self.listen_host = listen_host self.listen_port = listen_port self.forward_host = forward_host self.forward_port = forward_port self.server_socket = None self.running = False self.clients = {} # 存储客户端连接 self.remote_connections = {} # 存储与远程服务器的连接 self.mqtt_client = MQTTClient() # 创建MQTT客户端 self.pile_ids = {} # 存储客户端地址与桩号的映射 def extract_pile_id(self, data): """从数据包中提取桩号""" try: if len(data) > 10: # 确保数据包足够长 # 桩号在第5-8个字节 pile_id = ''.join([f"{b:02X}" for b in data[3:11]]) return pile_id return None except Exception: return None def format_hex_data(self, data): """格式化十六进制数据显示""" return ' '.join([f"{b:02X}" for b in data]) def start(self): """启动代理服务器""" try: self.mqtt_client.connect() self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.listen_host, self.listen_port)) self.server_socket.listen(5) self.running = True start_msg = f"代理服务器已启动,监听地址: {self.listen_host}:{self.listen_port}" logging.info(start_msg) print(start_msg) self.mqtt_client.publish_message(start_msg) while self.running: client_socket, address = self.server_socket.accept() client_msg = f"收到新的客户端连接,地址: {address}" logging.info(client_msg) print(client_msg) self.mqtt_client.publish_message(client_msg) client_thread = threading.Thread(target=self.handle_client, args=(client_socket, address)) client_thread.daemon = True client_thread.start() except Exception as e: error_msg = f"代理服务器错误: {str(e)}" logging.error(error_msg) print(error_msg) self.mqtt_client.publish_message(error_msg) finally: if self.server_socket: self.server_socket.close() def get_socket_info(self, socket_obj): """获取socket的本地和远程地址信息""" try: local_address = socket_obj.getsockname() remote_address = socket_obj.getpeername() return local_address, remote_address except: return None, None def create_remote_connection(self, client_address): """创建与远程服务器的连接""" try: remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) remote_socket.connect((self.forward_host, self.forward_port)) connect_msg = f"已连接到远程服务器 {self.forward_host}:{self.forward_port}" logging.info(connect_msg) print(connect_msg) self.mqtt_client.publish_message(connect_msg) self.remote_connections[client_address] = remote_socket return remote_socket except Exception as e: error_msg = f"连接远程服务器失败: {str(e)}" logging.error(error_msg) print(error_msg) self.mqtt_client.publish_message(error_msg) return None def forward_data(self, source_socket, destination_socket, source_address): """转发数据""" try: while self.running: data = source_socket.recv(1024) if not data: break #尝试提取桩号 if source_socket not in self.remote_connections.values(): pile_id = self.extract_pile_id(data) if pile_id: # 只在成功提取到桩号时更新 self.pile_ids[source_address] = pile_id # 获取源和目标socket的地址信息 source_local, source_remote = self.get_socket_info(source_socket) dest_local, dest_remote = self.get_socket_info(destination_socket) # 确定数据传输方向 is_to_remote = destination_socket in self.remote_connections.values() direction = "发送到远程服务器" if is_to_remote else "发送到充电桩" mqtt_direction = "u" if is_to_remote else "d" # 新增:MQTT消息中的方向标识 # 获取实际的源IP和目标IP from_ip = source_remote[0] if source_remote else "unknown" to_ip = dest_remote[0] if dest_remote else "unknown" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 记录转发的数据 hex_data = self.format_hex_data(data) # 构建消息数组 msg_array = [ from_ip, # 源IP地址 to_ip, # 目标IP地址 current_time, # 当前时间 hex_data, # 十六进制数据 self.pile_ids.get(source_address, "unknown"), # 桩号 mqtt_direction # 传输方向 ] print(msg_array) # 发送到MQTT self.mqtt_client.publish_message(msg_array) # 为控制台和日志文件使用格式化的消息 formatted_msg = f"[{current_time}] [{from_ip} -> {to_ip}] 转发数据 ({direction}): {hex_data}" logging.info(formatted_msg) print(formatted_msg) destination_socket.send(data) except Exception as e: error_msg = f"转发数据错误: {str(e)}" logging.error(error_msg) print(error_msg) self.mqtt_client.publish_message(error_msg) finally: source_socket.close() if source_address in self.remote_connections: self.remote_connections[source_address].close() del self.remote_connections[source_address] if source_address in self.pile_ids: del self.pile_ids[source_address] def handle_client(self, client_socket, client_address): """处理客户端连接""" try: remote_socket = self.create_remote_connection(client_address) if not remote_socket: client_socket.close() return forward_thread = threading.Thread( target=self.forward_data, args=(client_socket, remote_socket, client_address) ) backward_thread = threading.Thread( target=self.forward_data, args=(remote_socket, client_socket, client_address) ) forward_thread.daemon = True backward_thread.daemon = True forward_thread.start() backward_thread.start() forward_thread.join() backward_thread.join() except Exception as e: error_msg = f"处理客户端连接错误: {str(e)}" logging.error(error_msg) print(error_msg) self.mqtt_client.publish_message(error_msg) finally: if client_address in self.remote_connections: self.remote_connections[client_address].close() del self.remote_connections[client_address] if client_address in self.pile_ids: del self.pile_ids[client_address] client_socket.close() close_msg = f"客户端连接已关闭 {client_address}" logging.info(close_msg) print(close_msg) self.mqtt_client.publish_message(close_msg) def stop(self): """停止代理服务器""" self.running = False if self.server_socket: self.server_socket.close() # 关闭所有连接 for remote_socket in self.remote_connections.values(): remote_socket.close() self.remote_connections.clear() self.pile_ids.clear() # 断开MQTT连接 self.mqtt_client.disconnect() stop_msg = "代理服务器已停止" logging.info(stop_msg) print(stop_msg) self.mqtt_client.publish_message(stop_msg) def main(): server = ChargingPileProxyServer() try: server.start() except KeyboardInterrupt: server.stop() msg = "代理服务器已完全关闭" logging.info(msg) print(msg) if __name__ == "__main__": main()