Quellcode durchsuchen

1、新增VideoTaskManager回收监控线程、回收fileProcessingFuture

pc147123 vor 3 Monaten
Ursprung
Commit
7bf6848c13

+ 0 - 4
pom.xml

@@ -139,10 +139,6 @@
             <artifactId>druid-spring-boot-starter</artifactId>
             <version>${druid.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-data-redis</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>

+ 9 - 5
src/main/java/com/example/unusualsounds/project/device/service/impl/DevicesSoundSensorsServiceImpl.java

@@ -115,14 +115,17 @@ public class DevicesSoundSensorsServiceImpl extends ServiceImpl<DevicesSoundSens
         if ("file".equals(soundSensors.getSoundSensorType())) {
             System.out.println("获取:" + soundSensors.getSoundFileUUid().toString());
 
-            if(!soundSensors.getStreamUrl().isEmpty()&&soundSensors.getStreamUrl()!=null){
-                devicesSoundSensors.setStreamUrl(soundSensors.getStreamUrl());
-                devicesSoundSensors.setSrcUrl(soundSensors.getStreamUrl());
-            }else {
+            //存在,请求者和jdk在同一个redis环境中
+            if(RedisUtils.exists(soundSensors.getSoundFileUUid())){
                 Object o = RedisUtils.get(soundSensors.getSoundFileUUid());
                 devicesSoundSensors.setStreamUrl(o.toString());
                 devicesSoundSensors.setSrcUrl(o.toString());
+            }else {
+            //不存在,请求者和jdk不在同一个环境中,从soundSenors中取Url
+                devicesSoundSensors.setStreamUrl(soundSensors.getStreamUrl());
+                devicesSoundSensors.setSrcUrl(soundSensors.getStreamUrl());
             }
+
             devicesSoundSensors.setStatus("online");
             devicesSoundSensors.setVideoType("file");
             devicesSoundSensors.setFileUuid(soundSensors.getSoundFileUUid());
@@ -173,6 +176,8 @@ public class DevicesSoundSensorsServiceImpl extends ServiceImpl<DevicesSoundSens
             try {
                 soundSensors.setSoundSensorUuid(devicesSoundSensors.getUuid());
                 soundSensors.setSoundSensorIp(devicesSoundSensors.getIp().trim());
+                //中心端redis和边端redis不相同,这里给中心端发送请求时插入StreamUrl
+                soundSensors.setStreamUrl(devicesSoundSensors.getStreamUrl());
                 isFlag = this.insertSensors(url, devicesSoundSensors, soundSensors, soundSensors.getSoundFileUUid());
             } catch (Exception e) {
                 e.printStackTrace();
@@ -181,7 +186,6 @@ public class DevicesSoundSensorsServiceImpl extends ServiceImpl<DevicesSoundSens
             }
         }
 
-
         //对文件进行立刻通知算法解析(如果是文件类型,通知算法解析)
         if ("file".equals(soundSensors.getSoundSensorType()) && isFlag) {
             log.info("对文件进行立刻通知算法解析(如果是文件类型,通知算法解析) start");

+ 171 - 58
src/main/java/com/example/unusualsounds/project/vox/config/VideoTaskManager.java

@@ -4,6 +4,7 @@ import com.example.unusualsounds.common.utils.FileToMultipartFile;
 import com.example.unusualsounds.common.utils.RedisUtils;
 import com.example.unusualsounds.common.utils.VoiceAnalysisUtils;
 import com.example.unusualsounds.framework.minio.MinioUtil;
+import com.example.unusualsounds.project.vox.entity.VideoTask;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -34,7 +35,7 @@ public class VideoTaskManager {
      * @param deviceName
      * @return
      */
-    private final ExecutorService uploadExecutor = Executors.newFixedThreadPool(30);
+
     @Value("${vox.video-dir}")
     private String videoDir;
     @Value("${minio.source-dir}")
@@ -54,9 +55,15 @@ public class VideoTaskManager {
     private String commandA;
     @Value("${voice.commandAC}")
     private String commandAC;
+    @Value("${voice.commandSegmentTime}")
+    private String commandSegmentTime;
     @Value("spring.profiles.active")
     private String profilesActive;
 
+
+
+    private final ExecutorService uploadExecutor = Executors.newFixedThreadPool(30);
+
     public boolean startTask(String streamId, String deviceName, String skillID, String rtspUrl, String skillName, String openUuid) {
         String compositeKey = streamId + "_" + skillName;
 
@@ -90,12 +97,12 @@ public class VideoTaskManager {
     public List<TaskStatus> listTasks() {
 
         return taskMap.entrySet().stream()
-                .map(e -> new TaskStatus(e.getKey(), e.getValue().getRtspUrl(), e.getValue().openUuid, e.getValue().isRunning()))
+                .map(e -> new TaskStatus(e.getKey(), e.getValue().getRtspUrl(), e.getValue().openUuid, e.getValue().isRunning(),e.getValue().singleThreadExecutor.toString()))
                 .collect(Collectors.toList());
     }
 
     /**
-     * 检查切片文件是否完成
+     * 异步检查切片文件是否完成
      *
      * @param filePath
      * @return
@@ -107,13 +114,8 @@ public class VideoTaskManager {
                 int attempts = 0;
                 int waitTime = 2 * 60 * 1000; //2分钟
                 while (attempts < 5) {
-                    // todo ghjghj
-                    log.info("checkUploadFileAsync() ---> 不通环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
+                    log.info("checkUploadFileAsync() ---> 不同环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
                     List<String> checkVoxFinish = Arrays.asList(
-                            // todo 本地环境运行时,放开此代码行注释。
-//                            "D:\\file\\ffmpeg\\ffprobe.exe",
-                            // todo 正式环境运行时,放开此代码行注释。
-//                            "ffprobe",
                             commandProbeStr,
                             "-v", "error",
                             "-i", filePath,
@@ -154,6 +156,15 @@ public class VideoTaskManager {
         });
     }
 
+    /**
+     * 异步上传文件
+     * @param filePath
+     * @param skillName
+     * @param streamId
+     * @param skillId
+     * @param deviceName
+     * @return
+     */
     private CompletableFuture<Void> uploadFileAsync(String filePath, String skillName, String streamId, String skillId, String deviceName) {
         return CompletableFuture.runAsync(() -> {
             int maxRetries = 4; // 最大重试次数
@@ -206,6 +217,114 @@ public class VideoTaskManager {
         }, uploadExecutor);
     }
 
+    /**
+     * 文件上传
+     * @param filePath
+     * @param skillName
+     * @param streamId
+     * @param skillId
+     * @param deviceName
+     * @return
+     */
+    private boolean uploadFile(String filePath, String skillName, String streamId, String skillId, String deviceName) {
+        int maxRetries = 4; // 最大重试次数
+        int retryCount = 0; // 当前重试次数
+        boolean uploadSuccess = false; // 标记上传是否成功
+        while (retryCount < maxRetries && !uploadSuccess) {
+            try {
+                // 上传文件
+                File file = new File(filePath);
+                FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
+                String upload = minioUtil.upload(fileToMultipartFile, sourceDir);
+                log.info("上传文件:{}", upload);
+                if (upload == null || upload.isEmpty()) {
+                    // 手动抛出异常
+                    throw new Exception();
+                } else {
+                    // 发送消息
+                    Map<String, Object> map = new HashMap<>();
+                    map.put("filePath", upload);
+                    map.put("skillName", skillName);
+                    map.put("streamId", streamId);
+                    map.put("deviceName", deviceName);
+                    map.put("skillId", skillId);
+                    log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
+                    // 标记上传成功
+                    uploadSuccess = true;
+                }
+            } catch (Exception e) {
+                // 记录错误日志
+                log.error("上传文件{}或发送消息失败,重试次数: {}/{},错误信息: {}", filePath, retryCount + 1, maxRetries, e.getMessage(), e);
+                // 增加重试计数
+                retryCount++;
+                // 如果达到最大重试次数,抛出异常
+                if (retryCount >= maxRetries) {
+                    log.error("已达到最大重试次数,上传{}失败", filePath);
+                    break;
+                }
+                try {
+                    Thread.sleep(20000); // 等待 20 秒
+                } catch (InterruptedException interruptedException) {
+                    log.error("线程等待中断: {}", interruptedException.getMessage());
+                }
+            }
+        }
+        return uploadSuccess;
+    }
+
+    /**
+     * 文件检查
+     * @param filePath
+     * @return
+     */
+    private boolean checkUploadFile(String filePath) {
+        Process process = null;
+        try {
+            int attempts = 0;
+            int waitTime = 2 * 60 * 1000; //2分钟
+            while (attempts < 5) {
+                log.info("checkUploadFile() ---> 不同环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
+                List<String> checkVoxFinish = Arrays.asList(
+                        commandProbeStr,
+                        "-v", "error",
+                        "-i", filePath,
+                        "-show_entries", "format=duration",
+                        "-of", "default=noprint_wrappers=1:nokey=1"
+                );
+
+                process = new ProcessBuilder(checkVoxFinish).start();
+                StringBuilder errorOutput = new StringBuilder();
+
+                // 读取错误输出
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        errorOutput.append(line).append("\n");
+                    }
+                }
+
+                int exitCode = process.waitFor();
+                if (exitCode != 0 || !errorOutput.toString().trim().isEmpty()) {
+                    log.error("checkUploadFile:[{}]文件未完成,错误信息:{}", filePath, errorOutput.toString());
+                    Thread.sleep(waitTime);
+                    attempts++;
+                } else {
+                    log.info("checkUploadFile:[{}]文件已完成", filePath);
+                    return true;
+                }
+            }
+            return false; // 超过最大尝试次数,返回 false
+        } catch (InterruptedException | IOException e) {
+            log.error("checkUploadFile:检查文件[{}]失败", filePath, e);
+            throw new RuntimeException(e);
+        } finally {
+            if (process != null) {
+                process.destroy();
+            }
+        }
+    }
+
+
     @Data
     @AllArgsConstructor
     public static class TaskStatus {
@@ -213,6 +332,7 @@ public class VideoTaskManager {
         private String rtspUrl;
         private String openUuid;
         private boolean isRunning;
+        private String singleThreadExecutorName;
     }
 
     @Data
@@ -225,6 +345,7 @@ public class VideoTaskManager {
         private String openUuid;
         private Process process;
         private Future<?> future;
+        private ExecutorService singleThreadExecutor;
 
         VideoTask(String streamId, String rtspUrl, String skillName, String skillId, String deviceName, String openUuid) {
             this.streamId = streamId;
@@ -236,11 +357,10 @@ public class VideoTaskManager {
         }
 
         void start() {
-            // todo ghjghj
-            log.info("start() ---> 不通环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
+            log.info("start() ---> 不同环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
             List<String> command = new ArrayList<>();
             List<String> commandAnother = Arrays.asList("-f", "segment",
-                    "-segment_time", "300", //100秒
+                    "-segment_time", commandSegmentTime, //获取配置文件内时间
                     "-segment_format", "mp4",
                     "-reset_timestamps", "1",
                     "-force_key_frames", "expr:gte(t,n_floor(t/100)*100)",
@@ -266,49 +386,14 @@ public class VideoTaskManager {
                         commandA, commandAC));  // 现场音频为pcm_alaw转mp4不兼容,需要先转aac
             }
             command.addAll(commandAnother);
-//            // todo 本地环境运行时,执行此部分代码行。即需要放开注释。
-//            List<String> command = Arrays.asList(
-//                    commandStr,
-//                    "-rtsp_transport", "tcp",
-//                    "-i", rtspUrl,
-//                    "-vn",
-//                    commandCopy, "copy",
-//                    "-f", "segment",
-//                    "-segment_time", "300", //100秒
-//                    "-segment_format", "mp4",
-//                    "-reset_timestamps", "1",
-//                    "-force_key_frames", "expr:gte(t,n_floor(t/100)*100)",
-//                    "-write_empty_segments", "1",
-//                    "-segment_atclocktime", "1",
-//                    "-strftime", "1",
-//                    videoDir+streamId+"-%Y-%m-%d_%H-%M-%S"+".mp4"
-//            );
-//            // todo 打包项目时,执行此部分代码行。即需要放开注释。
-//            //现场环境内ffmpeg commandList
-////            List<String> command = Arrays.asList(
-////                    commandStr,
-////                    "-rtsp_transport", "tcp",
-////                    "-i", rtspUrl,
-////                    "-vn",
-////                    commandCopy, "copy",
-////                   commandA, commandAC,   // 现场音频为pcm_alaw转mp4不兼容,需要先转aac
-////                    "-f", "segment",
-////                    "-segment_time", "300", //5分钟
-////                    "-segment_format", "mp4",
-////                    "-reset_timestamps", "1",
-////                    "-force_key_frames", "expr:gte(t,n_floor(t/100)*100)",
-////                    "-write_empty_segments", "1",
-////                    "-segment_atclocktime", "1",
-////                    "-strftime", "1",
-////                    videoDir + streamId + "-%Y-%m-%d_%H-%M-%S" + ".mp4"
-////            );
 
             ProcessBuilder pb = new ProcessBuilder(command)
                     .redirectErrorStream(true);
 
             try {
                 this.process = pb.start();
-                this.future = Executors.newSingleThreadExecutor().submit(this::monitorProcess);
+                this.singleThreadExecutor = Executors.newSingleThreadExecutor();
+                this.future = singleThreadExecutor.submit(this::monitorProcess);
             } catch (IOException e) {
                 throw new RuntimeException("无法启动FFmpeg进程", e);
             }
@@ -335,12 +420,21 @@ public class VideoTaskManager {
                 }
 
             }
+            if (singleThreadExecutor != null) {
+                log.info("开始关闭单例线程池:{}",singleThreadExecutor);
+                singleThreadExecutor.shutdown(); // 关闭单例线程池
+                if (!singleThreadExecutor.awaitTermination(6, TimeUnit.SECONDS)) {
+                    log.info("线程池{}未能及时终止,正在强制关闭...",singleThreadExecutor);
+                    singleThreadExecutor.shutdownNow();
+                }
+            }
 
 
         }
 
 
         private Void monitorProcess() {
+            ExecutorService executorService = Executors.newFixedThreadPool(5);
             List<CompletableFuture<?>> futures = new ArrayList<>();
             try (BufferedReader reader = new BufferedReader(
                     new InputStreamReader(process.getInputStream()))) {
@@ -351,7 +445,6 @@ public class VideoTaskManager {
                         log.info("[{}] 进程监控被中断", streamId);
                         return null;
                     }
-                    ;
                     //[segment @ 000002bcb71e5680] Opening 'D:\tmp\data\vox\output-2025-02-14_17-53-20499aec0d-0579-4ce8-8a19-0cdea2c8a648.mp4' for writing
                     //不打印rtsp的过程日志
                     //log.info("[{}]FFmpeg: {}", streamId, line);
@@ -360,20 +453,37 @@ public class VideoTaskManager {
                         String filePath = matcher.group(1).trim();
                         log.info("[{}] 新的视频文件已生成: {}", streamId, filePath);
                         //使用CompletableFuture进行检查
-                        CompletableFuture<CompletableFuture<?>> exceptionally = checkUploadFileAsync(filePath)
-                                .thenApply(isCompleted -> {
+//                        CompletableFuture<CompletableFuture<?>> exceptionally = checkUploadFileAsync(filePath)
+//                                .thenApply(isCompleted -> {
+//                                    if (isCompleted) {
+//                                        return uploadFileAsync(filePath, skillName, streamId, skillId, deviceName);
+//                                    } else {
+//                                        log.warn("文件 {} 未完成,跳过上传", filePath);
+//                                        return CompletableFuture.completedFuture(null);
+//                                    }
+//
+//                                })
+//                                .exceptionally(ex -> {
+//                                    log.error("处理文件 {} 时发生错误: {}", filePath, ex.getMessage(), ex);
+//                                    return null;
+//                                });
+//
+//                        futures.add(exceptionally);
+                        CompletableFuture<?> fileProcessingFuture = CompletableFuture.supplyAsync(() -> {
+                                    boolean isCompleted = checkUploadFile(filePath);
                                     if (isCompleted) {
-                                        return uploadFileAsync(filePath, skillName, streamId, skillId, deviceName);
+                                        uploadFile(filePath, skillName, streamId, skillId, deviceName);
                                     } else {
                                         log.warn("文件 {} 未完成,跳过上传", filePath);
-                                        return CompletableFuture.completedFuture(null);
                                     }
-                                })
+                                    return null;
+                                }, executorService)
                                 .exceptionally(ex -> {
                                     log.error("处理文件 {} 时发生错误: {}", filePath, ex.getMessage(), ex);
-                                    return null;
+                                    throw new RuntimeException("文件处理失败", ex); // 抛出异常以便进一步处理
                                 });
-                        futures.add(exceptionally);
+                        futures.add(fileProcessingFuture);
+
                     }
 
                 }
@@ -387,14 +497,17 @@ public class VideoTaskManager {
             } catch (IOException e) {
                 log.error("[{}] 进程监控失败", streamId, e);
             } finally {
+                executorService.shutdown();
                 cleanup();
                 taskMap.remove(streamId);
             }
             return null;
         }
 
+
+
         private void cleanup() {
-            // 清理临时文件
+            // 此时生成的视频切片被占用,不能立即清除
         }
 
         boolean isRunning() {

+ 4 - 4
src/main/java/com/example/unusualsounds/project/vox/controller/CheckVoxController.java

@@ -49,10 +49,10 @@ public class CheckVoxController {
         String openUuid = UUID.generateUUIDWithoutHyphens();
         //校验设备是否开启(包含传感器、摄像头),当前只做摄像头校验
         CameraDTO camera  = voxServiceImpl.checkDevice(device.getDeviceId());
-        //-----测试(云环境的是mp4文件,不是rtsp流)
-        camera.setSrcURL("rtsp://192.168.1.112:8554/video");
-        camera.setVideoType("");
-        //-----测试
+        //-----测试(本地模拟rtsp流)
+//        camera.setSrcURL("rtsp://192.168.1.112:8554/video");
+//        camera.setVideoType("");
+        //-----测试(本地模拟rtsp流)
         if(camera == null || "offline".equals(camera.getStatus()) || "error".equals(camera.getStatus())){
             //保存启停接口的设备信息到数据库,便于后续定时任务调用
             voiceTicketLogsServiceImpl.saveStartStop(device,openUuid,"设备离线或出错");

+ 2 - 1
src/main/resources/application-dev.yml

@@ -98,9 +98,10 @@ Alarms:
 
 voice:
   commandStr: "D:\\file\\ffmpeg\\ffmpeg.exe"
-  commandProbeStr: "D:\file\ffmpeg\ffprobe.exe"
+  commandProbeStr: "D:\\file\\ffmpeg\\ffprobe.exe"
   commandCopy: "-c"
   commandA: "-c:a"
   commandAC: "aac"
+  commandSegmentTime: "100"
 
 

+ 2 - 1
src/main/resources/application-edge.yml

@@ -98,4 +98,5 @@ voice:
   commandProbeStr: "ffprobe"
   commandCopy: "-c:v"
   commandA: "-c:a"
-  commandAC: "aac"
+  commandAC: "aac"
+  commandSegmentTime: "300"

+ 1 - 0
src/main/resources/application-prod.yml

@@ -101,4 +101,5 @@ voice:
   commandCopy: "-c:v"
   commandA: "-c:a"
   commandAC: "aac"
+  commandSegmentTime: "300"
 

+ 9 - 0
src/main/resources/application-test.yml

@@ -95,3 +95,12 @@ Alarms:
   imageUrl: s3://windmill/store/abnormal-sound/source/2025-03-20/aafdfd09-0f41-4942-9414-10061f8f674e.jpeg
   thumbnailUrl: s3://windmill/store/abnormal-sound/source/2025-03-20/d4a6fa31-0404-486e-997d-9fd9a89f87bd.jpg
 
+voice:
+  commandStr: "ffmpeg"
+  commandProbeStr: "ffprobe"
+  commandCopy: "-c:v"
+  commandA: "-c:a"
+  commandAC: "aac"
+  commandSegmentTime: "300"
+
+

+ 2 - 2
src/main/resources/application.yml

@@ -1,7 +1,7 @@
 spring:
   profiles:
-#    active: dev
-    active: edge
+    active: dev
+#    active: edge
 #    active: prod
 
 springdoc: