303 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import struct
import logging
import threading
import time
from datetime import datetime
import binascii
import sys
import json
import paho.mqtt.client as mqtt
# Force UTF-8 on stdout so the non-ASCII status messages printed by this
# script are encoded consistently regardless of the console's default.
# ``reconfigure`` only exists on real text streams (Python 3.7+); stdout may
# have been replaced by an object that lacks it, so probe instead of
# assuming it from the interpreter's major version.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding='utf-8')

# Write INFO-and-above records to test.log, UTF-8 encoded.
logging.basicConfig(
    filename='test.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    encoding='utf-8'
)
class MQTTClient:
    """Small wrapper around a paho-mqtt client that publishes JSON messages.

    The connection state is tracked in ``self.connected`` by the
    ``on_connect`` / ``on_disconnect`` callbacks, and ``publish_message``
    silently refuses to publish (printing a notice) while disconnected.

    Args:
        host: Broker host name or IP address.
        port: Broker TCP port.
        keepalive: Keep-alive interval in seconds passed to ``connect``.
        username: User name sent to the broker.
        password: Password sent to the broker.
        topic: Topic every message is published to.
        client_id: MQTT client identifier.

    NOTE(review): the default credentials are embedded in source; consider
    supplying them from configuration instead.
    """

    def __init__(self, host="123.6.102.119", port=1883, keepalive=60,
                 username="emqx_test", password="emqx_test",
                 topic="hejin/charging/log", client_id="GoClientExample"):
        self.host = host
        self.port = port
        self.keepalive = keepalive
        self.topic = topic
        self.client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv311,
                                  callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
        self.client.username_pw_set(username, password)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.connected = False

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: mark the client connected when ``rc`` is 0."""
        if rc == 0:
            print("Connected to MQTT broker")
            self.connected = True
        else:
            print(f"Failed to connect to MQTT broker with code: {rc}")

    def on_disconnect(self, client, userdata, rc):
        """Disconnection callback: mark the client as no longer connected."""
        print("Disconnected from MQTT broker")
        self.connected = False

    def connect(self):
        """Connect to the broker and start the background network loop.

        Connection errors are printed rather than raised.
        """
        try:
            self.client.connect(self.host, self.port, self.keepalive)
            self.client.loop_start()
        except Exception as e:
            print(f"MQTT connection error: {str(e)}")

    def publish_message(self, message):
        """Publish ``message`` as JSON with QoS 1 if currently connected.

        Args:
            message: Any JSON-serialisable value (a string or a list in this
                file). It is serialised with ``json.dumps`` as-is.

        Publishing errors are printed rather than raised.
        """
        try:
            if self.connected:
                # Publish the value directly, whether it is a list or a string.
                self.client.publish(self.topic, json.dumps(message), qos=1)
            else:
                print("MQTT client not connected")
        except Exception as e:
            print(f"MQTT publish error: {str(e)}")

    def disconnect(self):
        """Disconnect from the broker, then stop the background loop.

        The disconnect is requested while the network loop is still running
        so the loop can process it and fire ``on_disconnect``; stopping the
        loop first would leave ``connected`` stale.
        """
        try:
            self.client.disconnect()
            self.client.loop_stop()
        finally:
            # Guarantee the flag is cleared even if the callback never ran.
            self.connected = False
class ChargingPileProxyServer:
    """TCP proxy that relays client traffic to a fixed upstream server.

    Every accepted client gets its own upstream connection and two forwarder
    threads (client -> upstream and upstream -> client). Each relayed chunk
    is printed, written to the log and published through ``MQTTClient``.

    Args:
        listen_host: Local address to bind the listening socket to.
        listen_port: Local port to listen on.
        forward_host: Upstream server address.
        forward_port: Upstream server port.
    """

    def __init__(self, listen_host='0.0.0.0', listen_port=52461,
                 forward_host='139.9.209.227', forward_port=52461):
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.forward_host = forward_host
        self.forward_port = forward_port
        self.server_socket = None
        self.running = False
        self.clients = {}  # client connections (never populated in this class)
        self.remote_connections = {}  # client address -> upstream socket
        self.mqtt_client = MQTTClient()  # publisher for log events
        self.pile_ids = {}  # client address -> pile id extracted from traffic

    def _report(self, message, level=logging.INFO):
        """Send a status message to the log file, the console and MQTT."""
        logging.log(level, message)
        print(message)
        self.mqtt_client.publish_message(message)

    @staticmethod
    def _close_quietly(sock):
        """Shut down and close ``sock``, ignoring errors; ``None`` is allowed."""
        if sock is None:
            return
        try:
            # shutdown() wakes up a thread blocked in recv()/accept() on this
            # socket, which a plain close() from another thread may not do.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # not connected or already closed
        try:
            sock.close()
        except OSError:
            pass

    def _release_connection(self, address):
        """Drop and close the upstream socket and pile id kept for ``address``."""
        # pop() instead of "check then del": several threads clean up the
        # same address and the second one must not raise KeyError.
        self._close_quietly(self.remote_connections.pop(address, None))
        self.pile_ids.pop(address, None)

    def extract_pile_id(self, data):
        """Extract the pile id from a packet.

        Args:
            data: Raw packet bytes.

        Returns:
            The 8 bytes at offsets 3..10 as a 16-character upper-case hex
            string, or ``None`` when the packet is shorter than 11 bytes or
            cannot be processed.
        """
        try:
            if len(data) > 10:  # make sure offset 10 exists
                return ''.join([f"{b:02X}" for b in data[3:11]])
            return None
        except Exception:
            return None

    def format_hex_data(self, data):
        """Return ``data`` as space-separated upper-case hex byte pairs."""
        return ' '.join([f"{b:02X}" for b in data])

    def start(self):
        """Start the proxy and accept clients until stopped.

        Blocks in the accept loop; each client is served on a daemon thread.
        Errors are reported through the log/console/MQTT rather than raised,
        and the listening socket is always closed on exit.
        """
        try:
            self.mqtt_client.connect()
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.listen_host, self.listen_port))
            self.server_socket.listen(5)
            self.running = True
            self._report(f"代理服务器已启动,监听地址: {self.listen_host}:{self.listen_port}")
            while self.running:
                try:
                    client_socket, address = self.server_socket.accept()
                except OSError:
                    if not self.running:
                        # stop() closed the listening socket: normal shutdown,
                        # not an error worth reporting.
                        break
                    raise
                self._report(f"收到新的客户端连接,地址: {address}")
                client_thread = threading.Thread(target=self.handle_client,
                                                 args=(client_socket, address))
                client_thread.daemon = True
                client_thread.start()
        except Exception as e:
            self._report(f"代理服务器错误: {str(e)}", logging.ERROR)
        finally:
            if self.server_socket:
                self.server_socket.close()

    def get_socket_info(self, socket_obj):
        """Return ``(local_address, remote_address)`` for a socket.

        Returns ``(None, None)`` when either address cannot be obtained
        (for example because the socket is closed or not connected).
        """
        try:
            local_address = socket_obj.getsockname()
            remote_address = socket_obj.getpeername()
            return local_address, remote_address
        except OSError:
            return None, None

    def create_remote_connection(self, client_address):
        """Open the upstream connection for a client.

        Args:
            client_address: Key under which the new socket is registered in
                ``self.remote_connections``.

        Returns:
            The connected socket, or ``None`` if connecting failed (the
            failure is reported and the half-created socket is closed).
        """
        remote_socket = None
        try:
            remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            remote_socket.connect((self.forward_host, self.forward_port))
            self._report(f"已连接到远程服务器 {self.forward_host}:{self.forward_port}")
            self.remote_connections[client_address] = remote_socket
            return remote_socket
        except Exception as e:
            if remote_socket is not None:
                remote_socket.close()  # do not leak the descriptor on failure
            self._report(f"连接远程服务器失败: {str(e)}", logging.ERROR)
            return None

    def forward_data(self, source_socket, destination_socket, source_address,
                     to_remote=None):
        """Relay data from one socket to another until either side ends.

        Args:
            source_socket: Socket to read from.
            destination_socket: Socket to write to.
            source_address: Client address identifying this connection pair
                in ``self.remote_connections`` / ``self.pile_ids``.
            to_remote: ``True`` when relaying client -> upstream, ``False``
                for upstream -> client. When ``None`` the direction is
                inferred once from ``self.remote_connections``.

        On exit both sockets are shut down (which also unblocks the sibling
        forwarder thread) and the connection's bookkeeping is removed.
        """
        # Decide the direction once: the registry is mutated by sibling
        # threads during teardown, so re-deriving it per packet could flip
        # the label halfway through a connection.
        if to_remote is None:
            to_remote = destination_socket in self.remote_connections.values()
        direction = "发送到远程服务器" if to_remote else "发送到充电桩"
        mqtt_direction = "u" if to_remote else "d"  # direction tag used in MQTT messages
        try:
            while self.running:
                data = source_socket.recv(1024)
                if not data:
                    break
                # Only traffic coming from the client is parsed for a pile id.
                if to_remote:
                    pile_id = self.extract_pile_id(data)
                    if pile_id:  # update only on successful extraction
                        self.pile_ids[source_address] = pile_id
                # Resolve the peer addresses of both ends for the record.
                source_local, source_remote = self.get_socket_info(source_socket)
                dest_local, dest_remote = self.get_socket_info(destination_socket)
                from_ip = source_remote[0] if source_remote else "unknown"
                to_ip = dest_remote[0] if dest_remote else "unknown"
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                hex_data = self.format_hex_data(data)
                msg_array = [
                    from_ip,  # source IP address
                    to_ip,  # destination IP address
                    current_time,  # timestamp with millisecond precision
                    hex_data,  # payload as hex
                    self.pile_ids.get(source_address, "unknown"),  # pile id
                    mqtt_direction  # transfer direction
                ]
                print(msg_array)
                self.mqtt_client.publish_message(msg_array)
                # Console and log file get a human-readable line instead.
                formatted_msg = f"[{current_time}] [{from_ip} -> {to_ip}] 转发数据 ({direction}): {hex_data}"
                logging.info(formatted_msg)
                print(formatted_msg)
                # sendall(): send() may write only part of the buffer and
                # would silently drop the remainder.
                destination_socket.sendall(data)
        except Exception as e:
            self._report(f"转发数据错误: {str(e)}", logging.ERROR)
        finally:
            self._close_quietly(source_socket)
            # Closing the other end too makes the sibling forwarder return
            # instead of blocking forever once this direction is finished.
            self._close_quietly(destination_socket)
            self._release_connection(source_address)

    def handle_client(self, client_socket, client_address):
        """Serve one client: connect upstream and relay in both directions.

        Blocks until both forwarder threads have finished, then releases all
        resources held for the client and reports the closure.
        """
        try:
            remote_socket = self.create_remote_connection(client_address)
            if not remote_socket:
                client_socket.close()
                return
            forward_thread = threading.Thread(
                target=self.forward_data,
                args=(client_socket, remote_socket, client_address),
                kwargs={"to_remote": True}
            )
            backward_thread = threading.Thread(
                target=self.forward_data,
                args=(remote_socket, client_socket, client_address),
                kwargs={"to_remote": False}
            )
            forward_thread.daemon = True
            backward_thread.daemon = True
            forward_thread.start()
            backward_thread.start()
            forward_thread.join()
            backward_thread.join()
        except Exception as e:
            self._report(f"处理客户端连接错误: {str(e)}", logging.ERROR)
        finally:
            self._release_connection(client_address)
            self._close_quietly(client_socket)
            self._report(f"客户端连接已关闭 {client_address}")

    def stop(self):
        """Stop the proxy: close all sockets, report, then disconnect MQTT."""
        self.running = False
        self._close_quietly(self.server_socket)
        # Iterate over a snapshot: forwarder threads remove their own entries
        # concurrently while they shut down.
        for remote_socket in list(self.remote_connections.values()):
            self._close_quietly(remote_socket)
        self.remote_connections.clear()
        self.pile_ids.clear()
        # Publish the final message before disconnecting, otherwise it can
        # never reach the broker.
        self._report("代理服务器已停止")
        self.mqtt_client.disconnect()
def main():
    """Run the proxy with default settings until interrupted from the keyboard.

    On Ctrl+C the proxy is stopped and a final shutdown notice is written to
    the log and the console.
    """
    proxy = ChargingPileProxyServer()
    try:
        proxy.start()
    except KeyboardInterrupt:
        proxy.stop()
        shutdown_notice = "代理服务器已完全关闭"
        logging.info(shutdown_notice)
        print(shutdown_notice)


# Only start the proxy when executed as a script, not when imported.
if __name__ == "__main__":
    main()