feat(video): 实现基于视频流播放WebSocket的FLV功能

- 修改了Java后端代码,将视频转码服务从HLS改为FLV格式输出
- 删除了原有的HLS相关控制器和转码逻辑
- 新增了VideoWebSocketServer和VideoServerPool类来管理WebSocket连接和视频流传输- 更新了前端Vue组件,使用flv.js替代hls.js来播放视频流- 增加了WebSocket通信机制,通过二进制数据传输FLV视频流
- 移除了旧的转码状态检查和控制逻辑
- 优化了FFmpeg命令参数以适配FLV流媒体传输
- 添加了WebSocket服务器启动配置,并集成到应用启动流程中
dev_flv
wangyan 2025-11-01 20:56:12 +08:00
parent 2063d36571
commit 6bbd9cd4b5
7 changed files with 602 additions and 375 deletions

View File

@ -2,6 +2,7 @@ package com.zcloud.config;
import com.zcloud.plugins.websocketFace.FaceServer; import com.zcloud.plugins.websocketFace.FaceServer;
import com.zcloud.util.Const; import com.zcloud.util.Const;
import com.zcloud.websocket.VideoWebSocketServer;
import org.java_websocket.WebSocketImpl; import org.java_websocket.WebSocketImpl;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@ -19,9 +20,11 @@ import org.springframework.stereotype.Component;
public class StartWebsocketServer implements ApplicationRunner{ public class StartWebsocketServer implements ApplicationRunner{
@Value("${smb.viteFileUrl}") @Value("${smb.viteFileUrl}")
private String HTTPFILEURL; private String HTTPFILEURL;
@Override @Override
public void run(ApplicationArguments var1) throws Exception{ public void run(ApplicationArguments var1) throws Exception{
startWebsocketInstantFace(); //启动定时人脸识别服务 // startWebsocketInstantFace(); //启动定时人脸识别服务
startVideoWebSocketServer(); //启动视频WebSocket服务
Const.HTTPFILEURL = HTTPFILEURL; Const.HTTPFILEURL = HTTPFILEURL;
System.out.println("-------------------系统启动成功-------------------"); System.out.println("-------------------系统启动成功-------------------");
} }
@ -40,4 +43,20 @@ public class StartWebsocketServer implements ApplicationRunner{
e.printStackTrace(); e.printStackTrace();
} }
} }
}
/**
* WebSocket
*/
public void startVideoWebSocketServer() {
WebSocketImpl.DEBUG = false;
VideoWebSocketServer v;
try {
String port = "8888"; // 视频流WebSocket端口
v = new VideoWebSocketServer(Integer.parseInt(port));
v.start();
System.out.println("视频WebSocket服务器已在端口 " + port + " 启动");
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -174,126 +174,173 @@ public class PlatformvideomanagementController extends BaseController {
// }); // });
// }); // });
ArrayList<PageData> remoteDataList = new ArrayList<>(); ArrayList<PageData> remoteDataList = new ArrayList<>();
// // 炼钢渣跨-炉渣跨中 PageData v1 = new PageData();
// PageData v1 = new PageData(); v1.put("regionPathName", "炼钢厂-通道1");
// v1.put("regionPathName", "炼钢渣跨-炉渣跨中"); v1.put("regionName", "调度");
// v1.put("regionName", "调度"); v1.put("name", "炼钢厂-通道1");
// v1.put("name", "炼钢渣跨-炉渣跨中"); v1.put("PLS_ID", "1");
// v1.put("PLS_ID", "1"); v1.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=1&subtype=0");
// v1.put("url", "rtsp://admin:at123456@172.168.8.232/Streaming/Channels/902"); remoteDataList.add(v1);
// remoteDataList.add(v1);
// 炼钢渣跨-炉渣跨南
PageData v2 = new PageData(); PageData v2 = new PageData();
v2.put("regionPathName", "炼钢渣跨-炉渣跨南"); v2.put("regionPathName", "炼钢渣跨-炉渣跨南");
v2.put("regionName", "调度"); v2.put("regionName", "调度");
v2.put("name", "炼钢渣跨-炉渣跨南"); v2.put("name", "炼钢渣跨-炉渣跨南");
v2.put("PLS_ID", "2"); v2.put("PLS_ID", "2");
v2.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=2&subtype=1"); v2.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=2&subtype=0");
remoteDataList.add(v2); remoteDataList.add(v2);
// 转炉工作平台-1#转炉炉前
PageData v3 = new PageData(); PageData v3 = new PageData();
v3.put("regionPathName", "转炉工作平台-1#转炉炉前"); v3.put("regionPathName", "转炉工作平台-1#转炉炉前");
v3.put("regionName", "调度"); v3.put("regionName", "调度");
v3.put("name", "转炉工作平台-1#转炉炉前"); v3.put("name", "转炉工作平台-1#转炉炉前");
v3.put("PLS_ID", "3"); v3.put("PLS_ID", "3");
v3.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=3&subtype=1"); v3.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=3&subtype=0");
remoteDataList.add(v3); remoteDataList.add(v3);
// 转炉工作平台-2#炉平台
PageData v4 = new PageData(); PageData v4 = new PageData();
v4.put("regionPathName", "转炉工作平台-2#转炉炉前"); v4.put("regionPathName", "转炉工作平台-2#转炉炉前");
v4.put("regionName", "调度"); v4.put("regionName", "调度");
v4.put("name", "转炉工作平台-2#转炉炉前"); v4.put("name", "转炉工作平台-2#转炉炉前");
v4.put("PLS_ID", "4"); v4.put("PLS_ID", "4");
v4.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=4&subtype=1"); v4.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=4&subtype=0");
remoteDataList.add(v4); remoteDataList.add(v4);
// 精炼炉工作平台-1#精炼炉炉前
// PageData v5 = new PageData(); PageData v5 = new PageData();
// v5.put("regionPathName", "精炼炉工作平台-1#精炼炉炉前"); v5.put("regionPathName", "转炉炉下区域-1#马道");
// v5.put("regionName", "调度"); v5.put("regionName", "调度");
// v5.put("name", "精炼炉工作平台-1#精炼炉炉前"); v5.put("name", "转炉炉下区域-1#马道");
// v5.put("PLS_ID", "5"); v5.put("PLS_ID", "5");
// v5.put("url", "172.168.8.136"); v5.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=5&subtype=0");
// remoteDataList.add(v5); remoteDataList.add(v5);
// 精炼炉工作平台-2#精炼炉炉前
// PageData v6 = new PageData(); PageData v6 = new PageData();
// v6.put("regionPathName", "精炼炉工作平台-2#精炼炉炉前"); v6.put("regionPathName", "转炉炉下区域-2#马道");
// v6.put("regionName", "调度"); v6.put("regionName", "调度");
// v6.put("name", "精炼炉工作平台-2#精炼炉炉前"); v6.put("name", "转炉炉下区域-2#马道");
// v6.put("PLS_ID", "6"); v6.put("PLS_ID", "6");
// v6.put("url", "172.168.8.155"); v6.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=6&subtype=0");
// remoteDataList.add(v6); remoteDataList.add(v6);
// 转炉炉下区域-1#马道
PageData v7 = new PageData(); PageData v7 = new PageData();
v7.put("regionPathName", "转炉炉下区域-1#马道"); v7.put("regionPathName", "连铸平台3#");
v7.put("regionName", "调度"); v7.put("regionName", "调度");
v7.put("name", "转炉炉下区域-1#马道"); v7.put("name", "连铸平台3#");
v7.put("PLS_ID", "7"); v7.put("PLS_ID", "7");
v7.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=5&subtype=1"); v7.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=7&subtype=0");
remoteDataList.add(v7); remoteDataList.add(v7);
// 转炉炉下区域-2#马道
PageData v8 = new PageData(); PageData v8 = new PageData();
v8.put("regionPathName", "转炉炉下区域-2#马道"); v8.put("regionPathName", "连铸平台2#");
v8.put("regionName", "调度"); v8.put("regionName", "调度");
v8.put("name", "转炉炉下区域-2#马道"); v8.put("name", "连铸平台2#");
v8.put("PLS_ID", "8"); v8.put("PLS_ID", "8");
v8.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=6&subtype=0"); v8.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=8&subtype=0");
remoteDataList.add(v8); remoteDataList.add(v8);
// 精炼炉炉下区域-1#炉
// PageData v9 = new PageData(); PageData v9 = new PageData();
// v9.put("regionPathName", "精炼炉炉下区域-1#炉"); v9.put("regionPathName", "连铸平台1#");
// v9.put("regionName", "调度"); v9.put("regionName", "调度");
// v9.put("name", "精炼炉炉下区域-1#炉"); v9.put("name", "连铸平台1#");
// v9.put("PLS_ID", "9"); v9.put("PLS_ID", "9");
// v9.put("url", "无"); v9.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=9&subtype=0");
// remoteDataList.add(v9); remoteDataList.add(v9);
// 精炼炉炉下区域-2#炉
// PageData v10 = new PageData(); PageData v10 = new PageData();
// v10.put("regionPathName", "精炼炉炉下区域-2#炉"); v10.put("regionPathName", "炼钢厂-通道10");
// v10.put("regionName", "调度"); v10.put("regionName", "调度");
// v10.put("name", "精炼炉炉下区域-2#炉"); v10.put("name", "炼钢厂-通道10");
// v10.put("PLS_ID", "10"); v10.put("PLS_ID", "10");
// v10.put("url", "无"); v10.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=10&subtype=0");
// remoteDataList.add(v10); remoteDataList.add(v10);
// 连铸平台1#
PageData v11 = new PageData(); PageData v11 = new PageData();
v11.put("regionPathName", "连铸平台1#"); v11.put("regionPathName", "炼钢厂-通道11");
v11.put("regionName", "调度"); v11.put("regionName", "调度");
v11.put("name", "连铸平台1#"); v11.put("name", "炼钢厂-通道11");
v11.put("PLS_ID", "11"); v11.put("PLS_ID", "11");
v11.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=9&subtype=1"); v11.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=11&subtype=0");
remoteDataList.add(v11); remoteDataList.add(v11);
// 连铸平台2#
PageData v12 = new PageData(); PageData v12 = new PageData();
v12.put("regionPathName", "连铸平台2#"); v12.put("regionPathName", "炼钢厂-通道12");
v12.put("regionName", "调度"); v12.put("regionName", "调度");
v12.put("name", "连铸平台2#"); v12.put("name", "炼钢厂-通道12");
v12.put("PLS_ID", "12"); v12.put("PLS_ID", "12");
v12.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=8&subtype=1"); v12.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=12&subtype=0");
remoteDataList.add(v12); remoteDataList.add(v12);
// 连铸平台3#
PageData v13 = new PageData(); PageData v13 = new PageData();
v13.put("regionPathName", "连铸平台3#"); v13.put("regionPathName", "炼钢厂-通道13");
v13.put("regionName", "调度"); v13.put("regionName", "调度");
v13.put("name", "连铸平台3#"); v13.put("name", "炼钢厂-通道13");
v13.put("PLS_ID", "13"); v13.put("PLS_ID", "13");
v13.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=7&subtype=1"); v13.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=13&subtype=0");
remoteDataList.add(v13); remoteDataList.add(v13);
// // 钢水跨南#
// PageData v14 = new PageData(); PageData v14 = new PageData();
// v14.put("regionPathName", "调试#"); v14.put("regionPathName", "炼铁厂-通道1");
// v14.put("regionName", "调度"); v14.put("regionName", "调度");
// v14.put("name", "调试#"); v14.put("name", "炼铁厂-通道1");
// v14.put("PLS_ID", "14"); v14.put("PLS_ID", "14");
// v14.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=2&subtype=1"); v14.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/101");
// remoteDataList.add(v14); remoteDataList.add(v14);
// // 钢水跨南#
// PageData v15 = new PageData(); PageData v15 = new PageData();
// v15.put("regionPathName", "264测试#"); v15.put("regionPathName", "炼铁厂-通道2");
// v15.put("regionName", "调度"); v15.put("regionName", "调度");
// v15.put("name", "264测试#"); v15.put("name", "炼铁厂-通道2");
// v15.put("PLS_ID", "15"); v15.put("PLS_ID", "15");
// v15.put("url", "rtsp://admin:at123456@172.16.70.168:554/cam/realmonitor?channel=3&subtype=1"); v15.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/201");
// remoteDataList.add(v15); remoteDataList.add(v15);
PageData v16 = new PageData();
v16.put("regionPathName", "炼铁厂-通道3");
v16.put("regionName", "调度");
v16.put("name", "炼铁厂-通道3");
v16.put("PLS_ID", "16");
v16.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/301");
remoteDataList.add(v16);
PageData v17 = new PageData();
v17.put("regionPathName", "炼铁厂-通道4");
v17.put("regionName", "调度");
v17.put("name", "炼铁厂-通道4");
v17.put("PLS_ID", "17");
v17.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/401");
remoteDataList.add(v17);
PageData v18 = new PageData();
v18.put("regionPathName", "炼铁厂-通道5");
v18.put("regionName", "调度");
v18.put("name", "炼铁厂-通道5");
v18.put("PLS_ID", "18");
v18.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/501");
remoteDataList.add(v18);
PageData v19 = new PageData();
v19.put("regionPathName", "炼铁厂-通道6");
v19.put("regionName", "调度");
v19.put("name", "炼铁厂-通道6");
v19.put("PLS_ID", "19");
v19.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/601");
remoteDataList.add(v19);
PageData v20 = new PageData();
v20.put("regionPathName", "炼铁厂-通道7");
v20.put("regionName", "调度");
v20.put("name", "炼铁厂-通道7");
v20.put("PLS_ID", "20");
v20.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/701");
remoteDataList.add(v20);
PageData v21 = new PageData();
v21.put("regionPathName", "炼铁厂-通道8");
v21.put("regionName", "调度");
v21.put("name", "炼铁厂-通道8");
v21.put("PLS_ID", "21");
v21.put("url", "rtsp://admin:xintai@1190@172.16.70.167:554/Streaming/Channels/801");
remoteDataList.add(v21);
return ReturnMap.ok().put("page", page).put("varList", remoteDataList); return ReturnMap.ok().put("page", page).put("varList", remoteDataList);
} }

View File

@ -1,47 +0,0 @@
package com.zcloud.controller.video;
import com.zcloud.controller.base.BaseController;
import com.zcloud.entity.PageData;
import com.zcloud.service.video.RtspToHlsService;
import com.zcloud.util.Jurisdiction;
import com.zcloud.util.R;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.Map;
@Controller
@RequestMapping("/playVideo")
public class PlayVideoController extends BaseController {
@Resource
private RtspToHlsService rtspToHlsService;
@RequestMapping(value = "/startTranscode")
@ResponseBody
public Object startTranscode() throws Exception {
PageData param = this.getPageData();
param.put("USER_ID", Jurisdiction.getUSER_ID());
rtspToHlsService.startTranscode(param);
return R.ok().put("videoUrl", "/file/hls/" + param.getString("id") + "/" + param.getString("USER_ID") + "/");
}
@RequestMapping(value = "/stopTranscode")
@ResponseBody
public Object stopTranscode() throws Exception {
PageData param = this.getPageData();
param.put("USER_ID", Jurisdiction.getUSER_ID());
rtspToHlsService.stopTranscode(param);
return R.ok();
}
@RequestMapping("/getTranscodeStatus")
@ResponseBody
public Object getTranscodeStatus() {
PageData param = this.getPageData();
param.put("USER_ID", Jurisdiction.getUSER_ID());
Map<String, Object> transcodeStatus = rtspToHlsService.getTranscodeStatus(param);
return R.ok(transcodeStatus);
}
}

