|
@@ -4,15 +4,13 @@ import com.example.unusualsounds.common.utils.FileToMultipartFile;
|
|
|
import com.example.unusualsounds.common.utils.RedisUtils;
|
|
|
import com.example.unusualsounds.common.utils.VoiceAnalysisUtils;
|
|
|
import com.example.unusualsounds.framework.minio.MinioUtil;
|
|
|
+import com.example.unusualsounds.project.vox.entity.VideoTask;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import okhttp3.ConnectionPool;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
-import org.springframework.http.*;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
-import org.springframework.web.client.RestTemplate;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.util.*;
|
|
@@ -26,21 +24,45 @@ import java.util.stream.Collectors;
|
|
|
@Slf4j
|
|
|
public class VideoTaskManager {
|
|
|
|
|
|
+ private final ConcurrentMap<String, VideoTask> taskMap = new ConcurrentHashMap<>();
|
|
|
+ /**
|
|
|
+ * 上传切片音频文件
|
|
|
+ *
|
|
|
+ * @param filePath
|
|
|
+ * @param skillName
|
|
|
+ * @param streamId
|
|
|
+ * @param skillId
|
|
|
+ * @param deviceName
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+
|
|
|
@Value("${vox.video-dir}")
|
|
|
private String videoDir;
|
|
|
-
|
|
|
@Value("${minio.source-dir}")
|
|
|
private String sourceDir;
|
|
|
-
|
|
|
@Value("${vox.post-analysis-url}")
|
|
|
private String postAnalysisUrl;
|
|
|
-
|
|
|
@Autowired
|
|
|
private MinioUtil minioUtil;
|
|
|
-
|
|
|
-
|
|
|
- private final ConcurrentMap<String, VideoTask> taskMap = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
+ // todo 路径。
|
|
|
+ @Value("${voice.commandStr}")
|
|
|
+ private String commandStr;
|
|
|
+ @Value("${voice.commandProbeStr}")
|
|
|
+ private String commandProbeStr;
|
|
|
+ @Value("${voice.commandCopy}")
|
|
|
+ private String commandCopy;
|
|
|
+ @Value("${voice.commandA}")
|
|
|
+ private String commandA;
|
|
|
+ @Value("${voice.commandAC}")
|
|
|
+ private String commandAC;
|
|
|
+ @Value("${voice.commandSegmentTime}")
|
|
|
+ private String commandSegmentTime;
|
|
|
+ @Value("spring.profiles.active")
|
|
|
+ private String profilesActive;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private final ExecutorService uploadExecutor = Executors.newFixedThreadPool(30);
|
|
|
|
|
|
public boolean startTask(String streamId, String deviceName, String skillID, String rtspUrl, String skillName, String openUuid) {
|
|
|
String compositeKey = streamId + "_" + skillName;
|
|
@@ -75,10 +97,234 @@ public class VideoTaskManager {
|
|
|
public List<TaskStatus> listTasks() {
|
|
|
|
|
|
return taskMap.entrySet().stream()
|
|
|
- .map(e -> new TaskStatus(e.getKey(), e.getValue().getRtspUrl(), e.getValue().openUuid, e.getValue().isRunning()))
|
|
|
+ .map(e -> new TaskStatus(e.getKey(), e.getValue().getRtspUrl(), e.getValue().openUuid, e.getValue().isRunning(),e.getValue().singleThreadExecutor.toString()))
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 异步检查切片文件是否完成
|
|
|
+ *
|
|
|
+ * @param filePath
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private CompletableFuture<Boolean> checkUploadFileAsync(String filePath) {
|
|
|
+ return CompletableFuture.supplyAsync(() -> {
|
|
|
+ Process process = null;
|
|
|
+ try {
|
|
|
+ int attempts = 0;
|
|
|
+ int waitTime = 2 * 60 * 1000; //2分钟
|
|
|
+ while (attempts < 5) {
|
|
|
+ log.info("checkUploadFileAsync() ---> 不同环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
|
|
|
+ List<String> checkVoxFinish = Arrays.asList(
|
|
|
+ commandProbeStr,
|
|
|
+ "-v", "error",
|
|
|
+ "-i", filePath,
|
|
|
+ "-show_entries", "format=duration",
|
|
|
+ "-of", "default=noprint_wrappers=1:nokey=1"
|
|
|
+ );
|
|
|
+
|
|
|
+ process = new ProcessBuilder(checkVoxFinish).start();
|
|
|
+ StringBuilder errorOutput = new StringBuilder();
|
|
|
+
|
|
|
+ // 读取错误输出
|
|
|
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
|
|
+ String line;
|
|
|
+ while ((line = reader.readLine()) != null) {
|
|
|
+ errorOutput.append(line).append("\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int exitCode = process.waitFor();
|
|
|
+ if (exitCode != 0 || !errorOutput.toString().trim().isEmpty()) {
|
|
|
+ log.error("checkUploadFile:[{}]文件未完成,错误信息:{}", filePath, errorOutput.toString());
|
|
|
+ Thread.sleep(waitTime);
|
|
|
+ attempts++;
|
|
|
+ } else {
|
|
|
+ log.info("checkUploadFile:[{}]文件已完成", filePath);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false; // 超过最大尝试次数,返回 false
|
|
|
+ } catch (InterruptedException | IOException e) {
|
|
|
+ log.error("checkUploadFile:检查文件[{}]失败", filePath, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ } finally {
|
|
|
+ if (process != null) {
|
|
|
+ process.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步上传文件
|
|
|
+ * @param filePath
|
|
|
+ * @param skillName
|
|
|
+ * @param streamId
|
|
|
+ * @param skillId
|
|
|
+ * @param deviceName
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private CompletableFuture<Void> uploadFileAsync(String filePath, String skillName, String streamId, String skillId, String deviceName) {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ int maxRetries = 4; // 最大重试次数
|
|
|
+ int retryCount = 0; // 当前重试次数
|
|
|
+ boolean uploadSuccess = false; // 标记上传是否成功
|
|
|
+ while (retryCount < maxRetries && !uploadSuccess) {
|
|
|
+ try {
|
|
|
+ // 上传文件
|
|
|
+ File file = new File(filePath);
|
|
|
+ FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
|
|
|
+ String upload = minioUtil.upload(fileToMultipartFile, sourceDir);
|
|
|
+ log.info("上传文件:{}", upload);
|
|
|
+ if (upload == null || upload.isEmpty()) {
|
|
|
+ // 手动抛出异常
|
|
|
+ throw new Exception();
|
|
|
+ } else {
|
|
|
+ // 发送消息
|
|
|
+ Map<String, Object> map = new HashMap<>();
|
|
|
+ map.put("filePath", upload);
|
|
|
+ map.put("skillName", skillName);
|
|
|
+ map.put("streamId", streamId);
|
|
|
+ map.put("deviceName", deviceName);
|
|
|
+ map.put("skillId", skillId);
|
|
|
+
|
|
|
+
|
|
|
+ log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
|
|
|
+ // 标记上传成功
|
|
|
+ uploadSuccess = true;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 记录错误日志
|
|
|
+ log.error("上传文件{}或发送消息失败,重试次数: {}/{},错误信息: {}", filePath, retryCount + 1, maxRetries, e.getMessage(), e);
|
|
|
+ // 增加重试计数
|
|
|
+ retryCount++;
|
|
|
+
|
|
|
+
|
|
|
+ // 如果达到最大重试次数,抛出异常
|
|
|
+ if (retryCount >= maxRetries) {
|
|
|
+ log.error("已达到最大重试次数,上传{}失败", filePath);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(20000); // 等待 20 秒
|
|
|
+ } catch (InterruptedException interruptedException) {
|
|
|
+ log.error("线程等待中断: {}", interruptedException.getMessage());
|
|
|
+ Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, uploadExecutor);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 文件上传
|
|
|
+ * @param filePath
|
|
|
+ * @param skillName
|
|
|
+ * @param streamId
|
|
|
+ * @param skillId
|
|
|
+ * @param deviceName
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private boolean uploadFile(String filePath, String skillName, String streamId, String skillId, String deviceName) {
|
|
|
+ int maxRetries = 4; // 最大重试次数
|
|
|
+ int retryCount = 0; // 当前重试次数
|
|
|
+ boolean uploadSuccess = false; // 标记上传是否成功
|
|
|
+ while (retryCount < maxRetries && !uploadSuccess) {
|
|
|
+ try {
|
|
|
+ // 上传文件
|
|
|
+ File file = new File(filePath);
|
|
|
+ FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
|
|
|
+ String upload = minioUtil.upload(fileToMultipartFile, sourceDir);
|
|
|
+ log.info("上传文件:{}", upload);
|
|
|
+ if (upload == null || upload.isEmpty()) {
|
|
|
+ // 手动抛出异常
|
|
|
+ throw new Exception();
|
|
|
+ } else {
|
|
|
+ // 发送消息
|
|
|
+ Map<String, Object> map = new HashMap<>();
|
|
|
+ map.put("filePath", upload);
|
|
|
+ map.put("skillName", skillName);
|
|
|
+ map.put("streamId", streamId);
|
|
|
+ map.put("deviceName", deviceName);
|
|
|
+ map.put("skillId", skillId);
|
|
|
+ log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
|
|
|
+ // 标记上传成功
|
|
|
+ uploadSuccess = true;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 记录错误日志
|
|
|
+ log.error("上传文件{}或发送消息失败,重试次数: {}/{},错误信息: {}", filePath, retryCount + 1, maxRetries, e.getMessage(), e);
|
|
|
+ // 增加重试计数
|
|
|
+ retryCount++;
|
|
|
+ // 如果达到最大重试次数,抛出异常
|
|
|
+ if (retryCount >= maxRetries) {
|
|
|
+ log.error("已达到最大重试次数,上传{}失败", filePath);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(20000); // 等待 20 秒
|
|
|
+ } catch (InterruptedException interruptedException) {
|
|
|
+ log.error("线程等待中断: {}", interruptedException.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return uploadSuccess;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 文件检查
|
|
|
+ * @param filePath
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private boolean checkUploadFile(String filePath) {
|
|
|
+ Process process = null;
|
|
|
+ try {
|
|
|
+ int attempts = 0;
|
|
|
+ int waitTime = 2 * 60 * 1000; //2分钟
|
|
|
+ while (attempts < 5) {
|
|
|
+ log.info("checkUploadFile() ---> 不同环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
|
|
|
+ List<String> checkVoxFinish = Arrays.asList(
|
|
|
+ commandProbeStr,
|
|
|
+ "-v", "error",
|
|
|
+ "-i", filePath,
|
|
|
+ "-show_entries", "format=duration",
|
|
|
+ "-of", "default=noprint_wrappers=1:nokey=1"
|
|
|
+ );
|
|
|
+
|
|
|
+ process = new ProcessBuilder(checkVoxFinish).start();
|
|
|
+ StringBuilder errorOutput = new StringBuilder();
|
|
|
+
|
|
|
+ // 读取错误输出
|
|
|
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
|
|
+ String line;
|
|
|
+ while ((line = reader.readLine()) != null) {
|
|
|
+ errorOutput.append(line).append("\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int exitCode = process.waitFor();
|
|
|
+ if (exitCode != 0 || !errorOutput.toString().trim().isEmpty()) {
|
|
|
+ log.error("checkUploadFile:[{}]文件未完成,错误信息:{}", filePath, errorOutput.toString());
|
|
|
+ Thread.sleep(waitTime);
|
|
|
+ attempts++;
|
|
|
+ } else {
|
|
|
+ log.info("checkUploadFile:[{}]文件已完成", filePath);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false; // 超过最大尝试次数,返回 false
|
|
|
+ } catch (InterruptedException | IOException e) {
|
|
|
+ log.error("checkUploadFile:检查文件[{}]失败", filePath, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ } finally {
|
|
|
+ if (process != null) {
|
|
|
+ process.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
@Data
|
|
|
@AllArgsConstructor
|
|
|
public static class TaskStatus {
|
|
@@ -86,6 +332,7 @@ public class VideoTaskManager {
|
|
|
private String rtspUrl;
|
|
|
private String openUuid;
|
|
|
private boolean isRunning;
|
|
|
+ private String singleThreadExecutorName;
|
|
|
}
|
|
|
|
|
|
@Data
|
|
@@ -98,6 +345,7 @@ public class VideoTaskManager {
|
|
|
private String openUuid;
|
|
|
private Process process;
|
|
|
private Future<?> future;
|
|
|
+ private ExecutorService singleThreadExecutor;
|
|
|
|
|
|
VideoTask(String streamId, String rtspUrl, String skillName, String skillId, String deviceName, String openUuid) {
|
|
|
this.streamId = streamId;
|
|
@@ -109,58 +357,49 @@ public class VideoTaskManager {
|
|
|
}
|
|
|
|
|
|
void start() {
|
|
|
-// List<String> command = Arrays.asList(
|
|
|
-//// "D:\\file\\ffmpeg\\ffmpeg.exe",
|
|
|
-// "ffmpeg",
|
|
|
-// "-rtsp_transport", "tcp",
|
|
|
-// "-i", rtspUrl,
|
|
|
-// "-vn",
|
|
|
-// "-c", "copy",
|
|
|
-// "-f", "segment",
|
|
|
-// "-segment_time", "300", //100秒
|
|
|
-// "-segment_format", "mp4",
|
|
|
-// "-reset_timestamps", "1",
|
|
|
-// "-force_key_frames", "expr:gte(t,n_floor(t/100)*100)",
|
|
|
-// "-write_empty_segments", "1",
|
|
|
-// "-segment_atclocktime", "1",
|
|
|
-// "-strftime", "1",
|
|
|
-// videoDir+streamId+"-%Y-%m-%d_%H-%M-%S"+".mp4"
|
|
|
-// );
|
|
|
-
|
|
|
- //现场环境内ffmpeg commandList
|
|
|
- List<String> command = Arrays.asList(
|
|
|
- "ffmpeg",
|
|
|
- "-rtsp_transport", "tcp",
|
|
|
- "-i", rtspUrl,
|
|
|
- "-vn",
|
|
|
- "-c:v", "copy",
|
|
|
- "-c:a", "aac", // 现场音频为pcm_alaw转mp4不兼容,需要先转aac
|
|
|
- "-f", "segment",
|
|
|
- "-segment_time", "300", //5分钟
|
|
|
+ log.info("start() ---> 不同环境切换对应值 profilesActive:{},commandProbeStr:{}", profilesActive, commandProbeStr);
|
|
|
+ List<String> command = new ArrayList<>();
|
|
|
+ List<String> commandAnother = Arrays.asList("-f", "segment",
|
|
|
+ "-segment_time", commandSegmentTime, //获取配置文件内时间
|
|
|
"-segment_format", "mp4",
|
|
|
"-reset_timestamps", "1",
|
|
|
"-force_key_frames", "expr:gte(t,n_floor(t/100)*100)",
|
|
|
"-write_empty_segments", "1",
|
|
|
"-segment_atclocktime", "1",
|
|
|
"-strftime", "1",
|
|
|
- videoDir + streamId + "-%Y-%m-%d_%H-%M-%S" + ".mp4"
|
|
|
-
|
|
|
- );
|
|
|
+ videoDir + streamId + "-%Y-%m-%d_%H-%M-%S" + ".mp4");
|
|
|
+ profilesActive = "prod";
|
|
|
+ if (profilesActive.equals("dev")) {
|
|
|
+ command = new ArrayList<>(Arrays.asList(
|
|
|
+ commandStr,
|
|
|
+ "-rtsp_transport", "tcp",
|
|
|
+ "-i", rtspUrl,
|
|
|
+ "-vn",
|
|
|
+ commandCopy, "copy"));
|
|
|
+ } else {
|
|
|
+ command = new ArrayList<>(Arrays.asList(
|
|
|
+ commandStr,
|
|
|
+ "-rtsp_transport", "tcp",
|
|
|
+ "-i", rtspUrl,
|
|
|
+ "-vn",
|
|
|
+ commandCopy, "copy",
|
|
|
+ commandA, commandAC)); // 现场音频为pcm_alaw转mp4不兼容,需要先转aac
|
|
|
+ }
|
|
|
+ command.addAll(commandAnother);
|
|
|
|
|
|
ProcessBuilder pb = new ProcessBuilder(command)
|
|
|
.redirectErrorStream(true);
|
|
|
|
|
|
try {
|
|
|
this.process = pb.start();
|
|
|
- this.future = Executors.newSingleThreadExecutor().submit(this::monitorProcess);
|
|
|
+ this.singleThreadExecutor = Executors.newSingleThreadExecutor();
|
|
|
+ this.future = singleThreadExecutor.submit(this::monitorProcess);
|
|
|
} catch (IOException e) {
|
|
|
throw new RuntimeException("无法启动FFmpeg进程", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
void stop() throws InterruptedException {
|
|
|
-
|
|
|
-
|
|
|
if (future != null && !future.isDone()) {
|
|
|
future.cancel(true);
|
|
|
}
|
|
@@ -172,7 +411,6 @@ public class VideoTaskManager {
|
|
|
|
|
|
int exitCode = process.waitFor(); // 等待FFmpeg进程正常结束
|
|
|
log.info("[{}] 进程退出状态: {}", streamId, exitCode);
|
|
|
-
|
|
|
// 如果进程没有在合理时间内响应,可以考虑强制销毁
|
|
|
if (exitCode != 0) {
|
|
|
process.destroyForcibly();
|
|
@@ -180,11 +418,23 @@ public class VideoTaskManager {
|
|
|
} else {
|
|
|
log.info("[{}] 进程顺利结束", streamId);
|
|
|
}
|
|
|
+
|
|
|
+ }
|
|
|
+ if (singleThreadExecutor != null) {
|
|
|
+ log.info("开始关闭单例线程池:{}",singleThreadExecutor);
|
|
|
+ singleThreadExecutor.shutdown(); // 关闭单例线程池
|
|
|
+ if (!singleThreadExecutor.awaitTermination(6, TimeUnit.SECONDS)) {
|
|
|
+ log.info("线程池{}未能及时终止,正在强制关闭...",singleThreadExecutor);
|
|
|
+ singleThreadExecutor.shutdownNow();
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
|
|
|
|
|
|
private Void monitorProcess() {
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
|
|
|
List<CompletableFuture<?>> futures = new ArrayList<>();
|
|
|
try (BufferedReader reader = new BufferedReader(
|
|
|
new InputStreamReader(process.getInputStream()))) {
|
|
@@ -195,7 +445,6 @@ public class VideoTaskManager {
|
|
|
log.info("[{}] 进程监控被中断", streamId);
|
|
|
return null;
|
|
|
}
|
|
|
- ;
|
|
|
//[segment @ 000002bcb71e5680] Opening 'D:\tmp\data\vox\output-2025-02-14_17-53-20499aec0d-0579-4ce8-8a19-0cdea2c8a648.mp4' for writing
|
|
|
//不打印rtsp的过程日志
|
|
|
//log.info("[{}]FFmpeg: {}", streamId, line);
|
|
@@ -204,20 +453,37 @@ public class VideoTaskManager {
|
|
|
String filePath = matcher.group(1).trim();
|
|
|
log.info("[{}] 新的视频文件已生成: {}", streamId, filePath);
|
|
|
//使用CompletableFuture进行检查
|
|
|
- CompletableFuture<CompletableFuture<?>> exceptionally = checkUploadFileAsync(filePath)
|
|
|
- .thenApply(isCompleted -> {
|
|
|
+// CompletableFuture<CompletableFuture<?>> exceptionally = checkUploadFileAsync(filePath)
|
|
|
+// .thenApply(isCompleted -> {
|
|
|
+// if (isCompleted) {
|
|
|
+// return uploadFileAsync(filePath, skillName, streamId, skillId, deviceName);
|
|
|
+// } else {
|
|
|
+// log.warn("文件 {} 未完成,跳过上传", filePath);
|
|
|
+// return CompletableFuture.completedFuture(null);
|
|
|
+// }
|
|
|
+//
|
|
|
+// })
|
|
|
+// .exceptionally(ex -> {
|
|
|
+// log.error("处理文件 {} 时发生错误: {}", filePath, ex.getMessage(), ex);
|
|
|
+// return null;
|
|
|
+// });
|
|
|
+//
|
|
|
+// futures.add(exceptionally);
|
|
|
+ CompletableFuture<?> fileProcessingFuture = CompletableFuture.supplyAsync(() -> {
|
|
|
+ boolean isCompleted = checkUploadFile(filePath);
|
|
|
if (isCompleted) {
|
|
|
- return uploadFileAsync(filePath, skillName, streamId, skillId, deviceName);
|
|
|
+ uploadFile(filePath, skillName, streamId, skillId, deviceName);
|
|
|
} else {
|
|
|
log.warn("文件 {} 未完成,跳过上传", filePath);
|
|
|
- return CompletableFuture.completedFuture(null);
|
|
|
}
|
|
|
- })
|
|
|
+ return null;
|
|
|
+ }, executorService)
|
|
|
.exceptionally(ex -> {
|
|
|
log.error("处理文件 {} 时发生错误: {}", filePath, ex.getMessage(), ex);
|
|
|
- return null;
|
|
|
+ throw new RuntimeException("文件处理失败", ex); // 抛出异常以便进一步处理
|
|
|
});
|
|
|
- futures.add(exceptionally);
|
|
|
+ futures.add(fileProcessingFuture);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -231,14 +497,17 @@ public class VideoTaskManager {
|
|
|
} catch (IOException e) {
|
|
|
log.error("[{}] 进程监控失败", streamId, e);
|
|
|
} finally {
|
|
|
+ executorService.shutdown();
|
|
|
cleanup();
|
|
|
taskMap.remove(streamId);
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
private void cleanup() {
|
|
|
- // 清理临时文件
|
|
|
+ // 此时生成的视频切片被占用,不能立即清除
|
|
|
}
|
|
|
|
|
|
boolean isRunning() {
|
|
@@ -246,226 +515,6 @@ public class VideoTaskManager {
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 检查切片文件是否完成
|
|
|
- * @param filePath
|
|
|
- * @return
|
|
|
- */
|
|
|
- private CompletableFuture<Boolean> checkUploadFileAsync(String filePath) {
|
|
|
- return CompletableFuture.supplyAsync(() -> {
|
|
|
- Process process = null;
|
|
|
- try {
|
|
|
- int attempts = 0;
|
|
|
- int waitTime = 2*60*1000; //2分钟
|
|
|
- while (attempts < 5) {
|
|
|
- List<String> checkVoxFinish = Arrays.asList(
|
|
|
- "ffprobe",
|
|
|
- "-v", "error",
|
|
|
- "-i", filePath,
|
|
|
- "-show_entries", "format=duration",
|
|
|
- "-of", "default=noprint_wrappers=1:nokey=1"
|
|
|
- );
|
|
|
-
|
|
|
- process = new ProcessBuilder(checkVoxFinish).start();
|
|
|
- StringBuilder errorOutput = new StringBuilder();
|
|
|
-
|
|
|
- // 读取错误输出
|
|
|
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
|
|
- String line;
|
|
|
- while ((line = reader.readLine()) != null) {
|
|
|
- errorOutput.append(line).append("\n");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- int exitCode = process.waitFor();
|
|
|
- if (exitCode != 0 || !errorOutput.toString().trim().isEmpty()) {
|
|
|
- log.error("checkUploadFile:[{}]文件未完成,错误信息:{}", filePath, errorOutput.toString());
|
|
|
- Thread.sleep(waitTime);
|
|
|
- attempts++;
|
|
|
- } else {
|
|
|
- log.info("checkUploadFile:[{}]文件已完成", filePath);
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- return false; // 超过最大尝试次数,返回 false
|
|
|
- } catch (InterruptedException | IOException e) {
|
|
|
- log.error("checkUploadFile:检查文件[{}]失败", filePath, e);
|
|
|
- throw new RuntimeException(e);
|
|
|
- } finally {
|
|
|
- if (process != null) {
|
|
|
- process.destroy();
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 上传切片音频文件
|
|
|
- * @param filePath
|
|
|
- * @param skillName
|
|
|
- * @param streamId
|
|
|
- * @param skillId
|
|
|
- * @param deviceName
|
|
|
- * @return
|
|
|
- */
|
|
|
- private CompletableFuture<Void> uploadFileAsync(String filePath, String skillName, String streamId, String skillId, String deviceName) {
|
|
|
- return CompletableFuture.runAsync(() -> {
|
|
|
- int maxRetries = 4; // 最大重试次数
|
|
|
- int retryCount = 0; // 当前重试次数
|
|
|
- boolean uploadSuccess = false; // 标记上传是否成功
|
|
|
- while (retryCount < maxRetries && !uploadSuccess) {
|
|
|
- try {
|
|
|
- // 上传文件
|
|
|
- File file = new File(filePath);
|
|
|
- FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
|
|
|
- String upload = minioUtil.upload(fileToMultipartFile, sourceDir);
|
|
|
- log.info("上传文件:{}", upload);
|
|
|
- if (upload == null || upload.isEmpty()) {
|
|
|
- // 手动抛出异常
|
|
|
- throw new Exception();
|
|
|
- } else {
|
|
|
- // 发送消息
|
|
|
- Map<String, Object> map = new HashMap<>();
|
|
|
- map.put("filePath", upload);
|
|
|
- map.put("skillName", skillName);
|
|
|
- map.put("streamId", streamId);
|
|
|
- map.put("deviceName", deviceName);
|
|
|
- map.put("skillId", skillId);
|
|
|
-
|
|
|
- log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
|
|
|
- // 标记上传成功
|
|
|
- uploadSuccess = true;
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- // 记录错误日志
|
|
|
- log.error("上传文件{}或发送消息失败,重试次数: {}/{},错误信息: {}", filePath, retryCount + 1, maxRetries, e.getMessage(), e);
|
|
|
- // 增加重试计数
|
|
|
- retryCount++;
|
|
|
- // 如果达到最大重试次数,抛出异常
|
|
|
- if (retryCount >= maxRetries) {
|
|
|
- log.error("已达到最大重试次数,上传{}失败", filePath);
|
|
|
- break;
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(20000); // 等待 20 秒
|
|
|
- } catch (InterruptedException interruptedException) {
|
|
|
- log.error("线程等待中断: {}", interruptedException.getMessage());
|
|
|
- Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
|
|
|
-// /**
|
|
|
-// * 只检查文件是否完成
|
|
|
-// *
|
|
|
-// * @param filePath
|
|
|
-// * @return
|
|
|
-// */
|
|
|
-// private CompletableFuture<Void> checkUploadFile(String filePath,String skillName,String streamId,String skillId, String deviceName){
|
|
|
-//
|
|
|
-// return CompletableFuture.runAsync(()->{
|
|
|
-// Process process = null;
|
|
|
-// BufferedReader reader = null;
|
|
|
-// try {
|
|
|
-// //不能无限检查3*5 15分钟
|
|
|
-// int isNum=0;
|
|
|
-// boolean isFlag =false;
|
|
|
-// while (isNum<5){
|
|
|
-// //检查文件是否完成
|
|
|
-// List<String> checkVoxFinish = Arrays.asList(
|
|
|
-//// "D:\\file\\ffmpeg\\ffmpeg.exe",
|
|
|
-// "ffmpeg",
|
|
|
-// "-v","error",
|
|
|
-// "-i", filePath,
|
|
|
-// "-f", "null",
|
|
|
-// "-"
|
|
|
-// );
|
|
|
-// process = new ProcessBuilder(checkVoxFinish).start();
|
|
|
-// reader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
|
|
|
-// StringBuilder errorOutput = new StringBuilder();
|
|
|
-//
|
|
|
-// String line;
|
|
|
-// while ((line = reader.readLine()) != null) {
|
|
|
-// errorOutput.append(line).append("\n");
|
|
|
-// }
|
|
|
-// int exitCode = process.waitFor();
|
|
|
-// if (exitCode != 0 || !errorOutput.toString().trim().isEmpty()) {
|
|
|
-// log.error("checkVoxFinish:[{}]文件未完成",filePath);
|
|
|
-// //重置状态errorOutput状态
|
|
|
-// errorOutput.delete(0, errorOutput.length());
|
|
|
-// //等待3分钟后再次检查
|
|
|
-// Thread.sleep(180000);
|
|
|
-// isNum++;
|
|
|
-// }else {
|
|
|
-// //跳出循环,上传文件
|
|
|
-// isFlag =true;
|
|
|
-// break;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// if (isFlag) {
|
|
|
-// int maxRetries = 4; // 最大重试次数
|
|
|
-// int retryCount = 0; // 当前重试次数
|
|
|
-// boolean uploadSuccess = false; // 标记上传是否成功
|
|
|
-// while (retryCount < maxRetries && !uploadSuccess) {
|
|
|
-// try {
|
|
|
-// // 上传文件
|
|
|
-// File file = new File(filePath);
|
|
|
-// FileToMultipartFile fileToMultipartFile = new FileToMultipartFile(file);
|
|
|
-// String upload = minioUtil.upload(fileToMultipartFile, sourceDir);
|
|
|
-// log.info("上传文件:{}", upload);
|
|
|
-// // 发送消息
|
|
|
-// Map<String, Object> map = new HashMap<>();
|
|
|
-// map.put("filePath", upload);
|
|
|
-// map.put("skillName", skillName);
|
|
|
-// map.put("streamId", streamId);
|
|
|
-// map.put("deviceName", deviceName);
|
|
|
-// map.put("skillId", skillId);
|
|
|
-// log.info(VoiceAnalysisUtils.postVoxAnalysis(map, postAnalysisUrl));
|
|
|
-// // 标记上传成功
|
|
|
-// uploadSuccess = true;
|
|
|
-// } catch (Exception e) {
|
|
|
-// // 记录错误日志
|
|
|
-// log.error("上传文件或发送消息失败,重试次数: {}/{},错误信息: {}", retryCount + 1, maxRetries, e.getMessage(), e);
|
|
|
-// // 增加重试计数
|
|
|
-// retryCount++;
|
|
|
-// // 如果达到最大重试次数,抛出异常
|
|
|
-// if (retryCount >= maxRetries) {
|
|
|
-// log.error("已达到最大重试次数,上传失败");
|
|
|
-// throw new RuntimeException("上传文件失败,已达到最大重试次数", e);
|
|
|
-// }
|
|
|
-// // 可选:等待一段时间再重试(例如 2 秒)
|
|
|
-// try {
|
|
|
-// Thread.sleep(2000); // 等待 2 秒
|
|
|
-// } catch (InterruptedException interruptedException) {
|
|
|
-// log.error("线程等待中断: {}", interruptedException.getMessage());
|
|
|
-// Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// } catch (InterruptedException e) {
|
|
|
-// throw new RuntimeException(e);
|
|
|
-// } catch (IOException e) {
|
|
|
-// throw new RuntimeException(e);
|
|
|
-// } finally {
|
|
|
-// if(reader !=null){
|
|
|
-// try {
|
|
|
-// reader.close();
|
|
|
-// } catch (IOException e) {
|
|
|
-// log.error("reader关闭失败{}",e.getMessage());
|
|
|
-// }
|
|
|
-// }
|
|
|
-// if(process !=null){
|
|
|
-// process.destroy();
|
|
|
-// }
|
|
|
-//
|
|
|
-// }
|
|
|
-// });
|
|
|
-//
|
|
|
-// }
|