物联网之适配器扩展设备(五)

本文详细介绍如何通过适配器模式扩展支持新的设备类型。

一、适配器架构概述

系统采用三层适配器架构,实现设备类型的可扩展性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────────────────────────┐
│ 前端展示层 (Web) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │
│ │HotWaterBoiler │ │SteamBoiler │ │NewDevice │ │
│ │Adapter │ │Adapter │ │Adapter │ │
│ └─────────────────┘ └─────────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ 服务端业务层 (Server) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │
│ │HotWaterBoiler │ │SteamBoiler │ │NewDevice │ │
│ │ServiceAdapter │ │ServiceAdapter │ │ServiceAdapter│
│ └─────────────────┘ └─────────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ 数据解析层 (Types) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │
│ │HotWaterBoiler │ │SteamBoiler │ │NewDevice │ │
│ │ParserAdapter │ │ParserAdapter │ │ParserAdapter│ │
│ └─────────────────┘ └─────────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

三层适配器职责

层级 文件位置 职责
数据解析层 packages/types/src/payload-parser.ts 解析 MQTT 原始数据,转换为标准化格式
服务端业务层 apps/server/src/common/adapters/deviceAdapter.ts 告警提取、状态判断、数据验证
前端展示层 apps/web/src/adapters/deviceAdapter.ts 详情页数据分组、状态文本/样式转换

二、扩展步骤概览

1
2
3
4
5
6
步骤 1: 定义设备类型 (packages/types/src/mqtt.ts)
步骤 2: 创建数据解析适配器 (packages/types/src/payload-parser.ts)
步骤 3: 创建服务端业务适配器 (apps/server/src/common/adapters/deviceAdapter.ts)
步骤 4: 创建前端展示适配器 (apps/web/src/adapters/deviceAdapter.ts)
步骤 5: 添加国际化翻译 (apps/web/src/i18n/locales/)
步骤 6: 创建模拟器脚本 (scripts/simulator-*.js)

三、步骤详解

步骤 1: 定义设备类型

编辑 packages/types/src/mqtt.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 1. 添加设备类型
export type DeviceType =
| 'hot_water_boiler'
| 'steam_boiler'
| 'new_device_type'; // 新增设备类型

// 2. 如果有旧类型需要兼容,添加映射
export type LegacyDeviceType =
| 'IPT2'
| 'BOILER'
| 'OLD_TYPE'; // 旧类型名称

export const LEGACY_DEVICE_TYPE_MAP: Record<LegacyDeviceType, DeviceType> = {
IPT2: 'hot_water_boiler',
BOILER: 'steam_boiler',
OLD_TYPE: 'new_device_type', // 旧类型映射到新类型
};

步骤 2: 创建数据解析适配器

编辑 packages/types/src/payload-parser.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 新设备解析器适配器
*/
export class NewDeviceParserAdapter implements DeviceParserAdapter {
readonly deviceType: DeviceType = 'new_device_type';

parse(dataMap: Map<string, MqttDataItem>, _params: MqttPayloadNew['params']): Record<string, any> {
// 获取原始值的辅助函数
const getValue = (name: string): string => {
return dataMap.get(name)?.value ?? '0';
};

const getNumericValue = (name: string): number => {
return parseFloat(getValue(name)) || 0;
};

// 解析状态字 (如果需要 bit 位解析)
const parseStatusWord = (value: number) => {
return {
autoMode: ((value >> 14) & 0x03) === 0,
mainPump: ((value >> 12) & 0x03) === 0,
running: ((value >> 8) & 0x0f) === 0x0f,
output: value & 0xff,
};
};

// 解析报警状态 (根据设备协议)
const parseAlarmStatus = (name: string): boolean => {
const value = getNumericValue(name);
return value === 43690; // 或其他判断逻辑
};

// 返回标准化数据结构
return {
// 温度数据
temperature: getNumericValue('temp') / 10, // 根据协议换算
pressure: getNumericValue('press') / 100,

// 状态数据
status: getNumericValue('status'),
statusWord: parseStatusWord(getNumericValue('status_reg')),

// 报警数据
alarms: {
highTemp: parseAlarmStatus('high_temp_alm'),
lowPressure: parseAlarmStatus('low_press_alm'),
// ... 其他报警
},
};
}
}

// 在 DeviceParserFactory 中注册
export class DeviceParserFactory {
private static adapters: Map<DeviceType, DeviceParserAdapter> = new Map();

static {
this.register(new HotWaterBoilerParserAdapter());
this.register(new SteamBoilerParserAdapter());
this.register(new NewDeviceParserAdapter()); // 注册新适配器
}

static register(adapter: DeviceParserAdapter): void {
this.adapters.set(adapter.deviceType, adapter);
}

static getAdapter(deviceType: DeviceType): DeviceParserAdapter | undefined {
return this.adapters.get(deviceType);
}
}

