diff --git a/charging_pile_proxy/hejin_forward/charge_order_detail.py b/charging_pile_proxy/hejin_forward/charge_order_detail.py index e7e364b..f7ebd1c 100644 --- a/charging_pile_proxy/hejin_forward/charge_order_detail.py +++ b/charging_pile_proxy/hejin_forward/charge_order_detail.py @@ -6,7 +6,7 @@ import logging import time import uuid -# 配置日志 + logging.basicConfig( filename='charge_order_detail.log', level=logging.INFO, @@ -76,7 +76,7 @@ class ChargeOrderDetailMigrator: self.pg_conn.autocommit = True self.pg_cursor = self.pg_conn.cursor() logging.info("成功连接到 PostgreSQL") - break # 连接成功,退出重试循环 + break except Exception as e: logging.error(f"连接错误 (第 {attempt + 1} 次): {str(e)}") diff --git a/charging_pile_proxy/hejin_forward/charge_order_processor.py b/charging_pile_proxy/hejin_forward/charge_order_processor.py new file mode 100644 index 0000000..61cbc90 --- /dev/null +++ b/charging_pile_proxy/hejin_forward/charge_order_processor.py @@ -0,0 +1,344 @@ +import taosrest # 使用 taosrest 连接 TDengine +import paho.mqtt.client as mqtt +import binascii +import json +import logging +import time +from datetime import datetime +import struct +import sys + + +logging.basicConfig( + filename='charging_pile_processor.log', + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + encoding='utf-8' +) + + +MQTT_BROKER = "123.6.102.119" +MQTT_PORT = 1883 +MQTT_KEEPALIVE = 60 +MQTT_USERNAME = "emqx_test" +MQTT_PASSWORD = "emqx_test" +MQTT_TOPIC = "hejin/order_processed" + + +TDENGINE_CONFIG = { + 'host': '123.6.102.119', + 'port': 6041, + 'user': 'readonly_user', + 'password': 'Aassword123', + 'database': 'antsev' +} + + +class ChargingPileProcessor: + def __init__(self): + self.mqtt_client = None + self.td_conn = None + self.td_cursor = None + self.setup_logging() + self.skipped_msg_types = set() + self.connect_mqtt() + self.connect_tdengine() + + def setup_logging(self): + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + logging.getLogger().addHandler(console_handler) + + def connect_mqtt(self): + try: + self.mqtt_client = mqtt.Client(client_id="ChargingPileProcessor", protocol=mqtt.MQTTv5) + self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) + self.mqtt_client.on_connect = self.on_mqtt_connect + self.mqtt_client.on_disconnect = self.on_mqtt_disconnect + self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE) + self.mqtt_client.loop_start() + logging.info("MQTT 客户端已初始化。") + except Exception as e: + logging.error(f"无法连接到 MQTT 服务器:{str(e)}") + raise + + def on_mqtt_connect(self, client, userdata, flags, rc, properties=None): + if rc == 0: + logging.info("成功连接到 MQTT 服务器。") + else: + logging.error(f"连接 MQTT 服务器失败,错误码:{rc}") + + def on_mqtt_disconnect(self, client, userdata, rc, properties=None): + logging.warning("与 MQTT 服务器断开连接。") + + def connect_tdengine(self): + max_retries = 3 + retry_delay = 10 + for attempt in range(max_retries): + try: + logging.info(f"尝试连接到 TDengine (第 {attempt + 1} 次): {TDENGINE_CONFIG}") + rest_url = f"http://{TDENGINE_CONFIG['host']}:{TDENGINE_CONFIG['port']}" + self.td_conn = taosrest.connect( + url=rest_url, + user=TDENGINE_CONFIG['user'], + password=TDENGINE_CONFIG['password'], + database=TDENGINE_CONFIG['database'] + ) + self.td_cursor = self.td_conn.cursor() + logging.info("成功连接到 TDengine") + + self.td_cursor.execute("SELECT SERVER_VERSION()") + version = self.td_cursor.fetchone() + logging.info(f"TDengine 服务器版本: {version[0]}") + break + + except Exception as e: + logging.error(f"连接 TDengine 错误 (第 {attempt + 1} 次): {str(e)}") + if attempt < max_retries - 1: + logging.info(f"将在 {retry_delay} 秒后重试...") + time.sleep(retry_delay) + else: + raise + + def fetch_latest_message(self): + try: + query = "SELECT hex_data FROM antsev.charge_jiuxing ORDER BY ts DESC LIMIT 1" + self.td_cursor.execute(query) + result = self.td_cursor.fetchone() + if result and result[0]: + hex_data = result[0].replace(" ", "") # 移除空格 + return binascii.unhexlify(hex_data) + else: + logging.warning("数据库中未找到报文。") + return None + except Exception as e: + logging.error(f"从 TDengine 获取报文时出错:{str(e)}") + return None + + def parse_message(self, data): + """根据协议解析充电桩报文""" + if not data or len(data) < 1: + logging.error("报文数据无效。") + return None + + # 检查报文类型(第一个字节) + msg_type = data[0] + if msg_type != 0x23: # 23H 最新充电订单 + # 仅在第一次遇到该类型时记录跳过日志 + if msg_type not in self.skipped_msg_types: + logging.info(f"报文类型 {hex(msg_type)} 不是充电订单(23H),跳过。") + self.skipped_msg_types.add(msg_type) + return None + + + if len(data) < 159: + logging.error(f"报文长度 {len(data)} 太短,无法解析充电订单。") + return None + + try: + + year = 2000 + data[0] + month = data[1] + day = data[2] + hour = data[3] + minute = data[4] + second = data[5] + order_time = datetime(year, month, day, hour, minute, second).strftime("%Y-%m-%d %H:%M:%S") + + # 解析枪号 + gun_number = data[6] + + # 解析记录索引号 + record_index = struct.unpack(">I", data[7:11])[0] + + # 解析充电订单号 + order_number = data[11:43].decode('ascii', errors='ignore').rstrip('\x00') + + # 解析用户 ID + user_id = data[43:75].decode('ascii', errors='ignore').rstrip('\x00') + + # 解析用户类型 + user_type = struct.unpack(">H", data[75:77])[0] + + # 解析组织代码 + org_code = data[77:86].decode('ascii', errors='ignore').rstrip('\x00') + + # 解析充电卡余额 + card_balance = struct.unpack(">I", data[86:90])[0] / 100.0 # 单位:0.01 元 + + # 解析 VIN + vin = data[90:107].decode('ascii', errors='ignore').rstrip('\x00') + + # 解析开始充电时间 + start_year = 2000 + data[107] + start_month = data[108] + start_day = data[109] + start_hour = data[110] + start_minute = data[111] + start_second = data[112] + start_time = datetime(start_year, start_month, start_day, start_hour, start_minute, start_second).strftime( + "%Y-%m-%d %H:%M:%S") + + # 解析结束充电时间 + end_year = 2000 + data[113] + end_month = data[114] + end_day = data[115] + end_hour = data[116] + end_minute = data[117] + end_second = data[118] + end_time = datetime(end_year, end_month, end_day, end_hour, end_minute, end_second).strftime( + "%Y-%m-%d %H:%M:%S") + + # 解析开始和结束电量 + start_energy = struct.unpack(">I", data[119:123])[0] / 100.0 # 单位:0.01 kWh + end_energy = struct.unpack(">I", data[123:127])[0] / 100.0 + + # 解析费用 + electricity_fee = struct.unpack(">I", data[147:151])[0] / 100.0 # 单位:0.01 元 + service_fee = struct.unpack(">I", data[151:155])[0] / 100.0 # 单位:0.01 元 + parking_fee = struct.unpack(">I", data[155:159])[0] / 10000.0 # 单位:0.0001 元 + + # 解析时间段分段 + num_segments = data[159] + segments = [] + segment_start = 160 + for i in range(num_segments): + if segment_start + 3 > len(data): + break + rate_model_index = data[segment_start] + energy = struct.unpack(">H", data[segment_start + 1:segment_start + 3])[0] / 100.0 # 单位:0.01 kWh + segments.append({"rate_model_index": rate_model_index, "energy": energy}) + segment_start += 3 + + # 解析电池序列号 + battery_serial = None + battery_start = segment_start + if battery_start + 17 <= len(data): + battery_serial = data[battery_start:battery_start + 17].decode('ascii', errors='ignore').rstrip('\x00') + + # 构建数据 + parsed_data = { + "order_time": order_time, + "gun_number": gun_number, + "record_index": record_index, + "order_number": order_number, + "user_id": user_id, + "user_type": user_type, + "org_code": org_code, + "card_balance": card_balance, + "vin": vin, + "start_time": start_time, + "end_time": end_time, + "start_energy": start_energy, + "end_energy": end_energy, + "electricity_fee": electricity_fee, + "service_fee": service_fee, + "parking_fee": parking_fee, + "time_segments": segments, + "battery_serial": battery_serial + } + logging.info(f"解析后的充电订单报文:{parsed_data}") + return parsed_data + + except Exception as e: + logging.error(f"解析报文时出错:{str(e)}") + return None + + def query_card_account_by_vin(self, vin): + #vin接口预留 + logging.info(f"查询 VIN {vin} 的卡账户") + return 100.0 + + def query_rate_model_by_gun_number(self, gun_number): + #费率接口预留 + logging.info(f"查询枪号 {gun_number} 的费率模型") + return {"rate": 0.5} + + def perform_deduction(self, user_id, amount): + #余额接口预留 + logging.info(f"对用户 {user_id} 执行扣款,金额:{amount}") + return True + + def process_message(self, parsed_data): + if not parsed_data: + return + + # 计算费用 + total_fee = parsed_data["electricity_fee"] + parsed_data["service_fee"] + parsed_data["parking_fee"] + logging.info(f"计算总费用:{total_fee} 元") + + + vin = parsed_data["vin"] + account_balance = self.query_card_account_by_vin(vin) + + + gun_number = parsed_data["gun_number"] + rate_model = self.query_rate_model_by_gun_number(gun_number) + + + user_id_for_deduction = parsed_data["user_id"] if parsed_data["user_type"] == 1 else vin + # 执行扣款 + deduction_success = self.perform_deduction(user_id_for_deduction, total_fee) + + if not deduction_success: + logging.error("扣款失败。") + return + + + mqtt_message = { + "order_details": parsed_data, + "account_balance_before": account_balance, + "rate_model": rate_model, + "deduction_amount": total_fee, + "deduction_success": deduction_success + } + + + try: + self.mqtt_client.publish(MQTT_TOPIC, json.dumps(mqtt_message), qos=1) + logging.info(f"已发布到 MQTT:{mqtt_message}") + except Exception as e: + logging.error(f"发布到 MQTT 失败:{str(e)}") + + def run(self): + try: + while True: + message_data = self.fetch_latest_message() + if message_data: + parsed_data = self.parse_message(message_data) + if parsed_data: + self.process_message(parsed_data) + + + time.sleep(5) + + except KeyboardInterrupt: + logging.info("程序被用户中断。") + except Exception as e: + logging.error(f"主循环中发生意外错误:{str(e)}") + finally: + self.cleanup() + + def cleanup(self): + try: + if self.td_cursor: + self.td_cursor.close() + if self.td_conn: + self.td_conn.close() + if self.mqtt_client: + self.mqtt_client.loop_stop() + self.mqtt_client.disconnect() + logging.info("资源已清理:TDengine 和 MQTT 连接已关闭。") + except Exception as e: + logging.error(f"关闭连接时出错:{str(e)}") + + +def main(): + processor = ChargingPileProcessor() + processor.run() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/charging_pile_proxy/hejin_forward/charge_parsed_data.py b/charging_pile_proxy/hejin_forward/charge_parsed_data.py new file mode 100644 index 0000000..1b4a861 --- /dev/null +++ b/charging_pile_proxy/hejin_forward/charge_parsed_data.py @@ -0,0 +1,201 @@ +import paho.mqtt.client as mqtt +import psycopg2 +import json +import logging +import sys +import time +from datetime import datetime + + +logging.basicConfig( + filename='mqtt_to_postgres.log', + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + encoding='utf-8' +) + + +MQTT_BROKER = "123.6.102.119" +MQTT_PORT = 1883 +MQTT_KEEPALIVE = 60 +MQTT_USERNAME = "emqx_test" +MQTT_PASSWORD = "emqx_test" +MQTT_TOPIC = "hejin/order_processed" + + +PG_CONFIG = { + 'host': '123.6.102.119', + 'port': 5432, + 'database': 'tms-design', + 'user': 'postgres', + 'password': '687315e66ae24eeab8bb5c0441a40d79' +} + +class MqttToPostgres: + def __init__(self): + self.mqtt_client = None + self.pg_conn = None + self.pg_cursor = None + self.setup_logging() + self.connect_mqtt() + self.connect_postgres() + + def setup_logging(self): + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + logging.getLogger().addHandler(console_handler) + + def connect_mqtt(self): + try: + self.mqtt_client = mqtt.Client(client_id="MqttToPostgres", protocol=mqtt.MQTTv5) + self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) + self.mqtt_client.on_connect = self.on_mqtt_connect + self.mqtt_client.on_disconnect = self.on_mqtt_disconnect + self.mqtt_client.on_message = self.on_mqtt_message + self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE) + self.mqtt_client.loop_start() + logging.info("MQTT 客户端已初始化。") + except Exception as e: + logging.error(f"无法连接到 MQTT 服务器:{str(e)}") + raise + + def on_mqtt_connect(self, client, userdata, flags, rc, properties=None): + if rc == 0: + logging.info("成功连接到 MQTT 服务器。") + # 订阅主题 + self.mqtt_client.subscribe(MQTT_TOPIC, qos=1) + logging.info(f"已订阅主题:{MQTT_TOPIC}") + else: + logging.error(f"连接 MQTT 服务器失败,错误码:{rc}") + + def on_mqtt_disconnect(self, client, userdata, rc, properties=None): + logging.warning("与 MQTT 服务器断开连接。") + + def on_mqtt_message(self, client, userdata, msg): + try: + payload = msg.payload.decode('utf-8') + message = json.loads(payload) + logging.info(f"收到 MQTT 消息:{message}") + + parsed_data = message.get("order_details") + if not parsed_data: + logging.warning("消息中未找到 order_details,跳过。") + return + + self.write_to_postgres(parsed_data) + + except Exception as e: + logging.error(f"处理 MQTT 消息时出错:{str(e)}") + + def connect_postgres(self): + max_retries = 3 + retry_delay = 10 + for attempt in range(max_retries): + try: + logging.info(f"尝试连接到 PostgreSQL (第 {attempt + 1} 次): {PG_CONFIG}") + self.pg_conn = psycopg2.connect( + host=PG_CONFIG['host'], + port=PG_CONFIG['port'], + database=PG_CONFIG['database'], + user=PG_CONFIG['user'], + password=PG_CONFIG['password'] + ) + self.pg_conn.autocommit = True + self.pg_cursor = self.pg_conn.cursor() + logging.info("成功连接到 PostgreSQL") + break + + except Exception as e: + logging.error(f"连接 PostgreSQL 错误 (第 {attempt + 1} 次): {str(e)}") + if attempt < max_retries - 1: + logging.info(f"将在 {retry_delay} 秒后重试...") + time.sleep(retry_delay) + else: + raise + + def write_to_postgres(self, parsed_data): + """将 parsed_data 写入 PostgreSQL 数据库""" + try: + # 检查记录是否已存在(基于 order_number 唯一性) + check_query = """ + SELECT 1 FROM charge_orders WHERE order_number = %s + """ + self.pg_cursor.execute(check_query, (parsed_data["order_number"],)) + exists = self.pg_cursor.fetchone() is not None + + if exists: + logging.info(f"订单 {parsed_data['order_number']} 已存在,跳过插入。") + return + + insert_query = """ + INSERT INTO charge_orders ( + order_time, gun_number, record_index, order_number, user_id, user_type, org_code, + card_balance, vin, start_time, end_time, start_energy, end_energy, electricity_fee, + service_fee, parking_fee, time_segments, battery_serial, created_at, updated_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + time_segments_json = json.dumps(parsed_data["time_segments"]) + # 当前时间作为 created_at 和 updated_at + current_time = datetime.now() + + values = ( + parsed_data["order_time"], + parsed_data["gun_number"], + parsed_data["record_index"], + parsed_data["order_number"], + parsed_data["user_id"], + parsed_data["user_type"], + parsed_data["org_code"], + parsed_data["card_balance"], + parsed_data["vin"], + parsed_data["start_time"], + parsed_data["end_time"], + parsed_data["start_energy"], + parsed_data["end_energy"], + parsed_data["electricity_fee"], + parsed_data["service_fee"], + parsed_data["parking_fee"], + time_segments_json, + parsed_data["battery_serial"], + current_time, + current_time + ) + + self.pg_cursor.execute(insert_query, values) + logging.info(f"成功插入订单 {parsed_data['order_number']} 到 PostgreSQL 数据库。") + + except Exception as e: + logging.error(f"写入 PostgreSQL 时出错:{str(e)}") + + def run(self): + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logging.info("程序被用户中断。") + except Exception as e: + logging.error(f"主循环中发生意外错误:{str(e)}") + finally: + self.cleanup() + + def cleanup(self): + try: + if self.pg_cursor: + self.pg_cursor.close() + if self.pg_conn: + self.pg_conn.close() + if self.mqtt_client: + self.mqtt_client.loop_stop() + self.mqtt_client.disconnect() + logging.info("资源已清理:PostgreSQL 和 MQTT 连接已关闭。") + except Exception as e: + logging.error(f"关闭连接时出错:{str(e)}") + +def main(): + processor = MqttToPostgres() + processor.run() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/charging_pile_proxy/hejin_forward/charging_pile_processor.log b/charging_pile_proxy/hejin_forward/charging_pile_processor.log new file mode 100644 index 0000000..09880e2 --- /dev/null +++ b/charging_pile_proxy/hejin_forward/charging_pile_processor.log @@ -0,0 +1,25 @@ +2025-03-28 10:32:53,335 - INFO - MQTT 客户端已初始化。 +2025-03-28 10:32:53,363 - INFO - 成功连接到 MQTT 服务器。 +2025-03-28 10:32:54,892 - ERROR - 无法连接到 TDengine 数据库:[0x000b]: Unable to establish connection +2025-03-28 10:36:11,678 - INFO - MQTT 客户端已初始化。 +2025-03-28 10:36:11,679 - INFO - 尝试连接到 TDengine (第 1 次): {'host': '123.6.102.119', 'port': 6041, 'user': 'readonly_user', 'password': 'Aassword123', 'database': 'antsev'} +2025-03-28 10:36:11,696 - INFO - 成功连接到 MQTT 服务器。 +2025-03-28 10:36:43,002 - INFO - 成功连接到 TDengine +2025-03-28 10:36:43,046 - INFO - TDengine 服务器版本: 3.3.3.0 +2025-03-28 10:36:43,095 - INFO - 报文类型 0x4a 不是充电订单(23H),跳过。 +2025-03-28 10:36:48,145 - INFO - 报文类型 0x4a 不是充电订单(23H),跳过。 +2025-03-28 10:36:53,188 - INFO - 报文类型 0x4a 不是充电订单(23H),跳过。 +2025-03-28 10:36:58,235 - INFO - 报文类型 0x4a 不是充电订单(23H),跳过。 +2025-03-28 10:37:03,283 - INFO - 报文类型 0x4a 不是充电订单(23H),跳过。 +2025-03-28 10:37:03,705 - INFO - 程序被用户中断。 +2025-03-28 10:37:04,231 - WARNING - 与 MQTT 服务器断开连接。 +2025-03-28 10:37:04,231 - INFO - 资源已清理:TDengine 和 MQTT 连接已关闭。 +2025-03-28 10:41:04,135 - INFO - MQTT 客户端已初始化。 +2025-03-28 10:41:04,135 - INFO - 尝试连接到 TDengine (第 1 次): {'host': '123.6.102.119', 'port': 6041, 'user': 'readonly_user', 'password': 'Aassword123', 'database': 'antsev'} +2025-03-28 10:41:04,150 - INFO - 成功连接到 MQTT 服务器。 +2025-03-28 10:41:35,383 - INFO - 成功连接到 TDengine +2025-03-28 10:41:35,421 - INFO - TDengine 服务器版本: 3.3.3.0 +2025-03-28 10:41:35,461 - INFO - 报文类型 0x4a 不是充电订单(23H),跳过。 +2025-03-28 10:44:01,204 - INFO - 程序被用户中断。 +2025-03-28 10:44:01,741 - WARNING - 与 MQTT 服务器断开连接。 +2025-03-28 10:44:01,742 - INFO - 资源已清理:TDengine 和 MQTT 连接已关闭。