View File

@ -0,0 +1,111 @@
package com.zcloud.service.video;
import cn.hutool.core.io.resource.ClassPathResource;
import com.zcloud.entity.PageData;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class RtspToFlvService {
// 转码任务状态管理 - 使用ConcurrentHashMap保证线程安全
private final Map<String, TranscodeTask> transcodeTasks = new ConcurrentHashMap<>();
// 转码任务内部类,用于跟踪每个任务的状态
private static class TranscodeTask {
String videoId;
String userId;
Process ffmpegProcess;
String status; // 状态processing, stopped, completed, failed
int progress; // 进度百分比 0-100
long startTime;
public TranscodeTask(String videoId, String userId) {
this.videoId = videoId;
this.userId = userId;
this.status = "processing";
this.progress = 0;
this.startTime = System.currentTimeMillis();
}
}
// 获取项目内的FFmpeg路径
private String getFfmpegPath() {
// 判断操作系统
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("linux")) {
// 直接返回系统ffmpeg路径
return "/usr/bin/ffmpeg";
} else {
// 其他系统保持原有逻辑
String ffmpegRelativePath;
if (os.contains("win")) {
ffmpegRelativePath = "ffmpeg/bin/ffmpeg.exe";
} else if (os.contains("mac")) {
ffmpegRelativePath = "ffmpeg/macos/ffmpeg";
} else {
throw new RuntimeException("不支持的操作系统");
}
try {
ClassPathResource resource = new ClassPathResource(ffmpegRelativePath);
File ffmpegFile = resource.getFile();
if (!os.contains("win")) {
ffmpegFile.setExecutable(true);
}
return ffmpegFile.getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException("无法获取FFmpeg文件路径", e);
}
}
}
// 启动转码为FLV用于WebSocket流传输
public Process startFlvTranscode(String rtspUrl) throws IOException {
String ffmpegPath = getFfmpegPath();
// 构建FFmpeg命令确保生成标准FLV格式
String[] cmd = {
ffmpegPath,
"-rtsp_transport", "tcp",
"-i", rtspUrl,
"-c:v", "libx264",
"-preset", "ultrafast", // 使用快速编码预设
"-tune", "zerolatency", // 零延迟优化
"-profile:v", "baseline", // 使用baseline配置提高兼容性
"-an", // 禁用音频(不处理音频流)
"-f", "flv",
"-flvflags", "no_duration_filesize", // 不写入duration和filesize字段避免非标准FLV头部
"pipe:1"
};
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.redirectErrorStream(false); // 不合并错误输出到标准输出,避免干扰视频流
Process process = pb.start();
System.out.println("FFmpeg FLV转码已启动RTSP地址" + rtspUrl);
return process;
}
// 生成唯一任务ID
private String generateTaskId(String videoId, String userId) {
return videoId + "_" + userId;
}
// 停止所有转码任务(用于服务关闭等场景)
public void stopAllTranscodes() {
for (TranscodeTask task : transcodeTasks.values()) {
if (task.ffmpegProcess != null) {
task.ffmpegProcess.destroy();
task.status = "stopped";
}
}
transcodeTasks.clear();
System.out.println("所有转码任务已停止");
}
}

