提交 0407748c authored 作者: 彭甘宇's avatar 彭甘宇

增加mqtt程序:还没对接完成,待调整参数内容

上级 49e29954
# 默认忽略的文件
/shelf/
/workspace.xml
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N812" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.10" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/mqttprogram.iml" filepath="$PROJECT_DIR$/.idea/mqttprogram.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
pip install paho-mqtt
\ No newline at end of file
File added
File added
import logging
from flask import Flask, request, jsonify, render_template
from mysql_connect import get_data_dict
from mqtt import MQTTClient
import time
# 配置日志记录器
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
app = Flask(__name__)
# 获取 data_dict
data_dict = get_data_dict()
# 获取所有公司的ID
company_ids = set(row['roommaster_company_id'] for rows in data_dict.values() for row in rows)
# 创建MQTT客户端实例
mqtt_client = MQTTClient('config.json')
# 启动MQTT客户端
mqtt_client.start()
# 获取 MQTT 客户端对象和主题
mqtt_client_obj = mqtt_client.client
topic_publish = "/maintain/room/master/client/"
topic_publish_online = "/maintain/room/master/client/online/"
@app.route('/')
def index():
return render_template('index.html', company_ids=sorted(company_ids))
@app.route('/get_regions', methods=['GET'])
def get_regions():
company_id = int(request.args.get('company_id'))
logging.debug(f"收到公司ID: {company_id}")
regions = set()
for row in data_dict.get(company_id, []):
regions.add((row['roommaster_id'], row['roommaster_name']))
return jsonify(list(regions))
@app.route('/get_devices', methods=['GET'])
def get_devices():
region_ids = request.args.get('region_ids')
logging.debug(f"收到区域ID: {region_ids}")
region_ids = [int(id) for id in region_ids.split(',')]
devices = []
for company_id, rows in data_dict.items():
for row in rows:
if row['roommaster_id'] in region_ids:
devices.append({
'device_id': row['roomdevice_udid'],
'device_name': row['roomdevice_name']
})
return jsonify(devices)
@app.route('/toggle_power', methods=['POST'])
def toggle_power():
data = request.json
regions = data.get('regions', [])
on = data.get('on', False)
logging.debug(f"收到请求参数: regions={regions}, on={on}")
if not regions:
logging.warning("未收到任何区域信息")
else:
for region in regions:
roommaster_udid = get_roommaster_udid(region)
if roommaster_udid:
payload = {"udid": roommaster_udid, "action": "power", "value": 1 if on else 0}
logging.debug(f"调用 mqtt.open_conference_room 区域: {region}, payload: {payload}")
result = mqtt_client.open_conference_room(topic_publish, payload)
logging.debug(f"mqtt.open_conference_room 的结果: {result}")
else:
logging.warning(f"未找到区域 {region} 对应的 roommaster_udid")
return jsonify({'success': True})
@app.route('/toggle_status', methods=['POST'])
def toggle_status():
data = request.json
regions = data.get('regions', [])
online = data.get('online', False)
logging.debug(f"收到请求参数: regions={regions}, online={online}")
if not regions:
logging.warning("未收到任何区域信息")
else:
for region in regions:
roommaster_udid = get_roommaster_udid(region)
if roommaster_udid:
payload = {"udid": roommaster_udid, "action": "status", "value": 1 if online else 0}
logging.debug(f"调用 mqtt.conference_room_online 区域: {region}, payload: {payload}")
if online:
result = mqtt_client.conference_room_online(topic_publish_online, payload)
else:
result = mqtt_client.conference_room_offline(topic_publish_online, payload)
logging.debug(f"mqtt.conference_room_online/offline 的结果: {result}")
else:
logging.warning(f"未找到区域 {region} 对应的 roommaster_udid")
return jsonify({'success': True})
def get_roommaster_udid(roommaster_id):
for rows in data_dict.values():
for row in rows:
if row['roommaster_id'] == roommaster_id:
return row['roommaster_udid']
return None
if __name__ == '__main__':
try:
app.run(debug=True)
except KeyboardInterrupt:
logging.info("已手动中断Flask应用进程!")
finally:
# 停止MQTT客户端
mqtt_client.stop()
\ No newline at end of file
{
"client_id": "Pgy_local_test",
"username": "mqtt@cmdb",
"password": "mqtt@webpassw0RD",
"broker_address": "192.168.5.215",
"port": 1883,
"host_name": "192.168.5.215",
"mysql_port": 8306,
"user_name": "root",
"user_password": "Ubains@123",
"db_name": "devops"
}
\ No newline at end of file
# -*- coding: utf-8 -*-
# cython: language_level=3
import json
import time
import paho.mqtt.client as mqtt
import logging
# 自定义过滤器类
class CustomFilter(logging.Filter):
def filter(self, record):
# 屏蔽特定的调试日志内容
if record.levelno == logging.DEBUG:
message = record.getMessage()
if "发送->主题" in message and "发送->数据" in message:
data = message.split("发送->数据: ")[1]
try:
data_dict = json.loads(data)
if data_dict.get("action") == "test" and data_dict.get("data") == "Hello, MQTT!":
return False
except json.JSONDecodeError:
pass
elif "Published message with message id" in message:
return False
return True
# 配置日志记录器
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.addFilter(CustomFilter())
class MQTTClient:
def __init__(self, config_file):
self.config = self.load_config(config_file)
self.client = mqtt.Client(client_id=self.config['client_id'], protocol=mqtt.MQTTv311)
self.client.username_pw_set(self.config['username'], self.config['password'])
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
self.client.on_publish = self.on_publish
self.client.on_subscribe = self.on_subscribe
def load_config(self, file_path):
"""从指定路径加载配置文件"""
with open(file_path, 'r') as file:
return json.load(file)
def on_connect(self, client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
logging.info("连接MQTT成功.")
# 订阅某个主题(可选)
client_udid = '3d53149e-408f-56c6-99ef-29fae67f8efc'
topic_subscribe = f"/maintain/room/master/{client_udid}/"
self.client.subscribe(topic_subscribe, qos=1) # 尝试使用 QoS 1
logging.info(f"已订阅主题: {topic_subscribe}")
else:
logging.error(f"连接MQTT失败,返回代码为: {rc}.")
self.reconnect()
def on_disconnect(self, client, userdata, rc):
"""断开连接回调函数"""
if rc != 0:
logging.error(f"异常断开,返回代码为: {rc}.")
self.reconnect()
def on_message(self, client, userdata, msg):
"""接收消息回调函数"""
logging.debug(f"接收<-主题:'{msg.topic}'")
try:
payload = msg.payload.decode()
logging.debug(f"接收<-数据:{payload}")
except UnicodeDecodeError:
logging.error("无法解码消息内容,可能是编码问题")
def on_publish(self, client, userdata, mid):
"""发送消息回调函数"""
logging.debug(f"Published message with message id {mid}")
def on_subscribe(self, client, userdata, mid, granted_qos):
"""订阅成功回调函数"""
logging.info(f"订阅成功,mid: {mid}, granted_qos: {granted_qos}")
self.perform_post_subscription_tasks(client)
def perform_post_subscription_tasks(self, client):
"""订阅成功后执行的任务"""
logging.info("执行订阅成功后的任务...")
# 注释掉或删除以下行以防止发送特定消息
# topic_publish = "/maintain/room/master/client/"
# message = {"action": "test", "data": "Hello, MQTT!"}
# self.publish_message(client, topic_publish, message)
def publish_message(self, client, topic, message, qos=0):
"""发布消息到指定主题"""
logging.debug(f"发送->主题:'{topic}',\n发送->数据: {json.dumps(message)}")
result = client.publish(topic, json.dumps(message), qos)
return result[0] == 0 # 返回是否成功
def open_conference_room(self, topic, client_udid):
"""开启会议室电源"""
open_message = {
"action": "_updatemaster",
"client_udid": client_udid,
"data": [
{
"item": "environmental",
"pm25": 20,
"co2": 2,
"temp": 20,
"tvoc": 20,
"humi": 20,
"hcho": 20
},
{
"item": "conference",
"power": 1,
"exist": 1,
"run": "在线"
}
]
}
return self.publish_message(self.client, topic, open_message, qos=0)
def close_conference_room(self, topic, client_udid):
"""关闭会议室电源"""
close_message = {
"action": "_updatemaster",
"client_udid": client_udid,
"data": [
{
"item": "environmental",
"pm25": 20,
"co2": 2,
"temp": 20,
"tvoc": 20,
"humi": 20,
"hcho": 20
},
{
"item": "conference",
"power": 0,
"exist": 1,
"run": "离线"
}
]
}
return self.publish_message(self.client, topic, close_message, qos=0)
def conference_room_online(self, topic, client_udid):
"""会议室在线"""
online_message = {
"udid": client_udid,
"action": "online",
"value": 1
}
return self.publish_message(self.client, topic, online_message, qos=0)
def conference_room_offline(self, topic, client_udid):
"""会议室离线"""
offline_message = {
"udid": client_udid,
"action": "offline",
"value": 0
}
return self.publish_message(self.client, topic, offline_message, qos=0)
def start(self):
"""连接服务器并启动网络循环"""
try:
self.client.connect(self.config['broker_address'], self.config['port'])
except Exception as e:
logging.error(f"连接MQTT失败,错误为: {e}")
else:
self.client.loop_start()
def stop(self):
"""停止网络循环并断开连接"""
self.client.loop_stop()
self.client.disconnect()
def reconnect(self):
"""重连机制"""
logging.info("尝试重新连接MQTT服务器...")
time.sleep(5) # 等待5秒后重试
self.start()
# # 示例使用
# if __name__ == '__main__':
# mqtt_client = MQTTClient('config.json')
# mqtt_client.start()
#
# try:
# while True:
# logging.debug("正在运行...")
# time.sleep(600) # 保持脚本运行
# except KeyboardInterrupt:
# logging.info("已手动中断MQTT连接进程!")
# finally:
# mqtt_client.stop()
\ No newline at end of file
# import mysql.connector
# from mysql.connector import Error
# import json
#
#
# def create_connection():
# with open('config.json') as config_file:
# config = json.load(config_file)
#
# connection = None
# try:
# connection = mysql.connector.connect(
# host=config['host_name'],
# port=config['mysql_port'],
# user=config['user_name'],
# password=config['user_password'],
# database=config['db_name']
# )
# print("Connection to MySQL DB successful")
# except Error as e:
# print(f"The error '{e}' occurred")
#
# return connection
#
#
# def execute_query(connection, query):
# if connection is not None:
# try:
# cursor = connection.cursor(dictionary=True)
# cursor.execute(query)
# results = cursor.fetchall()
# return results
# except Error as e:
# print(f"The error '{e}' occurred")
# finally:
# cursor.close()
#
#
# def save_results(results):
# data_dict = {}
# for row in results:
# company_id = row['roommaster_company_id']
# if company_id not in data_dict:
# data_dict[company_id] = []
# data_dict[company_id].append(row)
# return data_dict
#
#
# def get_data_dict():
# connection = create_connection()
# query = """
# SELECT
# rd.udid AS roomdevice_udid,
# rd.name AS roomdevice_name,
# rm.id AS roommaster_id,
# rm.name AS roommaster_name,
# rm.udid AS roommaster_udid,
# rm.company_id AS roommaster_company_id
# FROM
# cmdb_roomdevice rd
# JOIN
# cmdb_roommaster rm
# ON
# rd.master_id = rm.id;
# """
# results = execute_query(connection, query)
# data_dict = save_results(results)
# if connection.is_connected():
# connection.close()
# return data_dict
#
#
# def main():
# data_dict = get_data_dict()
# print(data_dict)
#
# # 打印数据字典的键,确保数据保存正确
# print("Data dictionary keys:", list(data_dict.keys()))
#
# # 打印每个公司ID对应的数据,确保数据保存正确
# for company_id, rows in data_dict.items():
# print(f"Company ID {company_id} has {len(rows)} records:")
# for row in rows:
# print(row)
#
#
# if __name__ == "__main__":
# main()
import mysql.connector
from mysql.connector import Error
import json
from flask import Flask, request, jsonify
import logging
app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def create_connection():
with open('config.json') as config_file:
config = json.load(config_file)
connection = None
try:
connection = mysql.connector.connect(
host=config['host_name'],
port=config['mysql_port'],
user=config['user_name'],
password=config['user_password'],
database=config['db_name']
)
print("Connection to MySQL DB successful")
except Error as e:
print(f"The error '{e}' occurred")
return connection
def execute_query(connection, query):
if connection is not None:
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(query)
results = cursor.fetchall()
return results
except Error as e:
print(f"The error '{e}' occurred")
finally:
cursor.close()
def save_results(results):
data_dict = {}
for row in results:
company_id = row['roommaster_company_id']
if company_id not in data_dict:
data_dict[company_id] = []
data_dict[company_id].append(row)
return data_dict
def get_data_dict():
connection = create_connection()
query = """
SELECT
rd.udid AS roomdevice_udid,
rd.name AS roomdevice_name,
rm.id AS roommaster_id,
rm.name AS roommaster_name,
rm.udid AS roommaster_udid,
rm.company_id AS roommaster_company_id
FROM
cmdb_roomdevice rd
JOIN
cmdb_roommaster rm
ON
rd.master_id = rm.id;
"""
results = execute_query(connection, query)
data_dict = save_results(results)
if connection.is_connected():
connection.close()
return data_dict
def main():
global data_dict
data_dict = get_data_dict()
print(data_dict)
# 打印数据字典的键,确保数据保存正确
print("Data dictionary keys:", list(data_dict.keys()))
# 打印每个公司ID对应的数据,确保数据保存正确
for company_id, rows in data_dict.items():
print(f"Company ID {company_id} has {len(rows)} records:")
for row in rows:
print(row)
@app.route('/get_regions', methods=['GET'])
def get_regions():
try:
company_id = int(request.args.get('company_id'))
logging.debug(f"收到公司ID: {company_id}")
regions = []
for row in data_dict.get(company_id, []):
region_info = {
'roommaster_id': row['roommaster_id'],
'roommaster_name': row['roommaster_name'],
'roommaster_udid': row['roommaster_udid']
}
regions.append(region_info)
if not regions:
logging.warning(f"未找到公司ID {company_id} 对应的区域信息")
return jsonify({"error": f"未找到公司ID {company_id} 对应的区域信息"}), 404
return jsonify(regions)
except (ValueError, TypeError) as e:
logging.error(f"无效的公司ID: {request.args.get('company_id')}")
return jsonify({"error": "无效的公司ID"}), 400
if __name__ == '__main__':
main()
app.run(debug=True)
\ No newline at end of file
body {
background-color: #f8f9fa;
}
.container {
max-width: 800px;
}
.form-select {
width: 100%;
}
select[multiple] {
height: 200px;
}
function getRegions() {
const companyId = $('#company_id').val();
$.ajax({
url: '/get_regions',
method: 'GET',
data: { company_id: companyId },
success: function(response) {
const regionsSelect = $('#regions');
regionsSelect.empty();
response.forEach(region => {
regionsSelect.append(`<option value="${region[0]}">${region[1]}</option>`);
});
},
error: function(error) {
console.error('Error fetching regions:', error);
}
});
}
function getDevices() {
const selectedRegions = $('#regions').val();
if (selectedRegions.length === 0) {
alert('请选择至少一个会议室');
return;
}
$.ajax({
url: '/get_devices',
method: 'GET',
data: { region_ids: selectedRegions.join(',') },
success: function(response) {
const devicesSelect = $('#devices');
devicesSelect.empty();
response.forEach(device => {
devicesSelect.append(`<option value="${device.device_id}">${device.device_name}</option>`);
});
},
error: function(error) {
console.error('Error fetching devices:', error);
}
});
}
function togglePower(on) {
const selectedRegions = $('#regions').val();
if (!selectedRegions || selectedRegions.length === 0) {
alert('请先选择会议室');
return;
}
const action = on ? '开启' : '关闭';
$.ajax({
url: '/toggle_power',
method: 'POST',
data: JSON.stringify({ regions: selectedRegions, on: on }),
contentType: 'application/json',
success: function(response) {
alert(`已${action} ${selectedRegions.join(', ')} 的电源`);
},
error: function(error) {
console.error('Error toggling power:', error);
}
});
}
function toggleStatus(online) {
const selectedRegions = $('#regions').val();
if (!selectedRegions || selectedRegions.length === 0) {
alert('请先选择会议室');
return;
}
const status = online ? '在线' : '离线';
$.ajax({
url: '/toggle_status',
method: 'POST',
data: JSON.stringify({ regions: selectedRegions, online: online }),
contentType: 'application/json',
success: function(response) {
alert(`已将 ${selectedRegions.join(', ')} 设置为 ${status}`);
},
error: function(error) {
console.error('Error toggling status:', error);
}
});
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>会议室和设备选择</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
<div class="container mt-5">
<h1 class="mb-4">选择会议室和设备</h1>
<form id="region-form" class="mb-4">
<div class="mb-3">
<label for="company_id" class="form-label">选择公司:</label>
<select class="form-select" id="company_id" name="company_id" required>
<option value="" disabled selected>请选择公司</option>
{% for company_id in company_ids %}
<option value="{{ company_id }}">{{ company_id }}</option>
{% endfor %}
</select>
</div>
<button type="button" class="btn btn-primary" onclick="getRegions()">获取会议室</button>
</form>
<div id="regions-container" class="mb-4">
<label for="regions" class="form-label">选择会议室:</label>
<select id="regions" class="form-select" multiple size="10"></select>
<button type="button" class="btn btn-primary mt-2" onclick="getDevices()">获取设备</button>
</div>
<div id="devices-container">
<label for="devices" class="form-label">选择设备:</label>
<select id="devices" class="form-select" multiple size="10"></select>
</div>
<div class="mt-4">
<button type="button" class="btn btn-success" onclick="togglePower(true)">会议室开启电源</button>
<button type="button" class="btn btn-danger" onclick="togglePower(false)">会议室关闭电源</button>
<button type="button" class="btn btn-success" onclick="toggleStatus(true)">会议室在线</button>
<button type="button" class="btn btn-danger" onclick="toggleStatus(false)">会议室离线</button>
</div>
</div>
<script src="{{ url_for('static', filename='js/script.js') }}"></script>
</body>
</html>
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论