泰安市网站建设_网站建设公司_Python_seo优化
2026/3/2 23:25:49 网站建设 项目流程

智能鱼缸水质监控与管理系统

一、实际应用场景描述

场景背景

在煤矿企业的员工生活区、休息区、办公楼大堂等场所,观赏鱼缸是常见的装饰和减压设施。煤矿工作环境特殊,员工工作压力大,鱼缸的存在有助于缓解压力、改善环境。然而,传统鱼缸管理存在诸多问题,特别是煤矿调度员在24小时轮班期间,鱼缸维护不及时可能导致鱼类死亡,影响工作环境质量。

痛点分析

1. 水质管理困难:煤矿工作人员轮班制,鱼缸维护不及时

2. 监测不连续:人工监测间隔长,无法及时发现水质问题

3. 调节不及时:水温、酸碱度异常时无法快速响应

4. 能耗浪费:加热棒、过滤系统常开,能耗高

5. 数据缺失:缺乏水质历史数据,无法分析问题原因

6. 专业知识缺乏:煤矿员工缺乏养鱼专业知识

7. 紧急处理不足:突发水质问题时缺乏自动应急措施

二、核心逻辑讲解

系统架构

传感器数据采集 → 水质数据分析 → 超标判断 → 设备控制 → 状态反馈

控制策略

1. 水温控制:温度低于阈值自动加热,高于阈值停止加热

2. 酸碱度控制:pH值超出范围自动换水调节

3. 紧急处理:水质急剧变化时启动紧急措施

4. 节能模式:根据时间自动调节设备运行

5. 学习优化:记录调节习惯,优化控制参数

6. 远程监控:支持煤矿调度中心远程监控

水质控制逻辑

if 温度 < 最低舒适值:

启动加热棒

elif 温度 > 最高舒适值:

停止加热棒

if 温度 > 危险值:

启动紧急降温

if pH < 最低值:

启动碱性调节

elif pH > 最高值:

启动酸性调节

if pH > 危险值:

启动紧急换水

三、代码实现

项目结构

smart_aquarium/

├── main.py # 主程序

├── sensors/

│ ├── __init__.py

│ ├── water_sensor.py # 水质传感器

│ ├── temperature_sensor.py # 温度传感器

│ └── ph_sensor.py # pH传感器

├── actuators/

│ ├── __init__.py

│ ├── heater_controller.py # 加热棒控制器

│ ├── pump_controller.py # 水泵控制器

│ └── valve_controller.py # 阀门控制器

├── control/

│ ├── __init__.py

│ ├── water_controller.py # 水质控制器

│ ├── emergency_controller.py # 紧急控制器

│ └── schedule_controller.py # 日程控制器

├── config/

│ ├── __init__.py

│ ├── settings.py # 系统配置

│ └── fish_species.py # 鱼类配置

├── utils/

│ ├── __init__.py

│ ├── logger.py # 日志模块

│ ├── data_manager.py # 数据管理

│ ├── alert_system.py # 报警系统

│ └── energy_calculator.py # 能耗计算

├── web/

│ ├── __init__.py

│ └── dashboard.py # Web仪表板

├── tests/ # 测试代码

├── docs/ # 文档

├── requirements.txt # 依赖包

└── README.md # 说明文档

1. 配置文件 (config/settings.py)

"""

智能鱼缸系统配置文件

"""

from datetime import time

from enum import Enum

class WaterQualityStandard(Enum):

"""水质标准"""

EXCELLENT = "excellent" # 优

GOOD = "good" # 良

FAIR = "fair" # 一般

POOR = "poor" # 差

DANGER = "danger" # 危险

class SystemMode(Enum):

"""系统工作模式"""

AUTO = "auto" # 自动模式

MANUAL = "manual" # 手动模式

VACATION = "vacation" # 假期模式

MAINTENANCE = "maintenance" # 维护模式

class FishSpecies(Enum):

"""常见鱼类"""

GOLDFISH = "goldfish" # 金鱼