View File

@ -1,244 +0,0 @@
package com.zcloud.service.video;
import cn.hutool.core.io.resource.ClassPathResource;
import com.zcloud.entity.PageData;
import org.springframework.stereotype.Service;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class RtspToHlsService {
// 输出目录改为项目外部避免打包后无法写入推荐用绝对路径如D:/hls或/opt/hls
private static final String HLS_PLAY_URL = "/hls/stream.m3u8";
// 转码任务状态管理 - 使用ConcurrentHashMap保证线程安全
private final Map<String, TranscodeTask> transcodeTasks = new ConcurrentHashMap<>();
// 转码任务内部类,用于跟踪每个任务的状态
private static class TranscodeTask {
String videoId;
String userId;
Process ffmpegProcess;
String outputPath;
String status; // 状态processing, stopped, completed, failed
int progress; // 进度百分比 0-100
long startTime;
String errorMessage;
public TranscodeTask(String videoId, String userId, String outputPath) {
this.videoId = videoId;
this.userId = userId;
this.outputPath = outputPath;
this.status = "processing";
this.progress = 0;
this.startTime = System.currentTimeMillis();
}
}
// 获取项目内的FFmpeg路径
private String getFfmpegPath() {
// 判断操作系统
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("linux")) {
// 直接返回系统ffmpeg路径
return "/usr/bin/ffmpeg";
} else {
// 其他系统保持原有逻辑
String ffmpegRelativePath;
if (os.contains("win")) {
ffmpegRelativePath = "ffmpeg/bin/ffmpeg.exe";
} else if (os.contains("mac")) {
ffmpegRelativePath = "ffmpeg/macos/ffmpeg";
} else {
throw new RuntimeException("不支持的操作系统");
}
try {
ClassPathResource resource = new ClassPathResource(ffmpegRelativePath);
File ffmpegFile = resource.getFile();
if (!os.contains("win")) {
ffmpegFile.setExecutable(true);
}
return ffmpegFile.getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException("无法获取FFmpeg文件路径", e);
}
}
}
// 启动转码
public void startTranscode(PageData param) throws IOException {
String videoId = param.getString("id");
String userId = param.getString("USER_ID");
String RTSP_URL = param.getString("url");
String HLS_OUTPUT_PATH = "/mnt/file/hls/" + videoId + "/" + userId + "/";
// 创建输出目录
File outputDir = new File(HLS_OUTPUT_PATH);
if (!outputDir.exists()) {
outputDir.mkdirs();
}
String ffmpegPath = getFfmpegPath();
// 创建转码任务并存储
String taskId = generateTaskId(videoId, userId);
TranscodeTask task = new TranscodeTask(videoId, userId, HLS_OUTPUT_PATH);
transcodeTasks.put(taskId, task);
String[] cmd = {
ffmpegPath,
"-rtsp_transport", "tcp",
"-i", RTSP_URL,
"-c:v", "libx264",
"-c:a", "aac",
"-hls_time", "10",
"-hls_list_size", "3",
"-hls_flags", "delete_segments",
HLS_OUTPUT_PATH + "stream.m3u8"
};
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.redirectErrorStream(true); // 合并错误输出到标准输出
Process process = pb.start();
task.ffmpegProcess = process;
// 异步读取FFmpeg输出日志并解析进度
new Thread(() -> {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println("FFmpeg日志" + line);
// 尝试解析进度信息
parseProgress(line, task);
}
// 进程结束后更新状态
int exitCode = process.waitFor();
if (exitCode == 0) {
task.status = "completed";
task.progress = 100;
} else {
task.status = "failed";
task.errorMessage = "FFmpeg进程异常退出退出码" + exitCode;
}
System.out.println("转码任务结束,状态:" + task.status);
} catch (Exception e) {
if ("stopped".equals(task.status)) {
// 如果是主动停止,不记录为错误
return;
}
task.status = "failed";
task.errorMessage = e.getMessage();
e.printStackTrace();
}
}).start();
System.out.println("FFmpeg路径" + ffmpegPath);
System.out.println("RTSP转HLS已启动输出路径" + HLS_OUTPUT_PATH);
}
// 解析FFmpeg输出日志获取进度
private void parseProgress(String logLine, TranscodeTask task) {
// 简单的进度解析逻辑,根据实际日志格式调整
if (logLine.contains("frame=") && logLine.contains("time=")) {
// 这里只是示例,实际需要根据你的视频长度估算进度
// 对于实时流,可以根据运行时间简单递增进度
long runTime = (System.currentTimeMillis() - task.startTime) / 1000;
// 假设10分钟转码完成超过时间后保持95%
if (runTime < 600) {
task.progress = (int) (runTime * 100 / 600);
} else {
task.progress = 95;
}
}
}
// 停止指定视频的转码
public void stopTranscode(PageData param) {
String videoId = param.getString("id");
String userId = param.getString("USER_ID");
String taskId = generateTaskId(videoId, userId);
TranscodeTask task = transcodeTasks.get(taskId);
if (task != null && task.ffmpegProcess != null) {
task.ffmpegProcess.destroy();
task.status = "stopped";
System.out.println("RTSP转HLS已停止视频ID" + videoId + "用户ID" + userId);
}
}
// 获取转码状态
public Map<String, Object> getTranscodeStatus(PageData param) {
String videoId = param.getString("id");
String userId = param.getString("USER_ID");
String taskId = generateTaskId(videoId, userId);
TranscodeTask task = transcodeTasks.get(taskId);
Map<String, Object> statusInfo = new HashMap<>();
if (task == null) {
statusInfo.put("status", "not_found");
statusInfo.put("progress", 0);
return statusInfo;
}
statusInfo.put("status", task.status);
statusInfo.put("progress", task.progress);
statusInfo.put("videoId", task.videoId);
statusInfo.put("startTime", task.startTime);
if (task.errorMessage != null) {
statusInfo.put("error", task.errorMessage);
}
return statusInfo;
}
// 新增:查询加载进度
public Map<String, Object> getLoadProgress(PageData param) {
String videoId = param.getString("id");
String userId = param.getString("USER_ID");
String taskId = generateTaskId(videoId, userId);
TranscodeTask task = transcodeTasks.get(taskId);
Map<String, Object> progressInfo = new HashMap<>();
progressInfo.put("videoId", videoId);
progressInfo.put("userId", userId);
if (task != null) {
progressInfo.put("loadProgress", task.progress);
progressInfo.put("status", task.status);
} else {
progressInfo.put("loadProgress", 0);
progressInfo.put("status", "not_found");
}
return progressInfo;
}
// 生成唯一任务ID
private String generateTaskId(String videoId, String userId) {
return videoId + "_" + userId;
}
// 停止所有转码任务(用于服务关闭等场景)
public void stopAllTranscodes() {
for (TranscodeTask task : transcodeTasks.values()) {
if (task.ffmpegProcess != null) {
task.ffmpegProcess.destroy();
task.status = "stopped";
}
}
transcodeTasks.clear();
System.out.println("所有转码任务已停止");
}
}