步骤 3: 创建服务端业务适配器

编辑 apps/server/src/common/adapters/deviceAdapter.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* 新设备服务适配器
*/
export class NewDeviceServiceAdapter implements DeviceServiceAdapter {
readonly deviceType: DeviceType = 'new_device_type';

// 定义告警映射
private alarmDefinitions: AlarmDefinition[] = [
{ code: 'HIGH_TEMP', name: '温度过高', field: 'alarms.highTemp' },
{ code: 'LOW_PRESSURE', name: '压力过低', field: 'alarms.lowPressure' },
// ... 其他告警定义
];

/**
* 获取告警定义列表
*/
getAlarmDefinitions(): AlarmDefinition[] {
return this.alarmDefinitions;
}

/**
* 从数据中提取活动告警
*/
extractActiveAlarms(data: Record<string, any>): Array<{ code: string; name: string }> {
const alarms: Array<{ code: string; name: string }> = [];
const alarmData = data.alarms as Record<string, boolean> | undefined;

if (!alarmData) return alarms;

for (const def of this.alarmDefinitions) {
const fieldKey = def.field.replace('alarms.', '');
if (alarmData[fieldKey]) {
alarms.push({ code: def.code, name: def.name });
}
}

return alarms;
}

/**
* 验证数据完整性
*/
validateData(data: Record<string, any>): boolean {
return (
typeof data.temperature === 'number' &&
typeof data.status === 'number'
);
}

/**
* 获取设备状态文本
*/
getStatusText(data: Record<string, any>): string {
const status = data.status;
const statusMap: Record<number, string> = {
0: '停止',
1: '待机',
2: '运行中',
3: '报警',
};
return statusMap[status] ?? '未知';
}

/**
* 获取设备状态级别 (用于排序和显示)
*/
getStatusLevel(data: Record<string, any>): 'alarm' | 'running' | 'standby' | 'off' {
const status = data.status;
if (status === 3) return 'alarm';
if (status === 2) return 'running';
if (status === 1) return 'standby';
return 'off';
}
}

// 在工厂中注册
export class DeviceServiceAdapterFactory {
private static adapters: Map<DeviceType, DeviceServiceAdapter> = new Map();

static {
this.register(new HotWaterBoilerServiceAdapter());
this.register(new SteamBoilerServiceAdapter());
this.register(new NewDeviceServiceAdapter()); // 注册新适配器
}

static register(adapter: DeviceServiceAdapter): void {
this.adapters.set(adapter.deviceType, adapter);
}

static getAdapter(deviceType: DeviceType): DeviceServiceAdapter | undefined {
return this.adapters.get(deviceType);
}
}

步骤 4: 创建前端展示适配器

编辑 apps/web/src/adapters/deviceAdapter.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
* 新设备适配器
*/
export class NewDeviceAdapter implements DeviceAdapter {
deviceType: DeviceType = 'new_device_type';

private data: Record<string, any>;
private t: TranslationFunction; // 国际化函数

constructor(data: Record<string, any>, t: TranslationFunction) {
this.data = data;
this.t = t;
}

/**
* 获取状态文本
*/
getStatusText(): string {
const status = this.data.status;
if (status === undefined || status === null) return '-';

const statusMap: Record<number, string> = {
0: this.t('device.stopped'),
1: this.t('device.standby'),
2: this.t('device.running'),
3: this.t('device.alarm'),
};
return statusMap[status] ?? '-';
}

/**
* 获取状态样式类
*/
getStatusClass(): string {
const status = this.data.status;
if (status === 3) return 'status-alarm';
if (status === 2) return 'status-running';
if (status === 1) return 'status-standby';
return 'status-off';
}

/**
* 获取详情页分组数据
*/
getSections(): DetailSection[] {
return [
{
title: this.t('device.tempData'),
fields: [
{
key: 'temperature',
label: this.t('device.temperature'),
value: this.data.temperature?.toFixed(1) ?? '-',
unit: '℃',
},
{
key: 'pressure',
label: this.t('device.pressure'),
value: this.data.pressure?.toFixed(2) ?? '-',
unit: 'MPa',
},
],
},
{
title: this.t('device.deviceStatus'),
fields: [
{
key: 'status',
label: this.t('device.status'),
value: this.getStatusText(),
statusClass: this.getStatusClass(),
},
],
},
{
title: this.t('device.alarmStatus'),
fields: this.getAlarmFields(),
},
];
}

/**
* 获取报警字段列表
*/
private getAlarmFields(): DetailField[] {
const alarms = this.data.alarms as Record<string, boolean> | undefined;
if (!alarms) return [];

const alarmLabels: Record<string, string> = {
highTemp: this.t('device.highTempAlarm'),
lowPressure: this.t('device.lowPressureAlarm'),
};

return Object.entries(alarms)
.filter(([, active]) => active)
.map(([key]) => ({
key,
label: alarmLabels[key] ?? key,
value: this.t('device.alarm'),
statusClass: 'status-alarm',
}));
}
}