KOI = "koi" # 锦鲤

TROPICAL = "tropical" # 热带鱼

COLDWATER = "coldwater" # 冷水鱼

MARINE = "marine" # 海水鱼

class SystemConfig:

"""系统配置参数"""

# 水温控制参数 (°C)

TEMPERATURE_SETTINGS = {

'tropical_fish': {

'min': 24.0, # 最低温度

'max': 28.0, # 最高温度

'optimal': 26.0, # 最佳温度

'danger_low': 20.0, # 危险低温

'danger_high': 32.0 # 危险高温

},

'goldfish': {

'min': 18.0,

'max': 24.0,

'optimal': 22.0,

'danger_low': 10.0,

'danger_high': 30.0

},

'koi': {

'min': 15.0,

'max': 25.0,

'optimal': 20.0,

'danger_low': 5.0,

'danger_high': 30.0

}

}

# pH值控制参数

PH_SETTINGS = {

'freshwater': {

'min': 6.5, # 最小值

'max': 7.5, # 最大值

'optimal': 7.0, # 最佳值

'danger_low': 6.0, # 危险低值

'danger_high': 8.0 # 危险高值

},

'marine': {

'min': 8.0,

'max': 8.4,

'optimal': 8.2,

'danger_low': 7.8,

'danger_high': 8.6

}

}

# 设备控制参数

DEVICE_SETTINGS = {

'heater': {

'power': 100, # 加热棒功率(W)

'min_on_time': 60, # 最小开启时间(秒)

'min_off_time': 60, # 最小关闭时间(秒)

'max_temp_increase': 2.0, # 最大升温幅度(°C/小时)

},

'water_pump': {

'flow_rate': 1000, # 流量(L/小时)

'min_run_time': 300, # 最小运行时间(秒)

'max_run_time': 3600, # 最大运行时间(秒)

},

'water_change': {

'max_volume': 0.2, # 最大换水量(占总水量比例)

'min_interval': 3600, # 最小换水间隔(秒)

'duration': 300, # 换水持续时间(秒)

}

}

# 监测参数

MONITOR_SETTINGS = {

'update_interval': 10, # 更新间隔(秒)

'data_log_interval': 60, # 数据记录间隔(秒)

'sensor_calibration_interval': 86400, # 传感器校准间隔(秒)

'emergency_check_interval': 5, # 紧急检查间隔(秒)

}

# 报警参数

ALERT_SETTINGS = {

'enable_sms': False, # 启用短信报警

'enable_email': True, # 启用邮件报警

'enable_push': True, # 启用推送报警

'alert_cooldown': 300, # 报警冷却时间(秒)

'phone_numbers': [], # 报警电话

'email_addresses': [], # 报警邮箱

}

# 能源管理参数

ENERGY_SETTINGS = {

'night_mode': True, # 夜间模式

'night_start': time(22, 0), # 夜间开始

'night_end': time(6, 0), # 夜间结束

'power_save': True, # 节能模式

'max_daily_energy': 5.0, # 最大日能耗(kWh)

}

# 系统参数

SYSTEM_SETTINGS = {

'tank_volume': 200, # 鱼缸容量(L)

'fish_count': 10, # 鱼的数量

'fish_species': 'goldfish', # 鱼类品种

'location': '煤矿调度中心休息区', # 位置

'emergency_water_source': True, # 是否有紧急水源

}

class AlertLevel(Enum):

"""报警级别"""

INFO = "info" # 信息

WARNING = "warning" # 警告

ERROR = "error" # 错误

CRITICAL = "critical" # 严重

2. 水质传感器模块 (sensors/water_sensor.py)

"""

水质传感器模块

"""

import random

import time

import math

from abc import ABC, abstractmethod

from datetime import datetime

from typing import Dict, Optional, Tuple

import smbus2

class WaterSensor(ABC):

"""水质传感器抽象基类"""

def __init__(self, sensor_id: str, sensor_type: str, location: str = "main"):