View File

@ -0,0 +1,118 @@
package com.zcloud.websocket;
import org.java_websocket.WebSocket;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocket
*/
public class VideoServerPool {
// 存储用户连接
private static final Map<WebSocket, String> userConnections = new ConcurrentHashMap<>();
// 存储用户会话
private static final Map<String, WebSocket> userSessions = new ConcurrentHashMap<>();
/**
*
* @param conn WebSocket
* @param userId ID
*/
public static void addUser(String userId, WebSocket conn) {
userConnections.put(conn, userId);
userSessions.put(userId, conn);
}
/**
*
* @param conn WebSocket
* @return
*/
public static boolean removeUser(WebSocket conn) {
String userId = userConnections.get(conn);
if (userId != null) {
userSessions.remove(userId);
}
return userConnections.remove(conn) != null;
}
/**
* IDWebSocket
* @param userId ID
* @return WebSocket
*/
public static WebSocket getWebSocketByUser(String userId) {
return userSessions.get(userId);
}
/**
* ID
* @param conn WebSocket
* @return ID
*/
public static String getUserByWebSocket(WebSocket conn) {
return userConnections.get(conn);
}
/**
* 线ID
* @return 线ID
*/
public static Collection<String> getOnlineUsers() {
return userSessions.keySet();
}
/**
* 线
* @return 线
*/
public static int getOnlineUserCount() {
return userSessions.size();
}
/**
* 线
* @param userId ID
* @return 线
*/
public static boolean isUserOnline(String userId) {
return userSessions.containsKey(userId);
}
/**
*
* @param userId ID
* @param message
*/
public static void sendMessageToUser(String userId, String message) {
WebSocket conn = userSessions.get(userId);
if (conn != null) {
conn.send(message);
}
}
/**
* WebSocket
* @param conn WebSocket
* @param message
*/
public static void sendMessageToUser(WebSocket conn, String message) {
if (conn != null) {
conn.send(message);
}
}
/**
* WebSocket
* @return WebSocket
*/
public static Set<WebSocket> getAllConnections() {
return new HashSet<>(userConnections.keySet());
}
}

