69 lines
2.2 KiB
Python
69 lines
2.2 KiB
Python
import struct
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
class Command07:
|
|
def __init__(self):
|
|
self.command = 0x07
|
|
|
|
def process_07h(self, data, sock):
|
|
try:
|
|
if len(data) < 14 or data[0:2] != b'JX' or data[2] != self.command:
|
|
logging.error("07H帧格式错误")
|
|
return False
|
|
|
|
pile_id = data[3:11]
|
|
response = self.build_07h_response(pile_id)
|
|
if response and sock.send(response):
|
|
logging.info(f"发送07H响应: {response.hex().upper()}")
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"处理07H失败: {str(e)}")
|
|
return False
|
|
|
|
def build_07h_response(self, pile_id):
|
|
try:
|
|
frame = bytearray()
|
|
frame.extend(b'JX')
|
|
frame.append(self.command)
|
|
frame.extend(pile_id)
|
|
frame.append(0x01)
|
|
|
|
data = bytearray()
|
|
now = datetime.now()
|
|
data.extend(struct.pack("<BBBBBB",
|
|
now.year - 2000, now.month, now.day,
|
|
now.hour, now.minute, now.second))
|
|
|
|
frame.extend(struct.pack("<H", len(data)))
|
|
frame.extend(data)
|
|
|
|
check = 0
|
|
for b in frame[2:]:
|
|
check ^= b
|
|
frame.append(check)
|
|
|
|
return bytes(frame)
|
|
except Exception as e:
|
|
logging.error(f"构建07H失败: {str(e)}")
|
|
return None
|
|
|
|
def test_command(self):
|
|
"""测试07H命令"""
|
|
print("开始测试07H命令...")
|
|
class MockSocket:
|
|
def send(self, data):
|
|
print(f"模拟发送07H响应: {data.hex().upper()}")
|
|
|
|
mock_sock = MockSocket()
|
|
test_pile_id = bytes.fromhex("0317665611360637")
|
|
test_data = bytes.fromhex("4A58070317665611360637011000190109093715") # 示例07H数据
|
|
|
|
print(f"测试07H数据: {test_data.hex().upper()}")
|
|
result = self.process_07h(test_data, mock_sock)
|
|
print(f"测试结果: {'成功' if result else '失败'}")
|
|
|
|
if __name__ == "__main__":
|
|
cmd = Command07()
|
|
cmd.test_command() |