376 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import taosrest
import psycopg2
import json
from datetime import datetime
import binascii
import logging
import uuid
import time
# 配置日志
logging.basicConfig(
filename='pile.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
encoding='utf-8'
)
class PileMigrator:
def __init__(self):
# TDengine连接参数
self.tdengine_config = {
'host': '123.6.102.119',
'port': 6041,
'user': 'readonly_user',
'password': 'Aassword123',
'database': 'antsev'
}
# PostgreSQL连接参数
self.pg_config = {
'host': '123.6.102.119',
'port': 5432,
'database': 'tms-design',
'user': 'postgres',
'password': '687315e66ae24eeab8bb5c0441a40d79'
}
self.td_conn = None
self.td_cursor = None
self.pg_conn = None
self.pg_cursor = None
self.last_processed_ts = None
self.processed_pile_ids = set()
self.processed_uuids = set()
def connect(self):
"""建立与两个数据库的连接"""
max_retries = 3
retry_delay = 10 # 秒
for attempt in range(max_retries):
try:
# 连接到TDengine
logging.info(f"尝试连接到TDengine (第 {attempt + 1} 次): {self.tdengine_config}")
rest_url = f"http://{self.tdengine_config['host']}:{self.tdengine_config['port']}"
self.td_conn = taosrest.connect(
url=rest_url,
user=self.tdengine_config['user'],
password=self.tdengine_config['password'],
database=self.tdengine_config['database']
)
self.td_cursor = self.td_conn.cursor()
logging.info("成功连接到TDengine")
# 测试查询以验证连接
self.td_cursor.execute("SELECT SERVER_VERSION()")
version = self.td_cursor.fetchone()
logging.info(f"TDengine 服务器版本: {version[0]}")
# 连接到PostgreSQL
logging.info(f"尝试连接到PostgreSQL: {self.pg_config}")
self.pg_conn = psycopg2.connect(
host=self.pg_config['host'],
port=self.pg_config['port'],
database=self.pg_config['database'],
user=self.pg_config['user'],
password=self.pg_config['password']
)
self.pg_conn.autocommit = True
self.pg_cursor = self.pg_conn.cursor()
logging.info("成功连接到PostgreSQL")
break # 连接成功,退出重试循环
except Exception as e:
logging.error(f"连接错误 (第 {attempt + 1} 次): {str(e)}")
if attempt < max_retries - 1:
logging.info(f"将在 {retry_delay} 秒后重试...")
time.sleep(retry_delay)
else:
raise
def parse_hex_data(self, hex_data, timestamp, pile_id):
"""根据协议解析十六进制数据"""
try:
# 移除空格并将十六进制字符串转换为字节
hex_bytes = bytes.fromhex(hex_data.replace(" ", ""))
# 验证帧起始(应该是"JX"
if hex_bytes[0:2] != b'JX':
return None
# 提取命令
command = hex_bytes[2:3].hex().upper()
# 默认费率清单
default_rate_record = [
{
"discountAfterTotalPrice": 1.779,
"discountElecPrice": 0,
"discountServicePrice": 0,
"elecPrice": 0.889,
"endTime": "18:00:00",
"phaseType": 4,
"servicePrice": 0
}
]
# 初始化数据字典
data = {
'command': command,
'uuid': str(uuid.uuid4()).replace('-', ''),
'classification_id': 'S5F',
'manufacturer_id': 'cf17ad0840f79ab4483e4c4c8bda875b',
'product_id': '111818',
'station_id': '200076',
'name': '直流充电桩A04',
'code': pile_id,
'dc_or_ac': '直流',
'gun_number': 2,
'software_version': '2.100',
'hardware_version': None,
'startup_mode': json.dumps([1]),
'startup_type': json.dumps([22]),
'created_at': timestamp,
'created_by': None,
'updated_at': timestamp,
'updated_by': None,
'charge_person': None,
'label': None,
'power': 240.0,
'address': None,
'longitude': 109.997164,
'dimension': 38.634963,
'service_time': None,
'device_type': 1,
'operator_id': 'K1TUBMOLH',
'remark': None,
'production_date': None,
'entity_id': 'MACKSFTXX',
'org_code': 'MACKSFTXX',
'merchant_id': '18633849140684009482',
'rate_record_list': json.dumps(default_rate_record)
}
# 21H - 充电桩软件版本
if command == '21':
# 软件版本字节127-142
software_version = hex_bytes[127:143].decode('ascii', errors='ignore').strip()
# 硬件版本字节143-158
hardware_version = hex_bytes[143:159].decode('ascii', errors='ignore').strip()
data.update({
'software_version': software_version if software_version else '2.100',
'hardware_version': hardware_version if hardware_version else None
})
# 22H - 充电桩信息(假设包含功率和枪数)
elif command == '22':
# 功率假设字节127-130分辨率1W
power = int.from_bytes(hex_bytes[127:131], byteorder='little') * 0.001 # 转换为kW
# 枪数假设字节131
gun_number = int(hex_bytes[131]) if len(hex_bytes) > 131 else 2
# 判断直流/交流
dc_or_ac = '直流' if power > 60 else '交流'
data.update({
'power': power if power > 0 else 240.0,
'gun_number': gun_number,
'dc_or_ac': dc_or_ac
})
return data
except Exception as e:
logging.error(f"解析十六进制数据时出错: {str(e)}")
return None
def migrate_data(self):
"""将新数据从TDengine迁移到PostgreSQL的pile表"""
while True:
try:
# 如果last_processed_ts为空初始化为当前时间
if self.last_processed_ts is None:
try:
# 避免使用 MAX(ts),改用 ORDER BY ts DESC LIMIT 1 获取最新时间戳
self.td_cursor.execute("SELECT ts FROM antsev.charge_jiuxing ORDER BY ts DESC LIMIT 1")
result = self.td_cursor.fetchone()
self.last_processed_ts = result[0] if result and result[0] else datetime.now()
except Exception as e:
logging.error(f"获取最新时间戳失败: {str(e)},使用当前时间作为默认值")
self.last_processed_ts = datetime.now()
logging.info(f"初始化last_processed_ts: {self.last_processed_ts}")
# 查询新数据
query = f"SELECT * FROM antsev.charge_jiuxing WHERE ts > '{self.last_processed_ts}' ORDER BY ts"
self.td_cursor.execute(query)
rows = self.td_cursor.fetchall()
if not rows:
logging.info("没有新数据休眠10秒")
time.sleep(10)
continue
for row in rows:
try:
# 从TDengine行中提取数据
timestamp = row[0] # 时间戳
pile_id = row[3] # 充电桩ID
hex_data = row[12] # 十六进制数据
# 记录原始数据
logging.info(f"处理记录: ts={timestamp}, pile_id={pile_id}, hex_data={hex_data}")
# 跳过已处理的充电桩
if pile_id in self.processed_pile_ids:
logging.info(f"充电桩已处理pile_id: {pile_id},跳过")
continue
# 解析十六进制数据
parsed_data = self.parse_hex_data(hex_data, timestamp, pile_id)
if not parsed_data:
logging.warning(f"无法解析 hex_data: {hex_data},跳过此记录")
continue
# 检查记录是否已存在
check_query = """
SELECT 1 FROM pile WHERE uuid = %s
"""
self.pg_cursor.execute(check_query, (parsed_data['uuid'],))
exists = self.pg_cursor.fetchone() is not None
# 如果记录已存在,跳过
if exists or parsed_data['uuid'] in self.processed_uuids:
logging.info(f"充电桩记录已存在UUID: {parsed_data['uuid']},跳过")
continue
# 准备插入PostgreSQL的数据
insert_query = """
INSERT INTO public.pile (
uuid, classification_id, manufacturer_id, product_id, station_id,
name, code, dc_or_ac, gun_number, software_version,
hardware_version, startup_mode, startup_type, created_at, created_by,
updated_at, updated_by, charge_person, label, power,
address, longitude, dimension, service_time, device_type,
operator_id, remark, production_date, entity_id, org_code,
merchant_id, rate_record_list
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
parsed_data['uuid'],
parsed_data['classification_id'],
parsed_data['manufacturer_id'],
parsed_data['product_id'],
parsed_data['station_id'],
parsed_data['name'],
parsed_data['code'],
parsed_data['dc_or_ac'],
parsed_data['gun_number'],
parsed_data['software_version'],
parsed_data['hardware_version'],
parsed_data['startup_mode'],
parsed_data['startup_type'],
parsed_data['created_at'],
parsed_data['created_by'],
parsed_data['updated_at'],
parsed_data['updated_by'],
parsed_data['charge_person'],
parsed_data['label'],
parsed_data['power'],
parsed_data['address'],
parsed_data['longitude'],
parsed_data['dimension'],
parsed_data['service_time'],
parsed_data['device_type'],
parsed_data['operator_id'],
parsed_data['remark'],
parsed_data['production_date'],
parsed_data['entity_id'],
parsed_data['org_code'],
parsed_data['merchant_id'],
parsed_data['rate_record_list']
)
self.pg_cursor.execute(insert_query, values)
self.processed_pile_ids.add(pile_id)
self.processed_uuids.add(parsed_data['uuid'])
logging.info(f"插入充电桩UUID: {parsed_data['uuid']}, 充电桩编号: {pile_id}")
# 记录插入的完整数据
log_values = {
'uuid': parsed_data['uuid'],
'classification_id': parsed_data['classification_id'],
'manufacturer_id': parsed_data['manufacturer_id'],
'product_id': parsed_data['product_id'],
'station_id': parsed_data['station_id'],
'name': parsed_data['name'],
'code': parsed_data['code'],
'dc_or_ac': parsed_data['dc_or_ac'],
'gun_number': parsed_data['gun_number'],
'software_version': parsed_data['software_version'],
'hardware_version': parsed_data['hardware_version'],
'startup_mode': parsed_data['startup_mode'],
'startup_type': parsed_data['startup_type'],
'created_at': parsed_data['created_at'],
'created_by': parsed_data['created_by'],
'updated_at': parsed_data['updated_at'],
'updated_by': parsed_data['updated_by'],
'charge_person': parsed_data['charge_person'],
'label': parsed_data['label'],
'power': parsed_data['power'],
'address': parsed_data['address'],
'longitude': parsed_data['longitude'],
'dimension': parsed_data['dimension'],
'service_time': parsed_data['service_time'],
'device_type': parsed_data['device_type'],
'operator_id': parsed_data['operator_id'],
'remark': parsed_data['remark'],
'production_date': parsed_data['production_date'],
'entity_id': parsed_data['entity_id'],
'org_code': parsed_data['org_code'],
'merchant_id': parsed_data['merchant_id'],
'rate_record_list': parsed_data['rate_record_list']
}
logging.info(f"插入到 pile 表的数据: {log_values}")
# 更新last_processed_ts
self.last_processed_ts = max(self.last_processed_ts, timestamp)
except Exception as e:
logging.error(f"处理时间戳为 {timestamp} 的记录时出错: {str(e)}")
continue
except Exception as e:
logging.error(f"迁移过程中出错: {str(e)}")
time.sleep(10) # 出错后休眠10秒后重试
def close(self):
"""关闭数据库连接"""
try:
if self.td_cursor:
self.td_cursor.close()
if self.td_conn:
self.td_conn.close()
if self.pg_cursor:
self.pg_cursor.close()
if self.pg_conn:
self.pg_conn.close()
logging.info("数据库连接已关闭")
except Exception as e:
logging.error(f"关闭连接时出错: {str(e)}")
raise
def run(self):
"""运行迁移的主方法"""
try:
self.connect()
self.migrate_data()
except Exception as e:
logging.error(f"迁移失败: {str(e)}")
raise
finally:
self.close()
if __name__ == "__main__":
migrator = PileMigrator()
migrator.run()