"""

初始化传感器

参数:

sensor_id: 传感器ID

sensor_type: 传感器类型

location: 传感器位置

"""

self.sensor_id = sensor_id

self.sensor_type = sensor_type

self.location = location

self.status = "disconnected"

self.last_reading = None

self.last_calibration = None

self.calibration_offset = 0.0

self.accuracy = 0.1 # 精度

@abstractmethod

def connect(self) -> bool:

"""连接传感器"""

pass

@abstractmethod

def disconnect(self) -> bool:

"""断开传感器连接"""

pass

@abstractmethod

def read_value(self) -> float:

"""

读取传感器值

返回:

传感器读数

"""

pass

def calibrate(self, reference_value: float) -> float:

"""

校准传感器

参数:

reference_value: 参考值

返回:

校准偏移量

"""

actual_value = self.read_value()

self.calibration_offset = reference_value - actual_value

self.last_calibration = time.time()

return self.calibration_offset

def get_status(self) -> Dict:

"""获取传感器状态"""

return {

"sensor_id": self.sensor_id,

"sensor_type": self.sensor_type,

"location": self.location,

"status": self.status,

"last_reading": self.last_reading,

"last_calibration": self.last_calibration,

"calibration_offset": self.calibration_offset,

"accuracy": self.accuracy

}

class TemperatureSensor(WaterSensor):

"""温度传感器"""

def __init__(self, sensor_id: str, location: str = "main",

sensor_type: str = "DS18B20", bus_number: int = 1):

"""

初始化温度传感器

参数:

sensor_id: 传感器ID

location: 位置

sensor_type: 传感器型号

bus_number: 总线号

"""

super().__init__(sensor_id, "temperature", location)

self.sensor_type = sensor_type

self.bus_number = bus_number

self.bus = None

def connect(self) -> bool:

"""连接温度传感器"""

try:

if self.sensor_type == "DS18B20":

# DS18B20连接逻辑

import os

os.system('modprobe w1-gpio')

os.system('modprobe w1-therm')

device_folder = f'/sys/bus/w1/devices/28-*/w1_slave'

if not os.path.exists(device_folder):

raise FileNotFoundError("DS18B20传感器未找到")

elif self.sensor_type == "DHT22":

import Adafruit_DHT

self.sensor = Adafruit_DHT.DHT22

self.status = "connected"

print(f"[INFO] 温度传感器 {self.sensor_id} 连接成功")

return True

except Exception as e:

print(f"[ERROR] 连接温度传感器失败: {e}")

self.status = "error"

return False

def disconnect(self) -> bool:

"""断开传感器连接"""

self.status = "disconnected"

return True

def read_value(self) -> float:

"""读取温度值"""

if self.status != "connected":

raise ConnectionError("传感器未连接")

try:

if self.sensor_type == "simulated":

# 模拟温度值

base_temp = 25.0

hour = datetime.now().hour

# 模拟昼夜变化

time_effect = 2.0 * math.sin(2 * math.pi * hour / 24)

# 模拟随机波动

random_effect = random.uniform(-0.5, 0.5)

# 模拟设备影响

device_effect = 0.5 if random.random() < 0.1 else 0.0

temperature = base_temp + time_effect + random_effect + device_effect

elif self.sensor_type == "DS18B20":

# 读取DS18B20

with open('/sys/bus/w1/devices/28-*/w1_slave', 'r') as f:

lines = f.readlines()

if lines[0].strip()[-3:] == 'YES':

equals_pos = lines[1].find('t=')

if equals_pos != -1:

temp_string = lines[1][equals_pos+2:]

temperature = float(temp_string) / 1000.0

else:

raise ValueError("温度数据格式错误")

else:

raise ValueError("传感器校验失败")

elif self.sensor_type == "DHT22":

import Adafruit_DHT

humidity, temperature = Adafruit_DHT.read_retry(self.sensor, 4)

if temperature is None:

raise ValueError("读取温度失败")

else:

