添加了余额计算

This commit is contained in:
MATRIX\29620 2025-03-28 16:03:16 +08:00
parent 2073d823a9
commit 04271e628a
4 changed files with 572 additions and 2 deletions

View File

@ -6,7 +6,7 @@ import logging
import time
import uuid
# 配置日志
logging.basicConfig(
filename='charge_order_detail.log',
level=logging.INFO,
@ -76,7 +76,7 @@ class ChargeOrderDetailMigrator:
self.pg_conn.autocommit = True
self.pg_cursor = self.pg_conn.cursor()
logging.info("成功连接到 PostgreSQL")
break # 连接成功,退出重试循环
break
except Exception as e:
logging.error(f"连接错误 (第 {attempt + 1} 次): {str(e)}")

View File

@ -0,0 +1,344 @@
import taosrest # 使用 taosrest 连接 TDengine
import paho.mqtt.client as mqtt
import binascii
import json
import logging
import time
from datetime import datetime
import struct
import sys
logging.basicConfig(
filename='charging_pile_processor.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
encoding='utf-8'
)
MQTT_BROKER = "123.6.102.119"
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60
MQTT_USERNAME = "emqx_test"
MQTT_PASSWORD = "emqx_test"
MQTT_TOPIC = "hejin/order_processed"
TDENGINE_CONFIG = {
'host': '123.6.102.119',
'port': 6041,
'user': 'readonly_user',
'password': 'Aassword123',
'database': 'antsev'
}
class ChargingPileProcessor:
def __init__(self):
self.mqtt_client = None
self.td_conn = None
self.td_cursor = None
self.setup_logging()
self.skipped_msg_types = set()
self.connect_mqtt()
self.connect_tdengine()
def setup_logging(self):
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)
def connect_mqtt(self):
try:
self.mqtt_client = mqtt.Client(client_id="ChargingPileProcessor", protocol=mqtt.MQTTv5)
self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
self.mqtt_client.on_connect = self.on_mqtt_connect
self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE)
self.mqtt_client.loop_start()
logging.info("MQTT 客户端已初始化。")
except Exception as e:
logging.error(f"无法连接到 MQTT 服务器:{str(e)}")
raise
def on_mqtt_connect(self, client, userdata, flags, rc, properties=None):
if rc == 0:
logging.info("成功连接到 MQTT 服务器。")
else:
logging.error(f"连接 MQTT 服务器失败,错误码:{rc}")
def on_mqtt_disconnect(self, client, userdata, rc, properties=None):
logging.warning("与 MQTT 服务器断开连接。")
def connect_tdengine(self):
max_retries = 3
retry_delay = 10
for attempt in range(max_retries):
try:
logging.info(f"尝试连接到 TDengine (第 {attempt + 1} 次): {TDENGINE_CONFIG}")
rest_url = f"http://{TDENGINE_CONFIG['host']}:{TDENGINE_CONFIG['port']}"
self.td_conn = taosrest.connect(
url=rest_url,
user=TDENGINE_CONFIG['user'],
password=TDENGINE_CONFIG['password'],
database=TDENGINE_CONFIG['database']
)
self.td_cursor = self.td_conn.cursor()
logging.info("成功连接到 TDengine")
self.td_cursor.execute("SELECT SERVER_VERSION()")
version = self.td_cursor.fetchone()
logging.info(f"TDengine 服务器版本: {version[0]}")
break
except Exception as e:
logging.error(f"连接 TDengine 错误 (第 {attempt + 1} 次): {str(e)}")
if attempt < max_retries - 1:
logging.info(f"将在 {retry_delay} 秒后重试...")
time.sleep(retry_delay)
else:
raise
def fetch_latest_message(self):
try:
query = "SELECT hex_data FROM antsev.charge_jiuxing ORDER BY ts DESC LIMIT 1"
self.td_cursor.execute(query)
result = self.td_cursor.fetchone()
if result and result[0]:
hex_data = result[0].replace(" ", "") # 移除空格
return binascii.unhexlify(hex_data)
else:
logging.warning("数据库中未找到报文。")
return None
except Exception as e:
logging.error(f"从 TDengine 获取报文时出错:{str(e)}")
return None
def parse_message(self, data):
"""根据协议解析充电桩报文"""
if not data or len(data) < 1:
logging.error("报文数据无效。")
return None
# 检查报文类型(第一个字节)
msg_type = data[0]
if msg_type != 0x23: # 23H 最新充电订单
# 仅在第一次遇到该类型时记录跳过日志
if msg_type not in self.skipped_msg_types:
logging.info(f"报文类型 {hex(msg_type)} 不是充电订单23H跳过。")
self.skipped_msg_types.add(msg_type)
return None
if len(data) < 159:
logging.error(f"报文长度 {len(data)} 太短,无法解析充电订单。")
return None
try:
year = 2000 + data[0]
month = data[1]
day = data[2]
hour = data[3]
minute = data[4]
second = data[5]
order_time = datetime(year, month, day, hour, minute, second).strftime("%Y-%m-%d %H:%M:%S")
# 解析枪号
gun_number = data[6]
# 解析记录索引号
record_index = struct.unpack(">I", data[7:11])[0]
# 解析充电订单号
order_number = data[11:43].decode('ascii', errors='ignore').rstrip('\x00')
# 解析用户 ID
user_id = data[43:75].decode('ascii', errors='ignore').rstrip('\x00')
# 解析用户类型
user_type = struct.unpack(">H", data[75:77])[0]
# 解析组织代码
org_code = data[77:86].decode('ascii', errors='ignore').rstrip('\x00')
# 解析充电卡余额
card_balance = struct.unpack(">I", data[86:90])[0] / 100.0 # 单位0.01 元
# 解析 VIN
vin = data[90:107].decode('ascii', errors='ignore').rstrip('\x00')
# 解析开始充电时间
start_year = 2000 + data[107]
start_month = data[108]
start_day = data[109]
start_hour = data[110]
start_minute = data[111]
start_second = data[112]
start_time = datetime(start_year, start_month, start_day, start_hour, start_minute, start_second).strftime(
"%Y-%m-%d %H:%M:%S")
# 解析结束充电时间
end_year = 2000 + data[113]
end_month = data[114]
end_day = data[115]
end_hour = data[116]
end_minute = data[117]
end_second = data[118]
end_time = datetime(end_year, end_month, end_day, end_hour, end_minute, end_second).strftime(
"%Y-%m-%d %H:%M:%S")
# 解析开始和结束电量
start_energy = struct.unpack(">I", data[119:123])[0] / 100.0 # 单位0.01 kWh
end_energy = struct.unpack(">I", data[123:127])[0] / 100.0
# 解析费用
electricity_fee = struct.unpack(">I", data[147:151])[0] / 100.0 # 单位0.01 元
service_fee = struct.unpack(">I", data[151:155])[0] / 100.0 # 单位0.01 元
parking_fee = struct.unpack(">I", data[155:159])[0] / 10000.0 # 单位0.0001 元
# 解析时间段分段
num_segments = data[159]
segments = []
segment_start = 160
for i in range(num_segments):
if segment_start + 3 > len(data):
break
rate_model_index = data[segment_start]
energy = struct.unpack(">H", data[segment_start + 1:segment_start + 3])[0] / 100.0 # 单位0.01 kWh
segments.append({"rate_model_index": rate_model_index, "energy": energy})
segment_start += 3
# 解析电池序列号
battery_serial = None
battery_start = segment_start
if battery_start + 17 <= len(data):
battery_serial = data[battery_start:battery_start + 17].decode('ascii', errors='ignore').rstrip('\x00')
# 构建数据
parsed_data = {
"order_time": order_time,
"gun_number": gun_number,
"record_index": record_index,
"order_number": order_number,
"user_id": user_id,
"user_type": user_type,
"org_code": org_code,
"card_balance": card_balance,
"vin": vin,
"start_time": start_time,
"end_time": end_time,
"start_energy": start_energy,
"end_energy": end_energy,
"electricity_fee": electricity_fee,
"service_fee": service_fee,
"parking_fee": parking_fee,
"time_segments": segments,
"battery_serial": battery_serial
}
logging.info(f"解析后的充电订单报文:{parsed_data}")
return parsed_data
except Exception as e:
logging.error(f"解析报文时出错:{str(e)}")
return None
def query_card_account_by_vin(self, vin):
#vin接口预留
logging.info(f"查询 VIN {vin} 的卡账户")
return 100.0
def query_rate_model_by_gun_number(self, gun_number):
#费率接口预留
logging.info(f"查询枪号 {gun_number} 的费率模型")
return {"rate": 0.5}
def perform_deduction(self, user_id, amount):
#余额接口预留
logging.info(f"对用户 {user_id} 执行扣款,金额:{amount}")
return True
def process_message(self, parsed_data):
if not parsed_data:
return
# 计算费用
total_fee = parsed_data["electricity_fee"] + parsed_data["service_fee"] + parsed_data["parking_fee"]
logging.info(f"计算总费用:{total_fee}")
vin = parsed_data["vin"]
account_balance = self.query_card_account_by_vin(vin)
gun_number = parsed_data["gun_number"]
rate_model = self.query_rate_model_by_gun_number(gun_number)
user_id_for_deduction = parsed_data["user_id"] if parsed_data["user_type"] == 1 else vin
# 执行扣款
deduction_success = self.perform_deduction(user_id_for_deduction, total_fee)
if not deduction_success:
logging.error("扣款失败。")
return
mqtt_message = {
"order_details": parsed_data,
"account_balance_before": account_balance,
"rate_model": rate_model,
"deduction_amount": total_fee,
"deduction_success": deduction_success
}
try:
self.mqtt_client.publish(MQTT_TOPIC, json.dumps(mqtt_message), qos=1)
logging.info(f"已发布到 MQTT{mqtt_message}")
except Exception as e:
logging.error(f"发布到 MQTT 失败:{str(e)}")
def run(self):
try:
while True:
message_data = self.fetch_latest_message()
if message_data:
parsed_data = self.parse_message(message_data)
if parsed_data:
self.process_message(parsed_data)
time.sleep(5)
except KeyboardInterrupt:
logging.info("程序被用户中断。")
except Exception as e:
logging.error(f"主循环中发生意外错误:{str(e)}")
finally:
self.cleanup()
def cleanup(self):
try:
if self.td_cursor:
self.td_cursor.close()
if self.td_conn:
self.td_conn.close()
if self.mqtt_client:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
logging.info("资源已清理TDengine 和 MQTT 连接已关闭。")
except Exception as e:
logging.error(f"关闭连接时出错:{str(e)}")
def main():
processor = ChargingPileProcessor()
processor.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,201 @@
import paho.mqtt.client as mqtt
import psycopg2
import json
import logging
import sys
import time
from datetime import datetime
logging.basicConfig(
filename='mqtt_to_postgres.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
encoding='utf-8'
)
MQTT_BROKER = "123.6.102.119"
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60
MQTT_USERNAME = "emqx_test"
MQTT_PASSWORD = "emqx_test"
MQTT_TOPIC = "hejin/order_processed"
PG_CONFIG = {
'host': '123.6.102.119',
'port': 5432,
'database': 'tms-design',
'user': 'postgres',
'password': '687315e66ae24eeab8bb5c0441a40d79'
}
class MqttToPostgres:
def __init__(self):
self.mqtt_client = None
self.pg_conn = None
self.pg_cursor = None
self.setup_logging()
self.connect_mqtt()
self.connect_postgres()
def setup_logging(self):
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)
def connect_mqtt(self):
try:
self.mqtt_client = mqtt.Client(client_id="MqttToPostgres", protocol=mqtt.MQTTv5)
self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
self.mqtt_client.on_connect = self.on_mqtt_connect
self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
self.mqtt_client.on_message = self.on_mqtt_message
self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE)
self.mqtt_client.loop_start()
logging.info("MQTT 客户端已初始化。")
except Exception as e:
logging.error(f"无法连接到 MQTT 服务器:{str(e)}")
raise
def on_mqtt_connect(self, client, userdata, flags, rc, properties=None):
if rc == 0:
logging.info("成功连接到 MQTT 服务器。")
# 订阅主题
self.mqtt_client.subscribe(MQTT_TOPIC, qos=1)
logging.info(f"已订阅主题:{MQTT_TOPIC}")
else:
logging.error(f"连接 MQTT 服务器失败,错误码:{rc}")
def on_mqtt_disconnect(self, client, userdata, rc, properties=None):
logging.warning("与 MQTT 服务器断开连接。")
def on_mqtt_message(self, client, userdata, msg):
try:
payload = msg.payload.decode('utf-8')
message = json.loads(payload)
logging.info(f"收到 MQTT 消息:{message}")
parsed_data = message.get("order_details")
if not parsed_data:
logging.warning("消息中未找到 order_details跳过。")
return
self.write_to_postgres(parsed_data)
except Exception as e:
logging.error(f"处理 MQTT 消息时出错:{str(e)}")
def connect_postgres(self):
max_retries = 3
retry_delay = 10
for attempt in range(max_retries):
try:
logging.info(f"尝试连接到 PostgreSQL (第 {attempt + 1} 次): {PG_CONFIG}")
self.pg_conn = psycopg2.connect(
host=PG_CONFIG['host'],
port=PG_CONFIG['port'],
database=PG_CONFIG['database'],
user=PG_CONFIG['user'],
password=PG_CONFIG['password']
)
self.pg_conn.autocommit = True
self.pg_cursor = self.pg_conn.cursor()
logging.info("成功连接到 PostgreSQL")
break
except Exception as e:
logging.error(f"连接 PostgreSQL 错误 (第 {attempt + 1} 次): {str(e)}")
if attempt < max_retries - 1:
logging.info(f"将在 {retry_delay} 秒后重试...")
time.sleep(retry_delay)
else:
raise
def write_to_postgres(self, parsed_data):
"""将 parsed_data 写入 PostgreSQL 数据库"""
try:
# 检查记录是否已存在(基于 order_number 唯一性)
check_query = """
SELECT 1 FROM charge_orders WHERE order_number = %s
"""
self.pg_cursor.execute(check_query, (parsed_data["order_number"],))
exists = self.pg_cursor.fetchone() is not None
if exists:
logging.info(f"订单 {parsed_data['order_number']} 已存在,跳过插入。")
return
insert_query = """
INSERT INTO charge_orders (
order_time, gun_number, record_index, order_number, user_id, user_type, org_code,
card_balance, vin, start_time, end_time, start_energy, end_energy, electricity_fee,
service_fee, parking_fee, time_segments, battery_serial, created_at, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
time_segments_json = json.dumps(parsed_data["time_segments"])
# 当前时间作为 created_at 和 updated_at
current_time = datetime.now()
values = (
parsed_data["order_time"],
parsed_data["gun_number"],
parsed_data["record_index"],
parsed_data["order_number"],
parsed_data["user_id"],
parsed_data["user_type"],
parsed_data["org_code"],
parsed_data["card_balance"],
parsed_data["vin"],
parsed_data["start_time"],
parsed_data["end_time"],
parsed_data["start_energy"],
parsed_data["end_energy"],
parsed_data["electricity_fee"],
parsed_data["service_fee"],
parsed_data["parking_fee"],
time_segments_json,
parsed_data["battery_serial"],
current_time,
current_time
)
self.pg_cursor.execute(insert_query, values)
logging.info(f"成功插入订单 {parsed_data['order_number']} 到 PostgreSQL 数据库。")
except Exception as e:
logging.error(f"写入 PostgreSQL 时出错:{str(e)}")
def run(self):
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("程序被用户中断。")
except Exception as e:
logging.error(f"主循环中发生意外错误:{str(e)}")
finally:
self.cleanup()
def cleanup(self):
try:
if self.pg_cursor:
self.pg_cursor.close()
if self.pg_conn:
self.pg_conn.close()
if self.mqtt_client:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
logging.info("资源已清理PostgreSQL 和 MQTT 连接已关闭。")
except Exception as e:
logging.error(f"关闭连接时出错:{str(e)}")
def main():
processor = MqttToPostgres()
processor.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,25 @@
2025-03-28 10:32:53,335 - INFO - MQTT 客户端已初始化。
2025-03-28 10:32:53,363 - INFO - 成功连接到 MQTT 服务器。
2025-03-28 10:32:54,892 - ERROR - 无法连接到 TDengine 数据库:[0x000b]: Unable to establish connection
2025-03-28 10:36:11,678 - INFO - MQTT 客户端已初始化。
2025-03-28 10:36:11,679 - INFO - 尝试连接到 TDengine (第 1 次): {'host': '123.6.102.119', 'port': 6041, 'user': 'readonly_user', 'password': 'Aassword123', 'database': 'antsev'}
2025-03-28 10:36:11,696 - INFO - 成功连接到 MQTT 服务器。
2025-03-28 10:36:43,002 - INFO - 成功连接到 TDengine
2025-03-28 10:36:43,046 - INFO - TDengine 服务器版本: 3.3.3.0
2025-03-28 10:36:43,095 - INFO - 报文类型 0x4a 不是充电订单23H跳过。
2025-03-28 10:36:48,145 - INFO - 报文类型 0x4a 不是充电订单23H跳过。
2025-03-28 10:36:53,188 - INFO - 报文类型 0x4a 不是充电订单23H跳过。
2025-03-28 10:36:58,235 - INFO - 报文类型 0x4a 不是充电订单23H跳过。
2025-03-28 10:37:03,283 - INFO - 报文类型 0x4a 不是充电订单23H跳过。
2025-03-28 10:37:03,705 - INFO - 程序被用户中断。
2025-03-28 10:37:04,231 - WARNING - 与 MQTT 服务器断开连接。
2025-03-28 10:37:04,231 - INFO - 资源已清理TDengine 和 MQTT 连接已关闭。
2025-03-28 10:41:04,135 - INFO - MQTT 客户端已初始化。
2025-03-28 10:41:04,135 - INFO - 尝试连接到 TDengine (第 1 次): {'host': '123.6.102.119', 'port': 6041, 'user': 'readonly_user', 'password': 'Aassword123', 'database': 'antsev'}
2025-03-28 10:41:04,150 - INFO - 成功连接到 MQTT 服务器。
2025-03-28 10:41:35,383 - INFO - 成功连接到 TDengine
2025-03-28 10:41:35,421 - INFO - TDengine 服务器版本: 3.3.3.0
2025-03-28 10:41:35,461 - INFO - 报文类型 0x4a 不是充电订单23H跳过。
2025-03-28 10:44:01,204 - INFO - 程序被用户中断。
2025-03-28 10:44:01,741 - WARNING - 与 MQTT 服务器断开连接。
2025-03-28 10:44:01,742 - INFO - 资源已清理TDengine 和 MQTT 连接已关闭。