// 在工厂函数中注册
export function createDeviceAdapter(data: Record<string, any>, t: TranslationFunction): DeviceAdapter {
const deviceType = normalizeDeviceType(data.deviceType as DeviceTypeWithLegacy);

if (deviceType === 'hot_water_boiler') {
return new HotWaterBoilerAdapter(data, t);
}

if (deviceType === 'steam_boiler') {
return new SteamBoilerAdapter(data, t);
}

if (deviceType === 'new_device_type') {
return new NewDeviceAdapter(data, t); // 新设备适配器
}

// 默认返回热水锅炉适配器
return new HotWaterBoilerAdapter(data, t);
}

步骤 5: 添加国际化翻译

编辑 apps/web/src/i18n/locales/zh-CN.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
export default {
// ...
dashboard: {
// ...
newDeviceType: '新设备类型',
newDeviceSection: '新设备',
},

device: {
// ...
temperature: '温度',
pressure: '压力',
status: '运行状态',
highTempAlarm: '温度过高报警',
lowPressureAlarm: '压力过低报警',
// ... 其他字段翻译
},
};

编辑 apps/web/src/i18n/locales/en-US.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
export default {
// ...
dashboard: {
// ...
newDeviceType: 'New Device Type',
newDeviceSection: 'New Device',
},

device: {
// ...
temperature: 'Temperature',
pressure: 'Pressure',
status: 'Status',
highTempAlarm: 'High Temperature Alarm',
lowPressureAlarm: 'Low Pressure Alarm',
},
};

步骤 6: 创建模拟器脚本

创建 scripts/simulator-new-device.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 新设备模拟器
* 用法: node scripts/simulator-new-device.js <设备ID> <设备名称>
*/

const mqtt = require('mqtt');
const fs = require('fs');
const path = require('path');

const configPath = path.join(__dirname, '../configs/db.json');
const config = JSON.parse(fs.readFileSync(configPath, 'utf8'));
const mqttConfig = config.mqtt;

const deviceId = process.argv[2] || 'default_device_id';
const deviceName = process.argv[3] || '默认设备';
const deviceType = 'new_device_type';

const client = mqtt.connect(`mqtt://${mqttConfig.host}:${mqttConfig.port}`, {
clientId: `simulator_${deviceId}_${Date.now()}`,
username: mqttConfig.username,
password: mqttConfig.password,
});

client.on('connect', () => {
console.log('✅ 已连接到 MQTT Broker');

setInterval(() => {
// 模拟数据
const rData = [
{ name: 'temp', value: String(Math.round(250 + Math.random() * 50)), err: '0' },
{ name: 'press', value: String(Math.round(80 + Math.random() * 20)), err: '0' },
{ name: 'status', value: String(Math.random() > 0.1 ? 2 : 3), err: '0' },
{ name: 'high_temp_alm', value: String(Math.random() > 0.9 ? 43690 : 21845), err: '0' },
{ name: 'low_press_alm', value: '21845', err: '0' },
];

const payload = {
params: {
dir: 'up',
id: deviceId,
MAC: 'D4AD20D579E8',
r_data: rData,
},
};

const topic = `device/${deviceType}/${deviceName}`;
client.publish(topic, JSON.stringify(payload));
console.log(`📤 发送数据到 ${topic}`);
}, 5000);
});

client.on('error', (err) => {
console.error('❌ MQTT 连接错误:', err.message);
});

四、适配器接口定义

4.1 数据解析适配器接口

1
2
3
4
5
6
7
8
9
10
11
12
13
// packages/types/src/payload-parser.ts

export interface DeviceParserAdapter {
readonly deviceType: DeviceType;

/**
* 解析 MQTT 原始数据
* @param dataMap r_data 数组转换的 Map
* @param params MQTT payload 中的 params 对象
* @returns 解析后的标准化数据
*/
parse(dataMap: Map<string, MqttDataItem>, params: MqttPayloadNew['params']): Record<string, any>;
}