raise ValueError(f"不支持的传感器类型: {self.sensor_type}")

# 应用校准

temperature += self.calibration_offset

self.last_reading = temperature

return temperature

except Exception as e:

print(f"[ERROR] 读取温度失败: {e}")

# 返回上次读数或默认值

return self.last_reading if self.last_reading is not None else 25.0

class PHSensor(WaterSensor):

"""pH传感器"""

def __init__(self, sensor_id: str, location: str = "main",

sensor_type: str = "simulated", i2c_address: int = 0x63):

"""

初始化pH传感器

参数:

sensor_id: 传感器ID

location: 位置

sensor_type: 传感器型号

i2c_address: I2C地址

"""

super().__init__(sensor_id, "ph", location)

self.sensor_type = sensor_type

self.i2c_address = i2c_address

self.bus = None

def connect(self) -> bool:

"""连接pH传感器"""

try:

if self.sensor_type == "real":

self.bus = smbus2.SMBus(1)

# 测试连接

self.bus.write_byte(self.i2c_address, 0x00)

self.status = "connected"

print(f"[INFO] pH传感器 {self.sensor_id} 连接成功")

return True

except Exception as e:

print(f"[ERROR] 连接pH传感器失败: {e}")

self.status = "error"

return False

def disconnect(self) -> bool:

"""断开传感器连接"""

if self.bus:

self.bus.close()

self.status = "disconnected"

return True

def read_value(self) -> float:

"""读取pH值"""

if self.status != "connected":

raise ConnectionError("传感器未连接")

try:

if self.sensor_type == "simulated":

# 模拟pH值

base_ph = 7.0

# 模拟自然变化

time_factor = 0.1 * math.sin(time.time() / 3600)

# 模拟喂食影响

feed_effect = -0.3 if random.random() < 0.05 else 0.0

# 模拟换水影响

water_change_effect = 0.2 if random.random() < 0.02 else 0.0

# 模拟随机波动

random_effect = random.uniform(-0.1, 0.1)

ph_value = base_ph + time_factor + feed_effect + water_change_effect + random_effect

elif self.sensor_type == "real":

# 读取真实pH传感器

self.bus.write_byte(self.i2c_address, 0x01) # 读取命令

time.sleep(0.1)

data = self.bus.read_i2c_block_data(self.i2c_address, 0x00, 2)

ph_value = ((data[0] << 8) + data[1]) / 100.0

else:

raise ValueError(f"不支持的传感器类型: {self.sensor_type}")

# 应用校准

ph_value += self.calibration_offset

self.last_reading = ph_value

return ph_value

except Exception as e:

print(f"[ERROR] 读取pH值失败: {e}")

return self.last_reading if self.last_reading is not None else 7.0

class WaterLevelSensor(WaterSensor):

"""水位传感器"""

def __init__(self, sensor_id: str, location: str = "main",

max_level: float = 100.0, min_level: float = 0.0):

"""

初始化水位传感器

参数:

sensor_id: 传感器ID

location: 位置

max_level: 最高水位(cm)

min_level: 最低水位(cm)

"""

super().__init__(sensor_id, "water_level", location)

self.max_level = max_level

self.min_level = min_level

self.initial_level = 80.0 # 初始水位

def connect(self) -> bool:

"""连接水位传感器"""

self.status = "connected"

return True

def disconnect(self) -> bool:

"""断开传感器连接"""

self.status = "disconnected"

return True

def read_value(self) -> float:

"""读取水位值"""

if self.status != "connected":

raise ConnectionError("传感器未连接")

try:

# 模拟水位变化

base_level = self.initial_level

# 模拟蒸发

evaporation = 0.1 * (time.time() % 86400) / 86400

# 模拟换水

water_change = -5.0 if random.random() < 0.01 else 0.0

# 模拟补水

water_add = 2.0 if random.random() < 0.005 else 0.0

# 模拟随机波动

random_effect = random.uniform(-0.5, 0.5)

