import json
import logging
import time
import uuid
from datetime import datetime

import psycopg2
import taosrest

# 配置日志 — log to file; UTF-8 so the Chinese messages render correctly.
logging.basicConfig(
    filename='pile.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    encoding='utf-8'
)

# Columns inserted into public.pile, in INSERT order.  Declared once so the
# INSERT statement, the values tuple, and the debug log stay in sync.
PILE_COLUMNS = (
    'uuid', 'classification_id', 'manufacturer_id', 'product_id',
    'station_id', 'name', 'code', 'dc_or_ac', 'gun_number',
    'software_version', 'hardware_version', 'startup_mode', 'startup_type',
    'created_at', 'created_by', 'updated_at', 'updated_by',
    'charge_person', 'label', 'power', 'address', 'longitude',
    'dimension', 'service_time', 'device_type', 'operator_id', 'remark',
    'production_date', 'entity_id', 'org_code', 'merchant_id',
    'rate_record_list',
)


class PileMigrator:
    """Migrate charge-pile metadata from TDengine into PostgreSQL.

    Polls the TDengine table ``antsev.charge_jiuxing`` for rows newer than a
    timestamp high-water mark, parses the hex protocol frame carried in each
    row, and inserts one record per previously-unseen pile into the
    PostgreSQL ``public.pile`` table.  Runs forever until interrupted.
    """

    def __init__(self):
        # TDengine connection parameters (source, accessed via REST adapter).
        # NOTE(review): credentials are hard-coded; prefer environment
        # variables or a secrets manager in production.
        self.tdengine_config = {
            'host': '123.6.102.119',
            'port': 6041,
            'user': 'readonly_user',
            'password': 'Aassword123',
            'database': 'antsev'
        }
        # PostgreSQL connection parameters (destination).
        self.pg_config = {
            'host': '123.6.102.119',
            'port': 5432,
            'database': 'tms-design',
            'user': 'postgres',
            'password': '687315e66ae24eeab8bb5c0441a40d79'
        }
        self.td_conn = None
        self.td_cursor = None
        self.pg_conn = None
        self.pg_cursor = None
        # High-water mark: only rows with ts strictly greater are fetched.
        self.last_processed_ts = None
        # In-memory dedup sets (reset on restart; cross-restart dedup is done
        # against the pile table itself — see _process_row).
        self.processed_pile_ids = set()
        self.processed_uuids = set()

    def connect(self):
        """建立与两个数据库的连接

        Retries up to ``max_retries`` times with a fixed delay; re-raises the
        last error when all attempts fail.
        """
        max_retries = 3
        retry_delay = 10  # seconds

        for attempt in range(max_retries):
            try:
                rest_url = (
                    f"http://{self.tdengine_config['host']}:"
                    f"{self.tdengine_config['port']}"
                )
                # Do NOT log the raw config dicts: they contain passwords.
                logging.info(f"尝试连接到TDengine (第 {attempt + 1} 次): {rest_url}")
                self.td_conn = taosrest.connect(
                    url=rest_url,
                    user=self.tdengine_config['user'],
                    password=self.tdengine_config['password'],
                    database=self.tdengine_config['database']
                )
                self.td_cursor = self.td_conn.cursor()
                logging.info("成功连接到TDengine")

                # Probe query to confirm the connection actually works.
                self.td_cursor.execute("SELECT SERVER_VERSION()")
                version = self.td_cursor.fetchone()
                logging.info(f"TDengine 服务器版本: {version[0]}")

                logging.info(
                    f"尝试连接到PostgreSQL: {self.pg_config['host']}:"
                    f"{self.pg_config['port']}/{self.pg_config['database']}"
                )
                self.pg_conn = psycopg2.connect(
                    host=self.pg_config['host'],
                    port=self.pg_config['port'],
                    database=self.pg_config['database'],
                    user=self.pg_config['user'],
                    password=self.pg_config['password']
                )
                # autocommit: every INSERT is committed immediately, so a
                # crash never leaves an open transaction.
                self.pg_conn.autocommit = True
                self.pg_cursor = self.pg_conn.cursor()
                logging.info("成功连接到PostgreSQL")
                break  # both connections succeeded
            except Exception as e:
                logging.error(f"连接错误 (第 {attempt + 1} 次): {str(e)}")
                if attempt < max_retries - 1:
                    logging.info(f"将在 {retry_delay} 秒后重试...")
                    time.sleep(retry_delay)
                else:
                    raise

    def parse_hex_data(self, hex_data, timestamp, pile_id):
        """根据协议解析十六进制数据

        Parses one protocol frame (hex string, spaces allowed) into a dict of
        pile attributes ready for insertion.  Returns ``None`` when the frame
        is not a valid "JX" frame or parsing raises.

        Frames start with the ASCII bytes ``JX`` followed by a one-byte
        command.  Commands 21H (software/hardware version) and 22H
        (power / gun count) refine the defaults; any other command keeps the
        default attribute set.  The byte offsets for 22H are marked as
        assumptions in the original code — TODO confirm against the protocol
        specification.
        """
        try:
            # Strip spaces and decode the hex payload into raw bytes.
            hex_bytes = bytes.fromhex(hex_data.replace(" ", ""))

            # Frame must start with the magic marker "JX".
            if hex_bytes[0:2] != b'JX':
                return None

            # One-byte command, rendered as an upper-case hex pair.
            command = hex_bytes[2:3].hex().upper()

            # Default tariff schedule used when the frame carries no rates.
            default_rate_record = [
                {
                    "discountAfterTotalPrice": 1.779,
                    "discountElecPrice": 0,
                    "discountServicePrice": 0,
                    "elecPrice": 0.889,
                    "endTime": "18:00:00",
                    "phaseType": 4,
                    "servicePrice": 0
                }
            ]

            # Baseline attribute set; command-specific branches below may
            # override individual fields.
            data = {
                'command': command,
                'uuid': str(uuid.uuid4()).replace('-', ''),
                'classification_id': 'S5F',
                'manufacturer_id': 'cf17ad0840f79ab4483e4c4c8bda875b',
                'product_id': '111818',
                'station_id': '200076',
                'name': '直流充电桩A04',
                'code': pile_id,
                'dc_or_ac': '直流',
                'gun_number': 2,
                'software_version': '2.100',
                'hardware_version': None,
                'startup_mode': json.dumps([1]),
                'startup_type': json.dumps([22]),
                'created_at': timestamp,
                'created_by': None,
                'updated_at': timestamp,
                'updated_by': None,
                'charge_person': None,
                'label': None,
                'power': 240.0,
                'address': None,
                'longitude': 109.997164,
                'dimension': 38.634963,
                'service_time': None,
                'device_type': 1,
                'operator_id': 'K1TUBMOLH',
                'remark': None,
                'production_date': None,
                'entity_id': 'MACKSFTXX',
                'org_code': 'MACKSFTXX',
                'merchant_id': '18633849140684009482',
                'rate_record_list': json.dumps(default_rate_record)
            }

            if command == '21':
                # 21H — pile software/hardware version strings.
                # Software version: bytes 127-142 (ASCII, padded).
                software_version = hex_bytes[127:143].decode(
                    'ascii', errors='ignore').strip()
                # Hardware version: bytes 143-158.
                hardware_version = hex_bytes[143:159].decode(
                    'ascii', errors='ignore').strip()
                data.update({
                    'software_version': software_version if software_version else '2.100',
                    'hardware_version': hardware_version if hardware_version else None
                })
            elif command == '22':
                # 22H — pile info (power and gun count).
                # Power: assumed bytes 127-130, little-endian, 1 W resolution;
                # converted to kW.  TODO confirm offsets against the protocol.
                power = int.from_bytes(
                    hex_bytes[127:131], byteorder='little') * 0.001
                # Gun count: assumed byte 131; default to 2 on short frames.
                gun_number = int(hex_bytes[131]) if len(hex_bytes) > 131 else 2
                # Heuristic: >60 kW is treated as DC, otherwise AC.
                dc_or_ac = '直流' if power > 60 else '交流'
                data.update({
                    'power': power if power > 0 else 240.0,
                    'gun_number': gun_number,
                    'dc_or_ac': dc_or_ac
                })

            return data
        except Exception as e:
            logging.error(f"解析十六进制数据时出错: {str(e)}")
            return None

    def _init_last_timestamp(self):
        """Initialise the high-water mark from the newest TDengine row.

        Falls back to the current wall-clock time when the query fails or
        the table is empty.
        """
        try:
            # ORDER BY ts DESC LIMIT 1 instead of MAX(ts) — cheaper on
            # TDengine time-series tables.
            self.td_cursor.execute(
                "SELECT ts FROM antsev.charge_jiuxing ORDER BY ts DESC LIMIT 1")
            result = self.td_cursor.fetchone()
            self.last_processed_ts = (
                result[0] if result and result[0] else datetime.now())
        except Exception as e:
            logging.error(f"获取最新时间戳失败: {str(e)},使用当前时间作为默认值")
            self.last_processed_ts = datetime.now()
        logging.info(f"初始化last_processed_ts: {self.last_processed_ts}")

    def _process_row(self, row):
        """Parse one TDengine row and insert it into public.pile if new.

        Deliberate skips (already-seen pile, unparseable frame, duplicate in
        the destination table) return normally; genuine failures raise so the
        caller keeps the watermark behind this row and retries it.
        """
        timestamp = row[0]   # row timestamp (ts column)
        pile_id = row[3]     # pile identifier
        hex_data = row[12]   # raw hex protocol frame

        logging.info(
            f"处理记录: ts={timestamp}, pile_id={pile_id}, hex_data={hex_data}")

        # In-memory dedup: one insert per pile per process lifetime.
        if pile_id in self.processed_pile_ids:
            logging.info(f"充电桩已处理,pile_id: {pile_id},跳过")
            return

        parsed_data = self.parse_hex_data(hex_data, timestamp, pile_id)
        if not parsed_data:
            logging.warning(f"无法解析 hex_data: {hex_data},跳过此记录")
            return

        # Cross-restart dedup against the destination table.  BUGFIX: the
        # original checked WHERE uuid = %s with a freshly generated uuid4,
        # which could never match; dedup by the pile code instead.
        self.pg_cursor.execute("SELECT 1 FROM pile WHERE code = %s", (pile_id,))
        exists = self.pg_cursor.fetchone() is not None
        if exists or parsed_data['uuid'] in self.processed_uuids:
            logging.info(f"充电桩记录已存在,UUID: {parsed_data['uuid']},跳过")
            return

        # INSERT built from the single column list so statement, values and
        # log always agree.
        insert_query = (
            "INSERT INTO public.pile ({}) VALUES ({})".format(
                ", ".join(PILE_COLUMNS),
                ", ".join(["%s"] * len(PILE_COLUMNS)))
        )
        values = tuple(parsed_data[col] for col in PILE_COLUMNS)
        self.pg_cursor.execute(insert_query, values)

        self.processed_pile_ids.add(pile_id)
        self.processed_uuids.add(parsed_data['uuid'])
        logging.info(
            f"插入充电桩,UUID: {parsed_data['uuid']}, 充电桩编号: {pile_id}")

        # Log the full inserted record for auditability.
        log_values = {col: parsed_data[col] for col in PILE_COLUMNS}
        logging.info(f"插入到 pile 表的数据: {log_values}")

    def migrate_data(self):
        """将新数据从TDengine迁移到PostgreSQL的pile表

        Endless poll loop: fetch rows newer than ``last_processed_ts``,
        process each, and advance the watermark.  Sleeps 10 s when idle or
        after an unexpected top-level error.
        """
        while True:
            try:
                if self.last_processed_ts is None:
                    self._init_last_timestamp()

                # NOTE(review): taosrest does not support parameterized
                # queries here; the interpolated value is a timestamp we
                # control, not user input.
                query = (
                    "SELECT * FROM antsev.charge_jiuxing "
                    f"WHERE ts > '{self.last_processed_ts}' ORDER BY ts"
                )
                self.td_cursor.execute(query)
                rows = self.td_cursor.fetchall()

                if not rows:
                    logging.info("没有新数据,休眠10秒")
                    time.sleep(10)
                    continue

                for row in rows:
                    try:
                        self._process_row(row)
                        # BUGFIX: advance the watermark for every handled row
                        # (including deliberate skips).  The original only
                        # advanced after a successful insert, so skipped rows
                        # were re-fetched and re-logged on every poll forever.
                        self.last_processed_ts = max(
                            self.last_processed_ts, row[0])
                    except Exception as e:
                        # row[0] instead of a possibly-unbound local: the
                        # original referenced `timestamp`, which raised
                        # NameError when extraction itself failed.
                        logging.error(f"处理时间戳为 {row[0]} 的记录时出错: {str(e)}")
                        continue
            except Exception as e:
                logging.error(f"迁移过程中出错: {str(e)}")
                time.sleep(10)  # back off before retrying

    def close(self):
        """关闭数据库连接

        Best-effort cleanup: errors are logged but not re-raised, so a close
        failure inside ``run()``'s ``finally`` never masks the original
        migration error.
        """
        try:
            if self.td_cursor:
                self.td_cursor.close()
            if self.td_conn:
                self.td_conn.close()
            if self.pg_cursor:
                self.pg_cursor.close()
            if self.pg_conn:
                self.pg_conn.close()
            logging.info("数据库连接已关闭")
        except Exception as e:
            logging.error(f"关闭连接时出错: {str(e)}")

    def run(self):
        """运行迁移的主方法 — connect, migrate forever, always clean up."""
        try:
            self.connect()
            self.migrate_data()
        except Exception as e:
            logging.error(f"迁移失败: {str(e)}")
            raise
        finally:
            self.close()


if __name__ == "__main__":
    migrator = PileMigrator()
    migrator.run()