-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrtmp-server.js
More file actions
286 lines (250 loc) · 7.44 KB
/
rtmp-server.js
File metadata and controls
286 lines (250 loc) · 7.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
/**
* RTMP 服务器模块
* 接收 OBS 推流,转换为 HLS 供浏览器播放
*/
const NodeMediaServer = require('node-media-server');
const path = require('path');
const fs = require('fs');
// Directory layout for live output: UPLOADS_DIR is handed to node-media-server
// as its mediaroot, and LIVE_DIR is the "live" sub-directory inside it.
const UPLOADS_DIR = path.join(__dirname, 'uploads');
const LIVE_DIR = path.join(UPLOADS_DIR, 'live');
// Make sure the live directory exists as soon as the module is loaded.
if (!fs.existsSync(LIVE_DIR)) {
fs.mkdirSync(LIVE_DIR, { recursive: true });
}
// Registered stream keys: streamKey -> the info object passed to
// registerStream() (this module reads roomId and socketId from it) plus
// isLive and registeredAt, which registerStream() sets, and sessionId, which
// the prePublish handler in start() sets once a publish request is validated.
const activeStreams = new Map();
// Lifecycle callbacks supplied by the caller of start(); null when not provided.
let onStreamStart = null;
let onStreamEnd = null;
// 检测 FFmpeg 路径
function detectFfmpegPath() {
const { execSync } = require('child_process');
const possiblePaths = [
'/usr/local/bin/ffmpeg',
'/opt/homebrew/bin/ffmpeg',
'/usr/bin/ffmpeg',
'ffmpeg' // fallback to PATH
];
for (const ffmpegPath of possiblePaths) {
try {
execSync(`${ffmpegPath} -version`, { stdio: 'ignore' });
console.log(`[RTMP] 检测到 FFmpeg: ${ffmpegPath}`);
return ffmpegPath;
} catch (e) {
// continue to next path
}
}
console.warn('[RTMP] 警告: 未检测到 FFmpeg,HLS 转码将无法工作');
return 'ffmpeg';
}
// Resolved once at module load; detectFfmpegPath() falls back to the bare
// name 'ffmpeg' when no candidate could be executed.
const FFMPEG_PATH = detectFfmpegPath();
/**
* Configuration object passed to the NodeMediaServer constructor in start().
* getConfig() exposes the rtmp and http ports from this object.
*/
const config = {
rtmp: {
port: 1935,
chunk_size: 60000,
gop_cache: true,
ping: 30,
ping_timeout: 60
},
http: {
port: 8888,
mediaroot: UPLOADS_DIR, // use the uploads directory as the media root
allow_origin: '*'
},
trans: {
ffmpeg: FFMPEG_PATH,
tasks: [
{
// Applies to the 'live' app, i.e. stream paths of the form /live/{streamKey}.
app: 'live',
hls: true,
hlsFlags: '[hls_time=2:hls_list_size=5:hls_flags=delete_segments+append_list]',
hlsKeep: false, // delete the HLS files once the broadcast has ended
dash: false
}
]
},
// NOTE(review): debug-level logging is enabled here; confirm this is intended
// outside of development.
logType: 4 // 1: Fatal, 2: Error, 3: Normal, 4: Debug (debug logging enabled)
};
// The NodeMediaServer instance created by start(); reset to null by stop().
let nms = null;
/**
 * Start the RTMP server and attach the publish lifecycle handlers.
 *
 * A publish request is only let through when its stream key (the last segment
 * of the stream path, /live/{streamKey}) is present in activeStreams. A second
 * session trying to publish a key that is already live is rejected here and
 * does not disturb the bookkeeping of the session that is already publishing.
 *
 * @param {Object} callbacks - Optional lifecycle callbacks.
 * @param {Function} callbacks.onStreamStart - Called with
 *   { streamKey, roomId, socketId, hlsUrl } when a publish request for a
 *   registered key passes validation.
 * @param {Function} callbacks.onStreamEnd - Called with
 *   { streamKey, roomId, socketId } when publishing of a registered key ends.
 */