View File

@ -0,0 +1,223 @@
package com.zcloud.websocket;
import com.zcloud.service.video.RtspToFlvService;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class VideoWebSocketServer extends WebSocketServer {
// 存储连接的客户端和对应的FFmpeg进程
private static final Map<WebSocket, Process> clientProcesses = new ConcurrentHashMap<>();
private static final Map<WebSocket, String> clientSessions = new ConcurrentHashMap<>();
public VideoWebSocketServer(int port) {
super(new InetSocketAddress(port));
}
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
System.out.println("新的WebSocket连接: " + conn.getRemoteSocketAddress());
// 检查是否在握手URL中提供了RTSP源
String path = handshake.getResourceDescriptor();
if (path != null && path.contains("?")) {
try {
URI uri = new URI("ws://localhost" + path);
String query = uri.getQuery();
if (query != null) {
if (query.startsWith("src=")) {
String rtspUrl = java.net.URLDecoder.decode(query.substring(4), "UTF-8");
startStreaming(conn, rtspUrl);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
System.out.println("WebSocket连接关闭: " + conn.getRemoteSocketAddress() + ", code: " + code + ", reason: " + reason + ", remote: " + remote);
// 确保停止对应的FFmpeg进程
stopStreaming(conn);
}
@Override
public void onMessage(WebSocket conn, String message) {
System.out.println("收到消息: " + message);
// 解析消息,期望格式为 "play:rtsp://admin:xintai123456@172.168.8.242:554/Streaming/Channels/501"
if (message.startsWith("play:")) {
String rtspUrl = message.substring(5);
startStreaming(conn, rtspUrl);
} else if (message.equals("stop")) {
stopStreaming(conn);
}
}
@Override
public void onError(WebSocket conn, Exception ex) {
System.err.println("WebSocket错误: " + ex.getMessage());
ex.printStackTrace();
if (conn != null) {
// 发生错误时也要确保清理资源
stopStreaming(conn);
}
}
@Override
public void onStart() {
System.out.println("Video WebSocket服务器启动端口: " + getPort());
}
private void startStreaming(WebSocket conn, String rtspUrl) {
try {
// 如果已有进程在运行,先停止它
stopStreaming(conn);
// 使用RtspToHlsService中的方法启动FFmpeg转码进程
RtspToFlvService service = new RtspToFlvService();
Process process = service.startFlvTranscode(rtspUrl);
// 存储进程引用
clientProcesses.put(conn, process);
clientSessions.put(conn, rtspUrl);
// 启动线程读取FFmpeg输出并发送到WebSocket客户端
startOutputReader(conn, process);
// 不再发送"started"文本消息避免干扰flv.js播放器
} catch (Exception e) {
e.printStackTrace();
// 错误信息通过二进制形式发送避免干扰flv.js播放器
}
}
private void stopStreaming(WebSocket conn) {
Process process = clientProcesses.get(conn);
if (process != null) {
System.out.println("正在终止FFmpeg进程连接: " + conn.getRemoteSocketAddress());
// 强制终止进程并等待其结束
process.destroyForcibly();
try {
process.waitFor(); // 等待进程终止
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
clientProcesses.remove(conn);
System.out.println("FFmpeg进程已终止连接: " + conn.getRemoteSocketAddress());
}
clientSessions.remove(conn);
}
private void startOutputReader(WebSocket conn, Process process) {
new Thread(() -> {
try (InputStream is = process.getInputStream()) {
byte[] buffer = new byte[4096];
int bytesRead;
// 读取并验证FLV头
byte[] headerBuffer = new byte[13]; // FLV头通常是13字节
int headerBytesRead = is.read(headerBuffer);
if (headerBytesRead == -1) {
System.err.println("FFmpeg没有输出数据");
return;
}
if (headerBytesRead >= 9) { // FLV头最少9字节
// 验证是否是有效的FLV头 (前3字节应该是 'F', 'L', 'V')
if (headerBuffer[0] == 'F' && headerBuffer[1] == 'L' && headerBuffer[2] == 'V') {
// 发送FLV头
conn.send(ByteBuffer.wrap(headerBuffer, 0, headerBytesRead));
System.out.println("FLV头已发送");
} else {
// 如果不是标准FLV头构造一个标准的FLV头
System.err.println("警告: FFmpeg输出不是标准FLV格式正在构造标准FLV头");
// 构造标准FLV头 (FLV + 版本1 + 音视频标志 + 头部长度)
byte[] standardHeader = new byte[] {
'F', 'L', 'V', 0x01, 0x05, 0x00, 0x00, 0x00, 0x09,
0x00, 0x00, 0x00, 0x00
};
conn.send(ByteBuffer.wrap(standardHeader));
// 然后发送原始数据(跳过原始头部)
conn.send(ByteBuffer.wrap(headerBuffer, 0, headerBytesRead));
}
}
// 继续发送剩余数据
int totalBytes = headerBytesRead > 0 ? headerBytesRead : 0;
while ((bytesRead = is.read(buffer)) != -1 && clientProcesses.containsKey(conn)) {
if (conn.isOpen()) {
conn.send(ByteBuffer.wrap(buffer, 0, bytesRead));
totalBytes += bytesRead;
} else {
break;
}
}
System.out.println("总共发送了 " + totalBytes + " 字节的数据");
// 检查错误流
try (InputStream errorStream = process.getErrorStream()) {
byte[] errorBuffer = new byte[1024];
int errorBytesRead;
StringBuilder errorLog = new StringBuilder();
while ((errorBytesRead = errorStream.read(errorBuffer)) != -1) {
errorLog.append(new String(errorBuffer, 0, errorBytesRead));
}
if (errorLog.length() > 0) {
System.err.println("FFmpeg错误输出: " + errorLog.toString());
}
}
int exitCode = process.waitFor();
System.out.println("FFmpeg进程退出退出码: " + exitCode);
if (exitCode != 0) {
System.err.println("FFmpeg进程异常退出退出码: " + exitCode);
}
} catch (Exception e) {
if (!(e instanceof InterruptedException)) {
e.printStackTrace();
try {
if (conn.isOpen()) {
conn.close(1011, "FFmpeg处理错误: " + e.getMessage());
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
} finally {
// 清理资源
if (clientProcesses.containsKey(conn)) {
Process storedProcess = clientProcesses.remove(conn);
if (storedProcess != null) {
System.out.println("在finally块中终止FFmpeg进程连接: " + conn.getRemoteSocketAddress());
storedProcess.destroyForcibly(); // 强制终止进程
try {
storedProcess.waitFor(); // 等待进程终止
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("FFmpeg进程已在finally块中终止连接: " + conn.getRemoteSocketAddress());
}
clientSessions.remove(conn);
}
}
}, "FFmpegOutputReader-" + conn.getRemoteSocketAddress()).start();
}
// 获取当前活跃的流数量
public static int getActiveStreamCount() {
return clientProcesses.size();
}
}