diff --git a/charging_pile_proxy/order.py b/charging_pile_proxy/order.py new file mode 100644 index 0000000..c300e45 --- /dev/null +++ b/charging_pile_proxy/order.py @@ -0,0 +1,303 @@ +import socket +import struct +import logging +import threading +import time +from datetime import datetime +import binascii +import sys +import json +import paho.mqtt.client as mqtt + +# 设置系统默认编码为UTF-8 +if sys.version_info[0] == 3: + sys.stdout.reconfigure(encoding='utf-8') + +# 配置日志 +logging.basicConfig( + filename='test.log', + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + encoding='utf-8' +) + + +class MQTTClient: + def __init__(self): + self.client = mqtt.Client(client_id="GoClientExample", protocol=mqtt.MQTTv311, + callback_api_version=mqtt.CallbackAPIVersion.VERSION1) + self.client.username_pw_set("emqx_test", "emqx_test") + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.connected = False + + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + print("Connected to MQTT broker") + self.connected = True + else: + print(f"Failed to connect to MQTT broker with code: {rc}") + + def on_disconnect(self, client, userdata, rc): + print("Disconnected from MQTT broker") + self.connected = False + + def connect(self): + try: + self.client.connect("123.6.102.119", 1883, 60) + self.client.loop_start() + except Exception as e: + print(f"MQTT connection error: {str(e)}") + + def publish_message(self, message): + try: + if self.connected: + # 直接发布消息值,无论是列表还是字符串 + self.client.publish("hejin/charging/log", json.dumps(message), qos=1) + else: + print("MQTT client not connected") + except Exception as e: + print(f"MQTT publish error: {str(e)}") + + def disconnect(self): + self.client.loop_stop() + self.client.disconnect() + + +class ChargingPileProxyServer: + def __init__(self, listen_host='0.0.0.0', listen_port=52461, + forward_host='139.9.209.227', forward_port=52461): + self.listen_host = listen_host + self.listen_port = listen_port + self.forward_host = forward_host + self.forward_port = forward_port + self.server_socket = None + self.running = False + self.clients = {} # 存储客户端连接 + self.remote_connections = {} # 存储与远程服务器的连接 + self.mqtt_client = MQTTClient() # 创建MQTT客户端 + self.pile_ids = {} # 存储客户端地址与桩号的映射 + + def extract_pile_id(self, data): + """从数据包中提取桩号""" + try: + + if len(data) > 10: # 确保数据包足够长 + # 桩号在第5-8个字节 + pile_id = ''.join([f"{b:02X}" for b in data[3:11]]) + return pile_id + return None + except Exception: + return None + + def format_hex_data(self, data): + """格式化十六进制数据显示""" + return ' '.join([f"{b:02X}" for b in data]) + + def start(self): + """启动代理服务器""" + try: + self.mqtt_client.connect() + + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind((self.listen_host, self.listen_port)) + self.server_socket.listen(5) + self.running = True + + start_msg = f"代理服务器已启动,监听地址: {self.listen_host}:{self.listen_port}" + logging.info(start_msg) + print(start_msg) + self.mqtt_client.publish_message(start_msg) + + while self.running: + client_socket, address = self.server_socket.accept() + client_msg = f"收到新的客户端连接,地址: {address}" + logging.info(client_msg) + print(client_msg) + self.mqtt_client.publish_message(client_msg) + + client_thread = threading.Thread(target=self.handle_client, + args=(client_socket, address)) + client_thread.daemon = True + client_thread.start() + + except Exception as e: + error_msg = f"代理服务器错误: {str(e)}" + logging.error(error_msg) + print(error_msg) + self.mqtt_client.publish_message(error_msg) + finally: + if self.server_socket: + self.server_socket.close() + + def get_socket_info(self, socket_obj): + """获取socket的本地和远程地址信息""" + try: + local_address = socket_obj.getsockname() + remote_address = socket_obj.getpeername() + return local_address, remote_address + except: + return None, None + + def create_remote_connection(self, client_address): + """创建与远程服务器的连接""" + try: + remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + remote_socket.connect((self.forward_host, self.forward_port)) + + connect_msg = f"已连接到远程服务器 {self.forward_host}:{self.forward_port}" + logging.info(connect_msg) + print(connect_msg) + self.mqtt_client.publish_message(connect_msg) + + self.remote_connections[client_address] = remote_socket + return remote_socket + except Exception as e: + error_msg = f"连接远程服务器失败: {str(e)}" + logging.error(error_msg) + print(error_msg) + self.mqtt_client.publish_message(error_msg) + return None + + def forward_data(self, source_socket, destination_socket, source_address): + """转发数据""" + try: + while self.running: + data = source_socket.recv(1024) + if not data: + break + + #尝试提取桩号 + if source_socket not in self.remote_connections.values(): + pile_id = self.extract_pile_id(data) + if pile_id: # 只在成功提取到桩号时更新 + self.pile_ids[source_address] = pile_id + + # 获取源和目标socket的地址信息 + source_local, source_remote = self.get_socket_info(source_socket) + dest_local, dest_remote = self.get_socket_info(destination_socket) + + # 确定数据传输方向 + is_to_remote = destination_socket in self.remote_connections.values() + direction = "发送到远程服务器" if is_to_remote else "发送到充电桩" + mqtt_direction = "u" if is_to_remote else "d" # 新增:MQTT消息中的方向标识 + + # 获取实际的源IP和目标IP + from_ip = source_remote[0] if source_remote else "unknown" + to_ip = dest_remote[0] if dest_remote else "unknown" + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + # 记录转发的数据 + hex_data = self.format_hex_data(data) + # 构建消息数组 + msg_array = [ + from_ip, # 源IP地址 + to_ip, # 目标IP地址 + current_time, # 当前时间 + hex_data, # 十六进制数据 + self.pile_ids.get(source_address, "unknown"), # 桩号 + mqtt_direction # 传输方向 + ] + print(msg_array) + + # 发送到MQTT + self.mqtt_client.publish_message(msg_array) + + # 为控制台和日志文件使用格式化的消息 + formatted_msg = f"[{current_time}] [{from_ip} -> {to_ip}] 转发数据 ({direction}): {hex_data}" + logging.info(formatted_msg) + print(formatted_msg) + + destination_socket.send(data) + + except Exception as e: + error_msg = f"转发数据错误: {str(e)}" + logging.error(error_msg) + print(error_msg) + self.mqtt_client.publish_message(error_msg) + finally: + source_socket.close() + if source_address in self.remote_connections: + self.remote_connections[source_address].close() + del self.remote_connections[source_address] + if source_address in self.pile_ids: + del self.pile_ids[source_address] + + def handle_client(self, client_socket, client_address): + """处理客户端连接""" + try: + remote_socket = self.create_remote_connection(client_address) + if not remote_socket: + client_socket.close() + return + + forward_thread = threading.Thread( + target=self.forward_data, + args=(client_socket, remote_socket, client_address) + ) + backward_thread = threading.Thread( + target=self.forward_data, + args=(remote_socket, client_socket, client_address) + ) + + forward_thread.daemon = True + backward_thread.daemon = True + + forward_thread.start() + backward_thread.start() + + forward_thread.join() + backward_thread.join() + + except Exception as e: + error_msg = f"处理客户端连接错误: {str(e)}" + logging.error(error_msg) + print(error_msg) + self.mqtt_client.publish_message(error_msg) + finally: + if client_address in self.remote_connections: + self.remote_connections[client_address].close() + del self.remote_connections[client_address] + if client_address in self.pile_ids: + del self.pile_ids[client_address] + client_socket.close() + close_msg = f"客户端连接已关闭 {client_address}" + logging.info(close_msg) + print(close_msg) + self.mqtt_client.publish_message(close_msg) + + def stop(self): + """停止代理服务器""" + self.running = False + if self.server_socket: + self.server_socket.close() + + # 关闭所有连接 + for remote_socket in self.remote_connections.values(): + remote_socket.close() + self.remote_connections.clear() + self.pile_ids.clear() + + # 断开MQTT连接 + self.mqtt_client.disconnect() + + stop_msg = "代理服务器已停止" + logging.info(stop_msg) + print(stop_msg) + self.mqtt_client.publish_message(stop_msg) + + +def main(): + server = ChargingPileProxyServer() + try: + server.start() + except KeyboardInterrupt: + server.stop() + msg = "代理服务器已完全关闭" + logging.info(msg) + print(msg) + + +if __name__ == "__main__": + main() \ No newline at end of file