level = base_level - evaporation + water_change + water_add + random_effect

# 限制范围

level = max(self.min_level, min(self.max_level, level))

self.last_reading = level

return level

except Exception as e:

print(f"[ERROR] 读取水位失败: {e}")

return self.last_reading if self.last_reading is not None else 80.0

class WaterQualitySensor(WaterSensor):

"""水质综合传感器(模拟TDS、电导率等)"""

def __init__(self, sensor_id: str, location: str = "main"):

"""

初始化水质传感器

参数:

sensor_id: 传感器ID

location: 位置

"""

super().__init__(sensor_id, "water_quality", location)

def connect(self) -> bool:

"""连接水质传感器"""

self.status = "connected"

return True

def disconnect(self) -> bool:

"""断开传感器连接"""

self.status = "disconnected"

return True

def read_value(self) -> Dict[str, float]:

"""读取综合水质参数"""

if self.status != "connected":

raise ConnectionError("传感器未连接")

try:

# 模拟水质参数

tds = random.uniform(100, 300) # 总溶解固体(ppm)

conductivity = random.uniform(200, 500) # 电导率(μS/cm)

orp = random.uniform(200, 400) # 氧化还原电位(mV)

salinity = random.uniform(0, 5) # 盐度(ppt)

readings = {

'tds': tds,

'conductivity': conductivity,

'orp': orp,

'salinity': salinity

}

self.last_reading = readings

return readings

except Exception as e:

print(f"[ERROR] 读取水质参数失败: {e}")

return self.last_reading if self.last_reading is not None else {}

class SensorManager:

"""传感器管理器"""

def __init__(self):

"""初始化传感器管理器"""

self.sensors = {}

self.readings_cache = {}

self.calibration_data = {}

def add_sensor(self, sensor: WaterSensor) -> bool:

"""

添加传感器

参数:

sensor: 传感器实例

返回:

是否成功

"""

if sensor.sensor_id in self.sensors:

print(f"[WARNING] 传感器 {sensor.sensor_id} 已存在")

return False

if sensor.connect():

self.sensors[sensor.sensor_id] = sensor

return True

return False

def remove_sensor(self, sensor_id: str) -> bool:

"""

移除传感器

参数:

sensor_id: 传感器ID

返回:

是否成功

"""

if sensor_id in self.sensors:

self.sensors[sensor_id].disconnect()

del self.sensors[sensor_id]

return True

return False

def read_all_sensors(self) -> Dict:

"""

读取所有传感器数据

返回:

传感器数据字典

"""

readings = {}

for sensor_id, sensor in self.sensors.items():

try:

value = sensor.read_value()

readings[sensor_id] = {

'type': sensor.sensor_type,

'value': value,

'location': sensor.location,

'timestamp': datetime.now().isoformat(),

'status': 'success'

}

except Exception as e:

readings[sensor_id] = {

'type': sensor.sensor_type,

'error': str(e),

'timestamp': datetime.now().isoformat(),

'status': 'error'

}

self.readings_cache = readings

return readings

def get_sensor_value(self, sensor_id: str) -> Optional[float]:

"""

获取指定传感器值

参数:

sensor_id: 传感器ID

返回:

传感器值

"""

if sensor_id in self.sensors:

try:

return self.sensors[sensor_id].read_value()

except:

return None

return None

def calibrate_sensor(self, sensor_id: str, reference_value: float) -> bool:

"""

校准传感器

参数:

sensor_id: 传感器ID

reference_value: 参考值

返回:

是否成功

"""

if sensor_id in self.sensors:

try:

offset = self.sensors[sensor_id].calibrate(reference_value)

self.calibration_data[sensor_id] = {

'timestamp': time.time(),

'reference_value': reference_value,

'offset': offset

}

return True

except Exception as e:

print(f"[ERROR] 校准传感器 {sensor_id} 失败: {e}")

return False

return False

def get_sensor_status(self) -> Dict:

如果你觉得这个工具好用,欢迎关注我!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询