function start(callbacks = {}) {
  onStreamStart = callbacks.onStreamStart || null;
  onStreamEnd = callbacks.onStreamEnd || null;

  // Handlers close over this local reference so they keep working even if the
  // module-level `nms` is reset by stop() while an event is being delivered.
  const server = new NodeMediaServer(config);
  nms = server;

  // streamPath has the form /live/{streamKey}; the key is its last segment.
  const streamKeyFromPath = (streamPath) => streamPath.split('/').pop();

  // Reject the session with the given id if the server still knows about it.
  const rejectSession = (id) => {
    const session = server.getSession(id);
    if (session) {
      session.reject();
    }
  };

  // Validate a publish request before the server accepts it.
  server.on('prePublish', (id, streamPath, args) => {
    console.log('[RTMP] 推流请求:', streamPath, 'Session:', id);

    const streamKey = streamKeyFromPath(streamPath);
    const streamInfo = activeStreams.get(streamKey);

    if (!streamInfo) {
      console.log('[RTMP] 拒绝未注册的推流:', streamKey);
      rejectSession(id);
      return;
    }

    // The key is already being published by another session. Without this
    // guard the live session's sessionId would be overwritten and
    // onStreamStart would fire a second time for a publish that cannot
    // succeed.
    if (streamInfo.isLive && streamInfo.sessionId !== id) {
      console.log('[RTMP] 拒绝重复推流:', streamKey, 'Session:', id);
      rejectSession(id);
      return;
    }

    console.log('[RTMP] 推流已验证:', streamKey, '房间:', streamInfo.roomId);

    // Record which session is publishing; the entry is mutated in place.
    streamInfo.isLive = true;
    streamInfo.sessionId = id;

    // HLS playlist location: uploads/live/{streamKey}/index.m3u8
    if (onStreamStart) {
      onStreamStart({
        streamKey,
        roomId: streamInfo.roomId,
        socketId: streamInfo.socketId,
        hlsUrl: `/uploads/live/${streamKey}/index.m3u8`
      });
    }
  });

  // Publishing ended: notify the caller, forget the key, clean up files later.
  server.on('donePublish', (id, streamPath, args) => {
    console.log('[RTMP] 推流结束:', streamPath);

    const streamKey = streamKeyFromPath(streamPath);
    const streamInfo = activeStreams.get(streamKey);
    if (!streamInfo) {
      return;
    }

    if (onStreamEnd) {
      onStreamEnd({
        streamKey,
        roomId: streamInfo.roomId,
        socketId: streamInfo.socketId
      });
    }

    activeStreams.delete(streamKey);

    // Delay the file cleanup, but skip it if the same key was registered
    // again and is live by then: the directory would belong to the new
    // broadcast and must not be deleted underneath it.
    setTimeout(() => {
      const current = activeStreams.get(streamKey);
      if (current && current.isLive) {
        return;
      }
      cleanupStreamFiles(streamKey);
    }, 5000);
  });

  // Connection-level events are only logged.
  server.on('preConnect', (id, args) => {
    console.log('[RTMP] 客户端连接:', id);
  });
  server.on('doneConnect', (id, args) => {
    console.log('[RTMP] 客户端断开:', id);
  });

  server.run();
  console.log(`[RTMP] 服务器已启动,端口: ${config.rtmp.port}`);
}
/**
 * Shut the RTMP server down.
 *
 * Does nothing when no server instance is currently held; otherwise stops
 * the instance and clears the module-level reference.
 */
function stop() {
  if (!nms) {
    return;
  }

  nms.stop();
  nms = null;
  console.log('[RTMP] 服务器已停止');
}
/**
 * Register a stream key so that a later publish request for it is accepted.
 *
 * The stored entry is a copy of `info` with `isLive` forced to false and
 * `registeredAt` set to the current time. Registering an existing key
 * replaces its entry.
 *
 * @param {string} streamKey - Stream key to register.
 * @param {Object} info - Data to keep alongside the key (roomId is logged).
 * @returns {Object} - { rtmpUrl, streamKey, hlsUrl } describing where to
 *   publish to and where the HLS playlist will be served from.
 */