4.2 服务端业务适配器接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// apps/server/src/common/adapters/deviceAdapter.ts

export interface DeviceServiceAdapter {
readonly deviceType: DeviceType;

/**
* 获取该设备类型的告警定义
*/
getAlarmDefinitions(): AlarmDefinition[];

/**
* 从数据中提取活动告警
*/
extractActiveAlarms(data: Record<string, any>): Array<{ code: string; name: string }>;

/**
* 验证数据完整性
*/
validateData(data: Record<string, any>): boolean;

/**
* 获取设备状态文本
*/
getStatusText(data: Record<string, any>): string;

/**
* 获取设备状态级别
*/
getStatusLevel(data: Record<string, any>): 'alarm' | 'running' | 'standby' | 'off';
}

4.3 前端展示适配器接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// apps/web/src/adapters/deviceAdapter.ts

export interface DeviceAdapter {
deviceType: DeviceType;

/**
* 获取详情页分组数据
*/
getSections(): DetailSection[];

/**
* 获取状态文本
*/
getStatusText(): string;

/**
* 获取状态样式类
*/
getStatusClass(): string;
}

export interface DetailSection {
title: string;
fields: DetailField[];
}

export interface DetailField {
key: string;
label: string;
value: string | number;
unit?: string;
statusClass?: string;
}

五、数据流转过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1. 设备上报 MQTT 消息

2. 服务端接收消息 (apps/server/src/modules/mqtt/mqtt.service.ts)

3. 调用数据解析适配器 (packages/types/src/payload-parser.ts)
- 将 r_data 数组转换为 Map
- 根据设备类型获取对应适配器
- 解析为标准化数据结构

4. 调用服务端业务适配器 (apps/server/src/common/adapters/deviceAdapter.ts)
- 提取活动告警
- 判断设备状态
- 验证数据完整性

5. 存储到数据库
- 更新实时数据表
- 插入历史数据表
- 记录告警

6. 前端通过 WebSocket 接收实时数据

7. 调用前端展示适配器 (apps/web/src/adapters/deviceAdapter.ts)
- 生成详情页分组数据
- 转换状态文本和样式

8. 渲染设备卡片和详情页

六、开发检查清单

添加新设备类型时,请确保完成以下检查:

  • packages/types/src/mqtt.ts 中定义设备类型
  • packages/types/src/payload-parser.ts 中创建解析适配器并注册
  • apps/server/src/common/adapters/deviceAdapter.ts 中创建服务适配器并注册
  • apps/web/src/adapters/deviceAdapter.ts 中创建展示适配器
  • 添加中英文国际化翻译
  • 创建模拟器脚本用于测试
  • 测试数据上报、解析、展示流程
  • 测试告警提取和恢复逻辑

七、常见问题

Q1: 如何处理旧设备类型兼容?

LEGACY_DEVICE_TYPE_MAP 中添加映射关系,系统会自动转换:

1
2
3
export const LEGACY_DEVICE_TYPE_MAP: Record<LegacyDeviceType, DeviceType> = {
OLD_TYPE: 'new_device_type',
};

Q2: 如何处理不同的数据换算规则?

在解析适配器中处理:

1
2
3
4
5
6
7
8
// 温度需要 ÷10
temperature: getNumericValue('temp') / 10,

// 压力需要 ÷100
pressure: getNumericValue('press') / 100,

// 直接使用
status: getNumericValue('status'),

Q3: 如何处理 bit 位解析?

使用位运算:

1
2
3
4
5
const parseStatusWord = (value: number) => ({
autoMode: ((value >> 14) & 0x03) === 0,
running: ((value >> 8) & 0x0f) === 0x0f,
output: value & 0xff,
});

Q4: 如何添加新的告警类型?

  1. 在解析适配器中解析告警字段
  2. 在服务适配器中添加告警定义
  3. 在前端适配器中添加告警标签翻译

八、小结

本文详细介绍了通过适配器模式扩展设备类型的完整流程:

  1. 三层适配器架构 - 数据解析层、服务端业务层、前端展示层
  2. 六个扩展步骤 - 从类型定义到模拟器测试
  3. 接口定义 - 每层适配器的标准接口
  4. 数据流转 - 从 MQTT 上报到前端展示的完整过程

关键要点:

  • 每层适配器职责单一,便于维护
  • 工厂模式统一管理适配器注册和获取
  • 国际化支持多语言
  • 模拟器方便开发和测试

物联网之适配器扩展设备(五)
https://cszy.top/20260406-物联网之适配器扩展设备(五)/
作者
csorz
发布于
2026年4月6日
许可协议