|
@@ -7,6 +7,7 @@ import com.example.unusualsounds.framework.minio.MinioUtil;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import okhttp3.ConnectionPool;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.http.*;
|
|
@@ -14,10 +15,7 @@ import org.springframework.stereotype.Component;
|
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
|
|
import java.io.*;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
@@ -38,16 +36,17 @@ public class VideoTaskManager {
|
|
|
private String postAnalysisUrl;
|
|
|
|
|
|
@Autowired
|
|
|
- private MinioUtil minioUtil;
|
|
|
+ private MinioUtil minioUtil;
|
|
|
|
|
|
|
|
|
private final ConcurrentMap<String, VideoTask> taskMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
- public boolean startTask(String streamId,String deviceName,String skillID, String rtspUrl,String skillName,String openUuid) {
|
|
|
+ public boolean startTask(String streamId, String deviceName, String skillID, String rtspUrl, String skillName, String openUuid) {
|
|
|
String compositeKey = streamId + "_" + skillName;
|
|
|
+
|
|
|
return taskMap.computeIfAbsent(compositeKey, key -> {
|
|
|
- VideoTask task = new VideoTask(key,rtspUrl,skillName, skillID,deviceName,openUuid);
|
|
|
+ VideoTask task = new VideoTask(key, rtspUrl, skillName, skillID, deviceName, openUuid);
|
|
|
task.start();
|
|
|
return task;
|
|
|
}) != null;
|
|
@@ -76,7 +75,7 @@ public class VideoTaskManager {
|
|
|
public List<TaskStatus> listTasks() {
|
|
|
|
|
|
return taskMap.entrySet().stream()
|
|
|
- .map(e -> new TaskStatus(e.getKey(), e.getValue().getRtspUrl(),e.getValue().openUuid, e.getValue().isRunning()))
|
|
|
+ .map(e -> new TaskStatus(e.getKey(), e.getValue().getRtspUrl(), e.getValue().openUuid, e.getValue().isRunning()))
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
@@ -100,32 +99,52 @@ public class VideoTaskManager {
|
|
|
private Process process;
|
|
|
private Future<?> future;
|
|
|
|
|
|
- VideoTask(String streamId, String rtspUrl,String skillName,String skillId, String deviceName,String openUuid) {
|
|
|
+ VideoTask(String streamId, String rtspUrl, String skillName, String skillId, String deviceName, String openUuid) {
|
|
|
this.streamId = streamId;
|
|
|
this.rtspUrl = rtspUrl;
|
|
|
- this.skillName=skillName;
|
|
|
- this.skillId=skillId;
|
|
|
- this.deviceName=deviceName;
|
|
|
- this.openUuid= openUuid;
|
|
|
+ this.skillName = skillName;
|
|
|
+ this.skillId = skillId;
|
|
|
+ this.deviceName = deviceName;
|
|
|
+ this.openUuid = openUuid;
|
|
|
}
|
|
|
|
|
|
void start() {
|
|
|
+// List<String> command = Arrays.asList(
|
|
|
+//// "D:\\file\\ffmpeg\\ffmpeg.exe",
|
|
|
+// "ffmpeg",
|
|
|
+// "-rtsp_transport", "tcp",
|
|
|
+// "-i", rtspUrl,
|
|
|
+// "-vn",
|
|
|
+// "-c", "copy",
|
|
|
+// "-f", "segment",
|
|
|
+// "-segment_time", "300", //100秒
|
|
|
+// "-segment_format", "mp4",
|
|
|
+// "-reset_timestamps", "1",
|
|
|
+// "-force_key_frames", "expr:gte(t,n_floor(t/100)*100)",
|
|
|
+// "-write_empty_segments", "1",
|
|
|
+// "-segment_atclocktime", "1",
|
|
|
+// "-strftime", "1",
|
|
|
+// videoDir+streamId+"-%Y-%m-%d_%H-%M-%S"+".mp4"
|
|
|
+// );
|
|
|
+
|
|
|
+ //现场环境内ffmpeg commandList
|
|
|
List<String> command = Arrays.asList(
|
|
|
-// "D:\\file\\ffmpeg\\ffmpeg.exe",
|
|
|
"ffmpeg",
|
|
|
"-rtsp_transport", "tcp",
|
|
|
"-i", rtspUrl,
|
|
|
"-vn",
|
|
|
- "-c", "copy",
|
|
|
+ "-c:v", "copy",
|
|
|
+ "-c:a", "aac", // 现场音频为pcm_alaw转mp4不兼容,需要先转aac
|
|
|
"-f", "segment",
|
|
|
- "-segment_time", "300", //100秒
|
|
|
+ "-segment_time", "300", //5分钟
|
|
|
"-segment_format", "mp4",
|
|
|
"-reset_timestamps", "1",
|
|
|
"-force_key_frames", "expr:gte(t,n_floor(t/100)*100)",
|
|
|
"-write_empty_segments", "1",
|
|
|
"-segment_atclocktime", "1",
|
|
|
"-strftime", "1",
|
|
|
- videoDir+streamId+"-%Y-%m-%d_%H-%M-%S"+".mp4"
|
|
|
+ videoDir + streamId + "-%Y-%m-%d_%H-%M-%S" + ".mp4"
|
|
|
+
|
|
|
);
|
|
|
|
|
|
ProcessBuilder pb = new ProcessBuilder(command)
|
|
@@ -166,6 +185,7 @@ public class VideoTaskManager {
|
|
|
|
|
|
|
|
|
private Void monitorProcess() {
|
|
|
+ List<CompletableFuture<?>> futures = new ArrayList<>();
|
|
|
try (BufferedReader reader = new BufferedReader(
|
|
|
new InputStreamReader(process.getInputStream()))) {
|
|
|
String line;
|
|
@@ -174,25 +194,41 @@ public class VideoTaskManager {
|
|
|
if (Thread.interrupted()) {
|
|
|
log.info("[{}] 进程监控被中断", streamId);
|
|
|
return null;
|
|
|
- };
|
|
|
+ }
|
|
|
+ ;
|
|
|
//[segment @ 000002bcb71e5680] Opening 'D:\tmp\data\vox\output-2025-02-14_17-53-20499aec0d-0579-4ce8-8a19-0cdea2c8a648.mp4' for writing
|
|
|
-// log.info("[{}]FFmpeg: {}", streamId, line);
|
|
|
+ //不打印rtsp的过程日志
|
|
|
+ //log.info("[{}]FFmpeg: {}", streamId, line);
|
|
|
Matcher matcher = compile.matcher(line);
|
|
|
if (matcher.find()) {
|
|
|
String filePath = matcher.group(1).trim();
|
|
|
log.info("[{}] 新的视频文件已生成: {}", streamId, filePath);
|
|
|
- //上传文件、插入数据表告知算法解析;
|
|
|
- uploadFile(filePath,skillName,streamId,skillId,deviceName);
|
|
|
+ //使用CompletableFuture进行检查
|
|
|
+ CompletableFuture<CompletableFuture<?>> exceptionally = checkUploadFileAsync(filePath)
|
|
|
+ .thenApply(isCompleted -> {
|
|
|
+ if (isCompleted) {
|
|
|
+ return uploadFileAsync(filePath, skillName, streamId, skillId, deviceName);
|
|
|
+ } else {
|
|
|
+ log.warn("文件 {} 未完成,跳过上传", filePath);
|
|
|
+ return CompletableFuture.completedFuture(null);
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .exceptionally(ex -> {
|
|
|
+ log.error("处理文件 {} 时发生错误: {}", filePath, ex.getMessage(), ex);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ futures.add(exceptionally);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
- int exitCode = process.waitFor();
|
|
|
- log.info("[{}] 进程退出,显示代码 {}", streamId, exitCode);
|
|
|
+ int exitCode = process.waitFor();
|
|
|
+ log.info("[{}] 进程退出,显示代码 {}", streamId, exitCode);
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
|
|
+
|
|
|
} catch (InterruptedException e) {
|
|
|
Thread.currentThread().interrupt(); // 重新设置中断状态
|
|
|
log.warn("[{}] 进程监控被中断", streamId, e);
|
|
|
- }
|
|
|
- catch (IOException e) {
|
|
|
+ } catch (IOException e) {
|
|
|
log.error("[{}] 进程监控失败", streamId, e);
|
|
|
} finally {
|
|
|
cleanup();
|
|
@@ -212,66 +248,224 @@ public class VideoTaskManager {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 上传切分的文件
|
|
|
+ * 检查切片文件是否完成
|
|
|
* @param filePath
|
|
|
* @return
|
|
|
*/
|
|
|
- private CompletableFuture<Void> uploadFile(String filePath,String skillName,String streamId,String skillId, String deviceName){
|
|
|
-
|
|
|
- return CompletableFuture.runAsync(()->{
|
|
|
+ private CompletableFuture<Boolean> checkUploadFileAsync(String filePath) {
|
|
|
+ return CompletableFuture.supplyAsync(() -> {
|
|
|
+ Process process = null;
|
|
|
try {
|
|
|
- while (true){
|
|
|
- //检查文件是否完成
|
|
|
+ int attempts = 0;
|
|
|
+ int waitTime = 2*60*1000; //2分钟
|
|
|
+ while (attempts < 5) {
|
|
|
List<String> checkVoxFinish = Arrays.asList(
|
|
|
-// "D:\\file\\ffmpeg\\ffmpeg.exe",
|
|
|
- "ffmpeg",
|
|
|
- "-v","error",
|
|
|
+ "ffprobe",
|
|
|
+ "-v", "error",
|
|
|
"-i", filePath,
|
|
|
- "-f", "null",
|
|
|
- "-"
|
|
|
+ "-show_entries", "format=duration",
|
|
|
+ "-of", "default=noprint_wrappers=1:nokey=1"
|
|
|
);
|
|
|
- Process process = new ProcessBuilder(checkVoxFinish).start();
|
|
|
- BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
|
|
|
+
|
|
|
+ process = new ProcessBuilder(checkVoxFinish).start();
|
|
|
StringBuilder errorOutput = new StringBuilder();
|
|
|
|
|
|
- String line;
|
|
|
- while ((line = reader.readLine()) != null) {
|
|
|
- errorOutput.append(line).append("\n");
|
|
|
+ // 读取错误输出
|
|
|
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
|
|
+ String line;
|
|
|
+ while ((line = reader.readLine()) != null) {
|
|
|
+ errorOutput.append(line).append("\n");
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
int exitCode = process.waitFor();
|
|
|
if (exitCode != 0 || !errorOutput.toString().trim().isEmpty()) {
|
|
|
- log.error("checkVoxFinish:[{}]文件未完成",filePath);
|
|
|
- //重置状态errorOutput状态
|
|
|
- errorOutput.delete(0, errorOutput.length());
|
|
|
- //等待2分钟后再次检查
|
|
|
- Thread.sleep(120000);
|
|
|
- }else {
|
|
|
- //上传文件
|
|
|
- File file = new File(filePath);
|
|
|
- FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
|
|
|
- //上传文件
|
|
|
- String upload = minioUtil.upload(fileToMultipartFile,sourceDir);
|
|
|
- log.info("上传文件:{}",upload);
|
|
|
-
|
|
|
- //发送消息
|
|
|
- Map<String, Object> map = new HashMap<>();
|
|
|
- map.put("filePath",upload);
|
|
|
- map.put("skillName",skillName);
|
|
|
- map.put("streamId",streamId);
|
|
|
- map.put("deviceName",deviceName);
|
|
|
- map.put("skillId",skillId);
|
|
|
- log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
|
|
|
- break;
|
|
|
+ log.error("checkUploadFile:[{}]文件未完成,错误信息:{}", filePath, errorOutput.toString());
|
|
|
+ Thread.sleep(waitTime);
|
|
|
+ attempts++;
|
|
|
+ } else {
|
|
|
+ log.info("checkUploadFile:[{}]文件已完成", filePath);
|
|
|
+ return true;
|
|
|
}
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- } catch (IOException e) {
|
|
|
+ return false; // 超过最大尝试次数,返回 false
|
|
|
+ } catch (InterruptedException | IOException e) {
|
|
|
+ log.error("checkUploadFile:检查文件[{}]失败", filePath, e);
|
|
|
throw new RuntimeException(e);
|
|
|
+ } finally {
|
|
|
+ if (process != null) {
|
|
|
+ process.destroy();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 上传切片音频文件
|
|
|
+ * @param filePath
|
|
|
+ * @param skillName
|
|
|
+ * @param streamId
|
|
|
+ * @param skillId
|
|
|
+ * @param deviceName
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private CompletableFuture<Void> uploadFileAsync(String filePath, String skillName, String streamId, String skillId, String deviceName) {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ int maxRetries = 4; // 最大重试次数
|
|
|
+ int retryCount = 0; // 当前重试次数
|
|
|
+ boolean uploadSuccess = false; // 标记上传是否成功
|
|
|
+ while (retryCount < maxRetries && !uploadSuccess) {
|
|
|
+ try {
|
|
|
+ // 上传文件
|
|
|
+ File file = new File(filePath);
|
|
|
+ FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
|
|
|
+ String upload = minioUtil.upload(fileToMultipartFile, sourceDir);
|
|
|
+ log.info("上传文件:{}", upload);
|
|
|
+ if (upload == null || upload.isEmpty()) {
|
|
|
+ // 手动抛出异常
|
|
|
+ throw new Exception();
|
|
|
+ } else {
|
|
|
+ // 发送消息
|
|
|
+ Map<String, Object> map = new HashMap<>();
|
|
|
+ map.put("filePath", upload);
|
|
|
+ map.put("skillName", skillName);
|
|
|
+ map.put("streamId", streamId);
|
|
|
+ map.put("deviceName", deviceName);
|
|
|
+ map.put("skillId", skillId);
|
|
|
|
|
|
-}
|
|
|
+ log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
|
|
|
+ // 标记上传成功
|
|
|
+ uploadSuccess = true;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 记录错误日志
|
|
|
+ log.error("上传文件{}或发送消息失败,重试次数: {}/{},错误信息: {}", filePath, retryCount + 1, maxRetries, e.getMessage(), e);
|
|
|
+ // 增加重试计数
|
|
|
+ retryCount++;
|
|
|
+ // 如果达到最大重试次数,抛出异常
|
|
|
+ if (retryCount >= maxRetries) {
|
|
|
+ log.error("已达到最大重试次数,上传{}失败", filePath);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(20000); // 等待 20 秒
|
|
|
+ } catch (InterruptedException interruptedException) {
|
|
|
+ log.error("线程等待中断: {}", interruptedException.getMessage());
|
|
|
+ Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+// /**
|
|
|
+// * 只检查文件是否完成
|
|
|
+// *
|
|
|
+// * @param filePath
|
|
|
+// * @return
|
|
|
+// */
|
|
|
+// private CompletableFuture<Void> checkUploadFile(String filePath,String skillName,String streamId,String skillId, String deviceName){
|
|
|
+//
|
|
|
+// return CompletableFuture.runAsync(()->{
|
|
|
+// Process process = null;
|
|
|
+// BufferedReader reader = null;
|
|
|
+// try {
|
|
|
+// //不能无限检查3*5 15分钟
|
|
|
+// int isNum=0;
|
|
|
+// boolean isFlag =false;
|
|
|
+// while (isNum<5){
|
|
|
+// //检查文件是否完成
|
|
|
+// List<String> checkVoxFinish = Arrays.asList(
|
|
|
+//// "D:\\file\\ffmpeg\\ffmpeg.exe",
|
|
|
+// "ffmpeg",
|
|
|
+// "-v","error",
|
|
|
+// "-i", filePath,
|
|
|
+// "-f", "null",
|
|
|
+// "-"
|
|
|
+// );
|
|
|
+// process = new ProcessBuilder(checkVoxFinish).start();
|
|
|
+// reader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
|
|
|
+// StringBuilder errorOutput = new StringBuilder();
|
|
|
+//
|
|
|
+// String line;
|
|
|
+// while ((line = reader.readLine()) != null) {
|
|
|
+// errorOutput.append(line).append("\n");
|
|
|
+// }
|
|
|
+// int exitCode = process.waitFor();
|
|
|
+// if (exitCode != 0 || !errorOutput.toString().trim().isEmpty()) {
|
|
|
+// log.error("checkVoxFinish:[{}]文件未完成",filePath);
|
|
|
+// //重置状态errorOutput状态
|
|
|
+// errorOutput.delete(0, errorOutput.length());
|
|
|
+// //等待3分钟后再次检查
|
|
|
+// Thread.sleep(180000);
|
|
|
+// isNum++;
|
|
|
+// }else {
|
|
|
+// //跳出循环,上传文件
|
|
|
+// isFlag =true;
|
|
|
+// break;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// if (isFlag) {
|
|
|
+// int maxRetries = 4; // 最大重试次数
|
|
|
+// int retryCount = 0; // 当前重试次数
|
|
|
+// boolean uploadSuccess = false; // 标记上传是否成功
|
|
|
+// while (retryCount < maxRetries && !uploadSuccess) {
|
|
|
+// try {
|
|
|
+// // 上传文件
|
|
|
+// File file = new File(filePath);
|
|
|
+// FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
|
|
|
+// String upload = minioUtil.upload(fileToMultipartFile, sourceDir);
|
|
|
+// log.info("上传文件:{}", upload);
|
|
|
+// // 发送消息
|
|
|
+// Map<String, Object> map = new HashMap<>();
|
|
|
+// map.put("filePath", upload);
|
|
|
+// map.put("skillName", skillName);
|
|
|
+// map.put("streamId", streamId);
|
|
|
+// map.put("deviceName", deviceName);
|
|
|
+// map.put("skillId", skillId);
|
|
|
+// log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
|
|
|
+// // 标记上传成功
|
|
|
+// uploadSuccess = true;
|
|
|
+// } catch (Exception e) {
|
|
|
+// // 记录错误日志
|
|
|
+// log.error("上传文件或发送消息失败,重试次数: {}/{},错误信息: {}", retryCount + 1, maxRetries, e.getMessage(), e);
|
|
|
+// // 增加重试计数
|
|
|
+// retryCount++;
|
|
|
+// // 如果达到最大重试次数,抛出异常
|
|
|
+// if (retryCount >= maxRetries) {
|
|
|
+// log.error("已达到最大重试次数,上传失败");
|
|
|
+// throw new RuntimeException("上传文件失败,已达到最大重试次数", e);
|
|
|
+// }
|
|
|
+// // 可选:等待一段时间再重试(例如 2 秒)
|
|
|
+// try {
|
|
|
+// Thread.sleep(2000); // 等待 2 秒
|
|
|
+// } catch (InterruptedException interruptedException) {
|
|
|
+// log.error("线程等待中断: {}", interruptedException.getMessage());
|
|
|
+// Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// } catch (InterruptedException e) {
|
|
|
+// throw new RuntimeException(e);
|
|
|
+// } catch (IOException e) {
|
|
|
+// throw new RuntimeException(e);
|
|
|
+// } finally {
|
|
|
+// if(reader !=null){
|
|
|
+// try {
|
|
|
+// reader.close();
|
|
|
+// } catch (IOException e) {
|
|
|
+// log.error("reader关闭失败{}",e.getMessage());
|
|
|
+// }
|
|
|
+// }
|
|
|
+// if(process !=null){
|
|
|
+// process.destroy();
|
|
|
+// }
|
|
|
+//
|
|
|
+// }
|
|
|
+// });
|
|
|
+//
|
|
|
+// }
|