function registerStream(streamKey, info) {
  const entry = { ...info, isLive: false, registeredAt: Date.now() };
  activeStreams.set(streamKey, entry);

  console.log('[RTMP] 注册推流密钥:', streamKey, '房间:', info.roomId);

  const { port } = config.rtmp;
  const rtmpUrl = `rtmp://localhost:${port}/live`;
  const hlsUrl = `/uploads/live/${streamKey}/index.m3u8`;
  return { rtmpUrl, streamKey, hlsUrl };
}
/**
 * Remove a stream key from the registry.
 *
 * When the key is currently live and its publishing session can still be
 * looked up on the server, that session is rejected first. Unknown keys are
 * ignored.
 *
 * @param {string} streamKey - Stream key to remove.
 */
function unregisterStream(streamKey) {
  const entry = activeStreams.get(streamKey);
  if (!entry) {
    return;
  }

  // Only look the session up while the key is live and a server exists.
  const hasLiveSession = entry.isLive && entry.sessionId && nms;
  const session = hasLiveSession ? nms.getSession(entry.sessionId) : null;
  if (session) {
    session.reject();
  }

  activeStreams.delete(streamKey);
  console.log('[RTMP] 注销推流密钥:', streamKey);
}
/**
 * Look up the registry entry of a stream key.
 *
 * @param {string} streamKey - Stream key to look up.
 * @returns {Object|null} - The stored entry itself (not a copy), or null when
 *   the key is not registered.
 */
function getStreamStatus(streamKey) {
  const entry = activeStreams.get(streamKey);
  return entry ? entry : null;
}
/**
 * Find the live stream that belongs to a room.
 *
 * Entries are examined in registration order and the first one that is live
 * and has a matching roomId wins.
 *
 * @param {string} roomId - Room id to search for.
 * @returns {Object|null} - A new object holding the streamKey together with
 *   the entry's fields, or null when the room has no live stream.
 */
function getStreamByRoomId(roomId) {
  const match = [...activeStreams].find(
    ([, entry]) => entry.roomId === roomId && entry.isLive
  );
  if (match === undefined) {
    return null;
  }

  const [streamKey, info] = match;
  return { streamKey, ...info };
}
/**
 * Delete the directory that holds a stream's files under LIVE_DIR.
 *
 * The directory is removed recursively, so the target is validated first:
 * only a strict descendant of LIVE_DIR may be deleted. Keys that are not
 * strings, or that resolve to LIVE_DIR itself or to a location outside of it
 * (for example '', '.', '..' or '../other'), are refused and logged. A
 * directory that does not exist is silently skipped, and removal errors are
 * logged rather than thrown.
 *
 * @param {string} streamKey - Stream key whose directory should be removed.
 */
function cleanupStreamFiles(streamKey) {
  if (typeof streamKey !== 'string') {
    console.error('[RTMP] 清理直播文件失败:', '无效的推流密钥');
    return;
  }

  const streamDir = path.join(LIVE_DIR, streamKey);

  // path.relative() is '' for LIVE_DIR itself and starts with '..' (or is
  // absolute, e.g. another drive on Windows) for anything outside of it.
  const relative = path.relative(LIVE_DIR, streamDir);
  const escapesLiveDir =
    relative === '' ||
    relative === '..' ||
    relative.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relative);
  if (escapesLiveDir) {
    console.error('[RTMP] 清理直播文件失败:', `无效的推流密钥: ${streamKey}`);
    return;
  }

  if (!fs.existsSync(streamDir)) {
    return;
  }

  try {
    fs.rmSync(streamDir, { recursive: true, force: true });
    console.log('[RTMP] 已清理直播文件:', streamKey);
  } catch (err) {
    console.error('[RTMP] 清理直播文件失败:', err.message);
  }
}
/**
 * Report the ports the media server is configured to use.
 *
 * @returns {Object} - { rtmpPort, httpPort } taken from the module's config.
 */
function getConfig() {
  const { rtmp, http } = config;
  return { rtmpPort: rtmp.port, httpPort: http.port };
}
// Public API of the RTMP server module: server lifecycle (start/stop),
// stream-key registry operations, port lookup, and the LIVE_DIR path.
// detectFfmpegPath and cleanupStreamFiles are not exported.
module.exports = {
start,
stop,
registerStream,
unregisterStream,
getStreamStatus,
getStreamByRoomId,
getConfig,
LIVE_DIR
};