SpringBoot實現簡易直播

當下直播技術已經成為各類應用不可或缺的一部分,從社交媒體到在線教育,再到電子商務和游戲領域,直播功能正在被廣泛應用。

本文將介紹如何使用SpringBoot框架構建一個直播流推拉系統。

在這里插入圖片描述

一、直播技術基礎

1.1 推流與拉流概念

直播系統的核心環節包括推流和拉流:

  • 推流(Push) : 指主播將采集的音視頻數據通過特定協議發送到流媒體服務器的過程
  • 拉流(Pull) : 指觀眾從流媒體服務器獲取音視頻數據并播放的過程

1.2 常用直播協議

市面上主要的直播協議包括:

協議優勢劣勢適用場景
RTMP低延遲(1-3秒)、成熟穩定基于Flash、需要額外端口(1935)主播推流、低延遲直播
HLS兼容性好、使用HTTP協議延遲高大規模直播分發、移動端
WebRTC超低延遲(<1秒)、P2P通信穿透復雜網絡困難、部署復雜實時互動、小規模視頻會議
HTTP-FLV低延遲、兼容性較好不支持可變碼率大規模觀看、延遲敏感場景
SRT低延遲、高可靠性生態相對較新專業直播、跨國直播

1.3 直播系統架構概述

一個完整的直播系統通常包含以下組件:

  1. 采集端:負責采集、編碼音視頻數據
  2. 流媒體服務器:處理音視頻流的轉發、轉碼和分發
  3. CDN:提供內容分發服務,解決大規模用戶訪問問題
  4. 播放器:解碼并播放音視頻內容
  5. 信令服務:管理直播間信息、用戶狀態等

二、系統技術設計

2.1 整體架構

核心組件包括:

  1. API服務層:基于SpringBoot構建的RESTful API,處理直播間管理、用戶認證等
  2. 媒體服務器集成層:集成開源流媒體服務器(如SRS)
  3. 流轉發層:處理媒體流的轉發、轉碼和適配
  4. 存儲層:用于直播回放和點播
  5. WebSocket服務:提供直播互動功能

2.2 技術選型

  • 后端框架:SpringBoot 3
  • 流媒體服務器:SRS (Simple RTMP Server)
  • 數據庫:MySQL + Redis
  • WebSocket:Spring WebSocket
  • 存儲系統:MinIO (對象存儲)
  • 前端播放器:Video.js、flv.js、hls.js

三、SpringBoot實現直播服務

3.1 項目依賴配置

首先配置pom.xml文件,引入必要的依賴:

<dependencies><!-- SpringBoot 核心依賴 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- WebSocket 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- Redis 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- MySQL 支持 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!-- MyBatis Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-spring-boot3-starter</artifactId><version>3.5.5</version></dependency><!-- MinIO 客戶端 --><dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.4.6</version></dependency><!-- HTTP 客戶端 --><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.14</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

3.2 應用配置

application.yml中配置應用參數:

server:port: 8080spring:application:name: live-streaming-service# 數據庫配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/live_streaming?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghaiusername: rootpassword: root# Redis配置redis:host: localhostport: 6379database: 0# MyBatis Plus配置
mybatis-plus:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.example.livestream.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImpl# 流媒體服務器配置
live:srs:server-url: rtmp://192.168.195.100:1935/liveapi-url: http://192.168.195.100:1985/apihttp-flv-url: http://192.168.195.100:8080/livehls-url: http://192.168.195.100:8080/live/hlsrecord:save-path: /data/recordpush:key-check-enabled: trueauth-expire: 86400  # 24小時,單位秒auth-key: your_secret_key# MinIO配置
minio:endpoint: http://localhost:9000access-key: minioadminsecret-key: minioadminbucket: live-recordings

3.3 實體類設計

首先定義直播間實體:

@Data
@TableName("live_room")
public class LiveRoom {@TableId(value = "id", type = IdType.AUTO)private Long id;private String title;private String coverUrl;private Long userId;private String streamKey;  // 推流密鑰private String streamUrl;  // 推流地址private String hlsUrl;     // HLS播放地址private String flvUrl;     // HTTP-FLV播放地址private Integer status;    // 0:未開播 1:直播中 2:直播結束private Long viewCount;    // 觀看人數private Long likeCount;    // 點贊數private LocalDateTime startTime;  // 開播時間private LocalDateTime endTime;    // 結束時間private LocalDateTime createdAt;private LocalDateTime updatedAt;
}

直播流信息實體:

@Data
@TableName("live_stream")
public class LiveStream {@TableId(value = "id", type = IdType.AUTO)private Long id;private Long roomId;private String streamId;  // 流IDprivate String protocol;  // 協議類型:rtmp/hls/flvprivate Integer bitrate;  // 碼率private String resolution; // 分辨率private Integer status;   // 0:未啟動 1:活躍 2:已結束private LocalDateTime createdAt;private LocalDateTime updatedAt;
}

直播回放實體:

@Data
@TableName("live_recording")
public class LiveRecording {@TableId(value = "id", type = IdType.AUTO)private Long id;private Long roomId;private String fileName;private String fileUrl;private Long fileSize;private Integer duration;  // 時長,單位秒private LocalDateTime startTime;private LocalDateTime endTime;private Integer status;   // 0:錄制中 1:錄制完成 2:處理中 3:可用 4:刪除private LocalDateTime createdAt;private LocalDateTime updatedAt;
}

3.4 數據庫表設計

根據實體類,創建對應的數據庫表:

-- 直播間表
CREATE TABLE `live_room` (`id` bigint NOT NULL AUTO_INCREMENT,`title` varchar(255) NOT NULL COMMENT '直播標題',`cover_url` varchar(255) DEFAULT NULL COMMENT '封面URL',`user_id` bigint NOT NULL COMMENT '主播用戶ID',`stream_key` varchar(64) NOT NULL COMMENT '推流密鑰',`stream_url` varchar(255) DEFAULT NULL COMMENT '推流地址',`hls_url` varchar(255) DEFAULT NULL COMMENT 'HLS播放地址',`flv_url` varchar(255) DEFAULT NULL COMMENT 'HTTP-FLV播放地址',`status` tinyint NOT NULL DEFAULT '0' COMMENT '狀態:0未開播 1直播中 2直播結束',`view_count` bigint NOT NULL DEFAULT '0' COMMENT '觀看人數',`like_count` bigint NOT NULL DEFAULT '0' COMMENT '點贊數',`start_time` datetime DEFAULT NULL COMMENT '開播時間',`end_time` datetime DEFAULT NULL COMMENT '結束時間',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uk_stream_key` (`stream_key`),KEY `idx_user_id` (`user_id`),KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播間信息表';-- 直播流表
CREATE TABLE `live_stream` (`id` bigint NOT NULL AUTO_INCREMENT,`room_id` bigint NOT NULL COMMENT '直播間ID',`stream_id` varchar(64) NOT NULL COMMENT '流ID',`protocol` varchar(20) NOT NULL COMMENT '協議類型',`bitrate` int DEFAULT NULL COMMENT '碼率',`resolution` varchar(20) DEFAULT NULL COMMENT '分辨率',`status` tinyint NOT NULL DEFAULT '0' COMMENT '狀態:0未啟動 1活躍 2已結束',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uk_stream_id` (`stream_id`),KEY `idx_room_id` (`room_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播流信息表';-- 直播回放表
CREATE TABLE `live_recording` (`id` bigint NOT NULL AUTO_INCREMENT,`room_id` bigint NOT NULL COMMENT '直播間ID',`file_name` varchar(255) NOT NULL COMMENT '文件名',`file_url` varchar(255) COMMENT '文件URL',`file_size` bigint DEFAULT NULL COMMENT '文件大小(字節)',`duration` int DEFAULT NULL COMMENT '時長(秒)',`start_time` datetime NOT NULL COMMENT '開始時間',`end_time` datetime DEFAULT NULL COMMENT '結束時間',`status` tinyint NOT NULL DEFAULT '0' COMMENT '狀態:0錄制中 1錄制完成 2處理中 3可用 4刪除',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),KEY `idx_room_id` (`room_id`),KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播回放表';

3.5 Mapper接口

使用MyBatis-Plus創建Mapper接口:

@Mapper
public interface LiveRoomMapper extends BaseMapper<LiveRoom> {// 自定義查詢方法@Select("SELECT * FROM live_room WHERE status = 1 ORDER BY view_count DESC LIMIT #{limit}")List<LiveRoom> findHotLiveRooms(@Param("limit") int limit);
}@Mapper
public interface LiveStreamMapper extends BaseMapper<LiveStream> {// 基礎CRUD方法由BaseMapper提供
}@Mapper
public interface LiveRecordingMapper extends BaseMapper<LiveRecording> {// 基礎CRUD方法由BaseMapper提供
}

3.6 服務層實現

3.6.1 直播流服務
@Service
@Slf4j
public class LiveStreamService {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate LiveStreamMapper liveStreamMapper;@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${live.srs.server-url}")private String srsServerUrl;@Value("${live.srs.api-url}")private String srsApiUrl;@Value("${live.srs.http-flv-url}")private String httpFlvUrl;@Value("${live.srs.hls-url}")private String hlsUrl;@Value("${live.push.key-check-enabled}")private boolean keyCheckEnabled;@Value("${live.push.auth-expire}")private long authExpire;@Value("${live.push.auth-key}")private String authKey;private RestTemplate restTemplate = new RestTemplate();/*** 創建直播間*/@Transactionalpublic LiveRoom createLiveRoom(LiveRoom liveRoom) {// 生成推流密鑰String streamKey = generateStreamKey(liveRoom.getUserId());liveRoom.setStreamKey(streamKey);// 構建推流地址String pushUrl = buildPushUrl(streamKey);liveRoom.setStreamUrl(pushUrl);// 構建播放地址liveRoom.setHlsUrl(hlsUrl + "/" + streamKey + ".m3u8");liveRoom.setFlvUrl(httpFlvUrl + "/" + streamKey + ".flv");// 設置初始狀態liveRoom.setStatus(0);liveRoom.setViewCount(0L);liveRoom.setLikeCount(0L);liveRoom.setCreatedAt(LocalDateTime.now());liveRoom.setUpdatedAt(LocalDateTime.now());// 保存到數據庫liveRoomMapper.insert(liveRoom);return liveRoom;}/*** 生成推流密鑰*/private String generateStreamKey(Long userId) {// 生成基于用戶ID和時間戳的唯一密鑰String baseKey = userId + "_" + System.currentTimeMillis();return DigestUtils.md5DigestAsHex(baseKey.getBytes());}/*** 構建推流地址*/private String buildPushUrl(String streamKey) {StringBuilder sb = new StringBuilder(srsServerUrl);sb.append("/").append(streamKey);// 如果啟用了推流驗證if (keyCheckEnabled) {long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;String authString = streamKey + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());sb.append("?auth_key=").append(authToken).append("&expire=").append(expireTimestamp);}return sb.toString();}/*** 開始直播*/@Transactionalpublic LiveRoom startLiveStream(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null) {throw new IllegalArgumentException("直播間不存在");}// 更新直播間狀態為直播中liveRoom.setStatus(1);liveRoom.setStartTime(LocalDateTime.now());liveRoomMapper.updateById(liveRoom);// 創建直播流記錄LiveStream liveStream = new LiveStream();liveStream.setRoomId(roomId);liveStream.setStreamId(liveRoom.getStreamKey());liveStream.setProtocol("rtmp");liveStream.setStatus(1);liveStream.setCreatedAt(LocalDateTime.now());liveStream.setUpdatedAt(LocalDateTime.now());liveStreamMapper.insert(liveStream);// 更新Redis緩存中的活躍直播間redisTemplate.opsForSet().add("live:active_rooms", String.valueOf(roomId));return liveRoom;}/*** 結束直播*/@Transactionalpublic LiveRoom endLiveStream(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {throw new IllegalArgumentException("直播間不存在或未開播");}// 更新直播間狀態為已結束liveRoom.setStatus(2);liveRoom.setEndTime(LocalDateTime.now());liveRoomMapper.updateById(liveRoom);// 更新直播流狀態QueryWrapper<LiveStream> queryWrapper = new QueryWrapper<>();queryWrapper.eq("room_id", roomId).eq("status", 1);LiveStream liveStream = liveStreamMapper.selectOne(queryWrapper);if (liveStream != null) {liveStream.setStatus(2);liveStream.setUpdatedAt(LocalDateTime.now());liveStreamMapper.updateById(liveStream);}// 從Redis中移除活躍直播間redisTemplate.opsForSet().remove("live:active_rooms", String.valueOf(roomId));return liveRoom;}/*** 獲取當前活躍的直播間列表*/public List<LiveRoom> getActiveLiveRooms(int page, int size) {Page<LiveRoom> pageParam = new Page<>(page, size);QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("status", 1).orderByDesc("view_count");return liveRoomMapper.selectPage(pageParam, queryWrapper).getRecords();}/*** 獲取熱門直播間*/public List<LiveRoom> getHotLiveRooms(int limit) {return liveRoomMapper.findHotLiveRooms(limit);}/*** 增加直播間觀看人數*/public void incrementViewCount(Long roomId) {// 使用Redis進行計數String key = "live:room:" + roomId + ":view_count";redisTemplate.opsForValue().increment(key);// 定期同步到數據庫if (Math.random() < 0.1) {  // 10%概率同步,減少數據庫壓力String countStr = redisTemplate.opsForValue().get(key);if (countStr != null) {long count = Long.parseLong(countStr);LiveRoom room = new LiveRoom();room.setId(roomId);room.setViewCount(count);liveRoomMapper.updateById(room);}}}/*** 校驗推流密鑰*/public boolean validateStreamKey(String streamKey, String token, String expire) {if (!keyCheckEnabled) {return true;}try {long expireTimestamp = Long.parseLong(expire);long currentTime = System.currentTimeMillis() / 1000;// 檢查是否過期if (currentTime > expireTimestamp) {return false;}// 驗證tokenString authString = streamKey + "-" + expire + "-" + authKey;String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes());return calculatedToken.equals(token);} catch (Exception e) {log.error("驗證推流密鑰異常", e);return false;}}/*** 處理SRS回調 - 流發布*/public void handleStreamPublish(String app, String stream) {try {// 查找對應的直播間QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("stream_key", stream);LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);if (liveRoom != null && liveRoom.getStatus() == 0) {// 更新直播間狀態startLiveStream(liveRoom.getId());log.info("直播流發布成功: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());}} catch (Exception e) {log.error("處理流發布回調異常", e);}}/*** 處理SRS回調 - 流關閉*/public void handleStreamClose(String app, String stream) {try {// 查找對應的直播間QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("stream_key", stream);LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);if (liveRoom != null && liveRoom.getStatus() == 1) {// 更新直播間狀態endLiveStream(liveRoom.getId());log.info("直播流關閉: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());}} catch (Exception e) {log.error("處理流關閉回調異常", e);}}/*** 獲取SRS服務器信息*/public Map<String, Object> getSrsServerInfo() {try {String url = srsApiUrl + "/v1/summaries";ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class);return response.getBody();} catch (Exception e) {log.error("獲取SRS服務器信息異常", e);return Collections.emptyMap();}}
}
3.6.2 直播回放服務
@Service
@Slf4j
public class LiveRecordingService {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate LiveRecordingMapper recordingMapper;@Autowiredprivate MinioClient minioClient;@Value("${minio.bucket}")private String minioBucket;@Value("${live.record.save-path}")private String recordSavePath;/*** 開始錄制直播*/@Transactionalpublic LiveRecording startRecording(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {throw new IllegalArgumentException("直播間不存在或未開播");}// 創建錄制記錄LiveRecording recording = new LiveRecording();recording.setRoomId(roomId);recording.setFileName(liveRoom.getStreamKey() + "_" + System.currentTimeMillis() + ".mp4");recording.setStatus(0);  // 錄制中recording.setStartTime(LocalDateTime.now());recording.setCreatedAt(LocalDateTime.now());recording.setUpdatedAt(LocalDateTime.now());recordingMapper.insert(recording);// 異步啟動錄制進程startRecordingProcess(liveRoom, recording);return recording;}/*** 停止錄制直播*/@Transactionalpublic LiveRecording stopRecording(Long recordingId) {LiveRecording recording = recordingMapper.selectById(recordingId);if (recording == null || recording.getStatus() != 0) {throw new IllegalArgumentException("錄制任務不存在或已結束");}// 更新錄制狀態recording.setStatus(1);  // 錄制完成recording.setEndTime(LocalDateTime.now());recording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(recording);// 異步停止錄制進程并上傳文件stopRecordingProcess(recording);return recording;}/*** 獲取直播回放列表*/public List<LiveRecording> getRecordings(Long roomId, int page, int size) {Page<LiveRecording> pageParam = new Page<>(page, size);QueryWrapper<LiveRecording> queryWrapper = new QueryWrapper<>();queryWrapper.eq("room_id", roomId).eq("status", 3)  // 可用狀態.orderByDesc("start_time");return recordingMapper.selectPage(pageParam, queryWrapper).getRecords();}/*** 啟動錄制進程*/private void startRecordingProcess(LiveRoom liveRoom, LiveRecording recording) {// 使用線程池異步執行CompletableFuture.runAsync(() -> {try {File saveDir = new File(recordSavePath);if (!saveDir.exists()) {saveDir.mkdirs();}String outputPath = recordSavePath + "/" + recording.getFileName();String inputUrl = liveRoom.getFlvUrl();// 使用FFmpeg錄制ProcessBuilder pb = new ProcessBuilder("ffmpeg", "-i", inputUrl,"-c:v", "copy","-c:a", "aac","-strict", "-2",outputPath);Process process = pb.start();// 保存進程ID,以便后續停止String processKey = "live:recording:process:" + recording.getId();Runtime.getRuntime().addShutdownHook(new Thread(() -> {process.destroy();}));log.info("開始錄制直播, roomId={}, recordingId={}", liveRoom.getId(), recording.getId());// 等待進程結束int exitCode = process.waitFor();log.info("錄制進程結束, roomId={}, recordingId={}, exitCode={}", liveRoom.getId(), recording.getId(), exitCode);// 如果是正常結束,則更新狀態并上傳文件if (exitCode == 0) {uploadRecording(recording, new File(outputPath));}} catch (Exception e) {log.error("錄制直播異常", e);// 更新錄制狀態為失敗LiveRecording failedRecording = new LiveRecording();failedRecording.setId(recording.getId());failedRecording.setStatus(4);  // 失敗狀態failedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(failedRecording);}});}/*** 停止錄制進程*/private void stopRecordingProcess(LiveRecording recording) {// 這里可以實現停止特定的FFmpeg進程// 在實際實現中,需要保存進程ID并通過操作系統命令停止進程log.info("手動停止錄制, recordingId={}", recording.getId());}/*** 上傳錄制文件到MinIO*/private void uploadRecording(LiveRecording recording, File file) {try {// 設置狀態為處理中LiveRecording processingRecording = new LiveRecording();processingRecording.setId(recording.getId());processingRecording.setStatus(2);  // 處理中processingRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(processingRecording);// 獲取文件元數據long fileSize = file.length();// 使用FFmpeg獲取視頻時長String[] cmd = {"ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", file.getAbsolutePath()};Process process = Runtime.getRuntime().exec(cmd);BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));String durationStr = reader.readLine();int duration = (int) Float.parseFloat(durationStr);// 上傳到MinIOString objectName = "recordings/" + recording.getFileName();minioClient.uploadObject(UploadObjectArgs.builder().bucket(minioBucket).object(objectName).filename(file.getAbsolutePath()).contentType("video/mp4").build());// 構建訪問URLString fileUrl = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder().bucket(minioBucket).object(objectName).method(Method.GET).build());// 更新錄制記錄LiveRecording updatedRecording = new LiveRecording();updatedRecording.setId(recording.getId());updatedRecording.setFileUrl(fileUrl);updatedRecording.setFileSize(fileSize);updatedRecording.setDuration(duration);updatedRecording.setStatus(3);  // 可用狀態updatedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(updatedRecording);log.info("錄制文件上傳完成, recordingId={}, fileSize={}, duration={}s", recording.getId(), fileSize, duration);// 刪除本地文件file.delete();} catch (Exception e) {log.error("上傳錄制文件異常", e);// 更新錄制狀態為失敗LiveRecording failedRecording = new LiveRecording();failedRecording.setId(recording.getId());failedRecording.setStatus(4);  // 失敗狀態failedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(failedRecording);}}
}

3.7 控制器實現

3.7.1 直播控制器
@RestController
@RequestMapping("/api/live")
@Slf4j
public class LiveController {@Autowiredprivate LiveStreamService liveStreamService;@Autowiredprivate LiveRecordingService recordingService;@Autowiredprivate LiveRoomMapper liveRoomMapper;/*** 創建直播間*/@PostMapping("/room")public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {try {LiveRoom createdRoom = liveStreamService.createLiveRoom(liveRoom);return ResponseEntity.ok(createdRoom);} catch (Exception e) {log.error("創建直播間異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取直播間詳情*/@GetMapping("/room/{roomId}")public ResponseEntity<LiveRoom> getLiveRoom(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null) {return ResponseEntity.notFound().build();}return ResponseEntity.ok(liveRoom);} catch (Exception e) {log.error("獲取直播間詳情異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 開始直播*/@PostMapping("/room/{roomId}/start")public ResponseEntity<LiveRoom> startLiveStream(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveStreamService.startLiveStream(roomId);return ResponseEntity.ok(liveRoom);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("開始直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 結束直播*/@PostMapping("/room/{roomId}/end")public ResponseEntity<LiveRoom> endLiveStream(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveStreamService.endLiveStream(roomId);return ResponseEntity.ok(liveRoom);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("結束直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取活躍直播間列表*/@GetMapping("/rooms/active")public ResponseEntity<List<LiveRoom>> getActiveLiveRooms(@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {List<LiveRoom> rooms = liveStreamService.getActiveLiveRooms(page, size);return ResponseEntity.ok(rooms);} catch (Exception e) {log.error("獲取活躍直播間列表異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取熱門直播間*/@GetMapping("/rooms/hot")public ResponseEntity<List<LiveRoom>> getHotLiveRooms(@RequestParam(defaultValue = "10") int limit) {try {List<LiveRoom> rooms = liveStreamService.getHotLiveRooms(limit);return ResponseEntity.ok(rooms);} catch (Exception e) {log.error("獲取熱門直播間異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 增加觀看人數*/@PostMapping("/room/{roomId}/view")public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {try {liveStreamService.incrementViewCount(roomId);return ResponseEntity.ok().build();} catch (Exception e) {log.error("增加觀看人數異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 開始錄制直播*/@PostMapping("/room/{roomId}/record/start")public ResponseEntity<LiveRecording> startRecording(@PathVariable Long roomId) {try {LiveRecording recording = recordingService.startRecording(roomId);return ResponseEntity.ok(recording);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("開始錄制直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 停止錄制直播*/@PostMapping("/record/{recordingId}/stop")public ResponseEntity<LiveRecording> stopRecording(@PathVariable Long recordingId) {try {LiveRecording recording = recordingService.stopRecording(recordingId);return ResponseEntity.ok(recording);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("停止錄制直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取直播回放列表*/@GetMapping("/room/{roomId}/recordings")public ResponseEntity<List<LiveRecording>> getRecordings(@PathVariable Long roomId,@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {List<LiveRecording> recordings = recordingService.getRecordings(roomId, page, size);return ResponseEntity.ok(recordings);} catch (Exception e) {log.error("獲取直播回放列表異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}
}
3.7.2 SRS回調控制器
@RestController
@RequestMapping("/api/srs/callback")
@Slf4j
public class SrsCallbackController {@Autowiredprivate LiveStreamService liveStreamService;/*** 處理SRS on_publish回調* 當推流開始時,SRS會調用此接口*/@PostMapping("/on_publish")public ResponseEntity<Map<String, Object>> onPublish(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_publish回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();String param = callbackDto.getParam();Map<String, String> paramMap = HttpUtil.decodeParamMap(param, StandardCharsets.UTF_8);String token = paramMap.get("auth_key");String expire = paramMap.get("expire");callbackDto.setToken(token);callbackDto.setExpire(expire);// 驗證推流密鑰boolean valid = liveStreamService.validateStreamKey(callbackDto.getStream(), callbackDto.getToken(), callbackDto.getExpire());if (!valid) {result.put("code", 403);result.put("message", "Forbidden");return ResponseEntity.status(HttpStatus.FORBIDDEN).body(result);}// 處理流發布事件liveStreamService.handleStreamPublish(callbackDto.getApp(), callbackDto.getStream());result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_unpublish回調* 當推流結束時,SRS會調用此接口*/@PostMapping("/on_unpublish")public ResponseEntity<Map<String, Object>> onUnpublish(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_unpublish回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());// 處理流關閉事件liveStreamService.handleStreamClose(callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_play回調* 當播放流開始時,SRS會調用此接口*/@PostMapping("/on_play")public ResponseEntity<Map<String, Object>> onPlay(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_play回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_stop回調* 當播放流結束時,SRS會調用此接口*/@PostMapping("/on_stop")public ResponseEntity<Map<String, Object>> onStop(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_stop回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_dvr回調* 當DVR錄制文件關閉時,SRS會調用此接口*/@PostMapping("/on_dvr")public ResponseEntity<Map<String, Object>> onDvr(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_dvr回調: app={}, stream={}, file={}", callbackDto.getApp(), callbackDto.getStream(), callbackDto.getFile());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}
}

3.8 WebSocket實現

3.8.1 WebSocket配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 啟用簡單的消息代理registry.enableSimpleBroker("/topic");// 設置應用程序前綴registry.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注冊STOMP端點registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();}
}
3.8.2 WebSocket控制器
@Controller
public class LiveChatController {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate SimpMessagingTemplate messagingTemplate;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 發送聊天消息*/@MessageMapping("/chat/{roomId}")public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {// 檢查直播間是否存在LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {return;}// 設置消息時間message.setTimestamp(LocalDateTime.now());// 發送消息到訂閱該直播間的所有客戶端messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);}/*** 直播間狀態變更通知*/public void notifyRoomStatusChange(Long roomId, int status) {Map<String, Object> payload = new HashMap<>();payload.put("roomId", roomId);payload.put("status", status);payload.put("timestamp", LocalDateTime.now());messagingTemplate.convertAndSend("/topic/room/" + roomId + "/status", payload);}/*** 發送直播點贊通知*/@MessageMapping("/like/{roomId}")public void sendLike(@DestinationVariable Long roomId, Map<String, Object> payload) {// 檢查直播間是否存在LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {return;}// 增加點贊數String key = "live:room:" + roomId + ":like_count";Long likeCount = redisTemplate.opsForValue().increment(key);// 定期同步到數據庫if (likeCount % 100 == 0) {  // 每100個點贊同步一次LiveRoom room = new LiveRoom();room.setId(roomId);room.setLikeCount(likeCount);liveRoomMapper.updateById(room);}// 添加時間戳payload.put("timestamp", LocalDateTime.now());payload.put("likeCount", likeCount);// 發送點贊通知messagingTemplate.convertAndSend("/topic/room/" + roomId + "/like", payload);}@Datapublic static class ChatMessage {private String username;private String userId;private String content;private String avatar;private LocalDateTime timestamp;}
}

四、SRS服務器配置

SRS (Simple RTMP Server) 是一個優秀的開源流媒體服務器,下面是基本配置文件srs.conf

listen              1935;
max_connections     1000;
daemon              off;
srs_log_tank        file;
srs_log_file        ./objs/srs.log;
http_api {enabled         on;listen          1985;
}
http_server {enabled         on;listen          8080;dir             ./objs/nginx/html;
}
vhost __defaultVhost__ {hls {enabled         on;hls_path        ./objs/nginx/html/hls;hls_fragment    10;hls_window      60;}http_remux {enabled     on;mount       [vhost]/[app]/[stream].flv;}dvr {enabled      on;dvr_path     ./objs/nginx/html/dvr/[app]/[stream].[timestamp].mp4;dvr_plan     segment;dvr_duration 30;}http_hooks {enabled         on;on_publish      http://192.168.195.1:8080/api/srs/callback/on_publish;on_unpublish    http://192.168.195.1:8080/api/srs/callback/on_unpublish;on_play         http://192.168.195.1:8080/api/srs/callback/on_play;on_stop         http://192.168.195.1:8080/api/srs/callback/on_stop;on_dvr          http://192.168.195.1:8080/api/srs/callback/on_dvr;}
}

五、前端播放器實現

5.1 基于Video.js的播放器

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>直播播放器</title><link href="https://vjs.zencdn.net/7.20.3/video-js.css" rel="stylesheet" /><script src="https://vjs.zencdn.net/7.20.3/video.min.js"></script><script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.0/dist/sockjs.min.js"></script><script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script><script src="https://cdn.jsdelivr.net/npm/videojs-contrib-hls@5.15.0/dist/videojs-contrib-hls.min.js"></script><script src="https://cdn.jsdelivr.net/npm/flv.js@1.6.2/dist/flv.min.js"></script><script src="https://cdn.jsdelivr.net/npm/videojs-flvjs@0.2.0/dist/videojs-flvjs.min.js"></script><style>.video-container {max-width: 800px;margin: 0 auto;}.video-js {width: 100%;height: 450px;}.room-info {margin-top: 20px;padding: 15px;background-color: #f8f9fa;border-radius: 5px;}.room-title {font-size: 24px;font-weight: bold;margin-bottom: 10px;}.streamer-info {display: flex;align-items: center;margin-bottom: 10px;}.streamer-avatar {width: 40px;height: 40px;border-radius: 50%;margin-right: 10px;}.streamer-name {font-weight: bold;}.room-stats {display: flex;gap: 20px;color: #666;}</style>
</head>
<body>
<div class="video-container"><video id="live-player" class="video-js vjs-default-skin vjs-big-play-centered" controls preload="auto"><p class="vjs-no-js">To view this video please enable JavaScript, and consider upgrading to a web browser that<a href="https://videojs.com/html5-video-support/" target="_blank">supports HTML5 video</a></p></video><div class="room-info"><div class="room-title" id="room-title">直播間標題</div><div class="streamer-info"><img class="streamer-avatar" id="streamer-avatar" src="https://api.dicebear.com/7.x/avataaars/svg?seed=user1" alt="主播頭像"><span class="streamer-name" id="streamer-name">主播昵稱</span></div><div class="room-stats"><div><i class="icon-eye"></i> <span id="view-count">0</span> 觀看</div><div><i class="icon-heart"></i> <span id="like-count">0</span> 點贊</div></div></div>
</div><script>// 獲取URL參數function getQueryParam(name) {const urlParams = new URLSearchParams(window.location.search);return urlParams.get(name);}// 初始化播放器function initPlayer() {const roomId = getQueryParam('roomId');if (!roomId) {alert('請指定直播間ID');return;}// 獲取直播間信息fetch(`/api/live/room/${roomId}`).then(response => response.json()).then(room => {// 更新頁面信息document.getElementById('room-title').textContent = room.title;document.getElementById('streamer-name').textContent = `主播ID: ${room.userId}`;document.getElementById('view-count').textContent = room.viewCount;document.getElementById('like-count').textContent = room.likeCount;// 判斷直播狀態if (room.status !== 1) {alert('直播未開始或已結束');return;}// 創建播放器const player = videojs('live-player', {autoplay: true,liveui: true,controls: true,preload: 'auto',responsive: true,fluid: true,sources: [/*{src: room.flvUrl,type: 'video/x-flv'},*/ {src: room.hlsUrl,type: 'application/x-mpegURL'}]});// 優先使用FLV.js/*if (flvjs.isSupported()) {player.flvjs({mediaDataSource: {type: 'flv',url: room.flvUrl}});}*/// 播放器錯誤處理player.on('error', function() {console.error('播放器錯誤,嘗試切換播放源');// 嘗試切換到HLSplayer.src({src: room.hlsUrl,type: 'application/x-mpegURL'});});// 統計觀看人數fetch(`/api/live/room/${roomId}/view`, {method: 'POST'});// 連接WebSocket接收直播狀態更新connectWebSocket(roomId);}).catch(error => {console.error('獲取直播間信息失敗:', error);alert('獲取直播間信息失敗');});}// 連接WebSocketfunction connectWebSocket(roomId) {const socket = new SockJS('/ws');const stompClient = Stomp.over(socket);stompClient.connect({}, function(frame) {console.log('Connected to WebSocket');// 訂閱直播間狀態變更stompClient.subscribe(`/topic/room/${roomId}/status`, function(message) {const data = JSON.parse(message.body);if (data.status !== 1) {alert('直播已結束');location.reload();}});// 訂閱點贊更新stompClient.subscribe(`/topic/room/${roomId}/like`, function(message) {const data = JSON.parse(message.body);document.getElementById('like-count').textContent = data.likeCount;});});}// 頁面加載完成后初始化document.addEventListener('DOMContentLoaded', initPlayer);
</script>
</body>
</html>

5.2 推流工具選擇

對于主播端推流,可以選擇以下工具:

  1. OBS Studio:開源、功能強大的推流軟件
  2. FFmpeg:命令行工具,適合自動化推流
  3. WebRTC:瀏覽器直接采集和推流(低延遲但兼容性有限)

此處使用OBS Studio

六、性能優化與擴展

6.1 緩存策略

在高并發場景下,合理使用緩存至關重要:

@Configuration
@EnableCaching
public class CacheConfig {@Beanpublic CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10)).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();// 直播間列表緩存1分鐘cacheConfigurations.put("liveRoomList", config.entryTtl(Duration.ofMinutes(1)));// 直播間詳情緩存5分鐘cacheConfigurations.put("liveRoom", config.entryTtl(Duration.ofMinutes(5)));// 回放列表緩存30分鐘cacheConfigurations.put("recordingList", config.entryTtl(Duration.ofMinutes(30)));return RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(config).withInitialCacheConfigurations(cacheConfigurations).build();}
}

在服務層添加緩存注解:

@Cacheable(value = "liveRoomList", key = "'active_page_' + #page + '_' + #size")
public List<LiveRoom> getActiveLiveRooms(int page, int size) {// 原有實現...
}@Cacheable(value = "liveRoom", key = "#roomId")
public LiveRoom getLiveRoomDetail(Long roomId) {return liveRoomMapper.selectById(roomId);
}@CacheEvict(value = "liveRoom", key = "#roomId")
@Caching(evict = {@CacheEvict(value = "liveRoomList", allEntries = true)
})
public LiveRoom updateLiveRoomStatus(Long roomId, int status) {// 更新邏輯...
}

6.2 負載均衡與集群部署

對于大型直播系統,單機部署無法滿足需求,需要考慮集群部署:

# Docker Compose示例配置
version: '3'services:# 應用服務器集群app1:image: live-streaming-service:latestports:- "8081:8080"environment:- SPRING_PROFILES_ACTIVE=prod- SERVER_PORT=8080- LIVE_SRS_SERVER_URL=rtmp://srs1:1935/livedepends_on:- mysql- redisapp2:image: live-streaming-service:latestports:- "8082:8080"environment:- SPRING_PROFILES_ACTIVE=prod- SERVER_PORT=8080- LIVE_SRS_SERVER_URL=rtmp://srs2:1935/livedepends_on:- mysql- redis# 流媒體服務器集群srs1:image: ossrs/srs:latestports:- "1935:1935"- "1985:1985"- "8080:8080"volumes:- ./srs1.conf:/usr/local/srs/conf/srs.confsrs2:image: ossrs/srs:latestports:- "1936:1935"- "1986:1985"- "8081:8080"volumes:- ./srs2.conf:/usr/local/srs/conf/srs.conf# 負載均衡器nginx:image: nginx:latestports:- "80:80"- "443:443"volumes:- ./nginx/nginx.conf:/etc/nginx/nginx.conf- ./nginx/ssl:/etc/nginx/ssldepends_on:- app1- app2- srs1- srs2# 數據庫和中間件mysql:image: mysql:8.0ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=password- MYSQL_DATABASE=live_streamingvolumes:- mysql-data:/var/lib/mysqlredis:image: redis:6.2ports:- "6379:6379"volumes:- redis-data:/datavolumes:mysql-data:redis-data:

NGINX負載均衡配置:

http {upstream app_servers {# IP哈希負載均衡,確保同一用戶請求發送到同一服務器ip_hash;server app1:8080;server app2:8080;}upstream srs_http_servers {server srs1:8080;server srs2:8080;}server {listen 80;server_name live.example.com;# API請求location /api/ {proxy_pass http://app_servers;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;}# WebSocket連接location /ws {proxy_pass http://app_servers;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}# HLS分發location /hls/ {proxy_pass http://srs_http_servers;proxy_set_header Host $host;}# HTTP-FLV分發location /live/ {proxy_pass http://srs_http_servers;proxy_set_header Host $host;}}
}stream {upstream rtmp_servers {server srs1:1935;server srs2:1935;}server {listen 1935;proxy_pass rtmp_servers;}
}

6.3 流量控制與限流

為防止服務器被惡意攻擊或過載,實現限流機制:

@Configuration
public class RateLimitConfig {@Beanpublic RedisRateLimiter redisRateLimiter(StringRedisTemplate redisTemplate) {return new RedisRateLimiter(redisTemplate);}
}@Component
public class RedisRateLimiter {private final StringRedisTemplate redisTemplate;private final String luaScript;public RedisRateLimiter(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// Lua腳本實現滑動窗口限流this.luaScript = "local key = KEYS[1] " +"local capacity = tonumber(ARGV[1]) " +"local period = tonumber(ARGV[2]) " +"local now = tonumber(ARGV[3]) " +"local requested = tonumber(ARGV[4]) " +// 移除過期的時間戳"redis.call('zremrangebyscore', key, 0, now - period) " +// 獲取當前請求數"local currentCount = redis.call('zcard', key) " +// 如果請求數超過容量,返回0"if currentCount + requested > capacity then " +"    return 0 " +"end " +// 添加新請求的時間戳"for i = 1, requested do " +"    redis.call('zadd', key, now, now .. i) " +"end " +// 設置過期時間"redis.call('expire', key, period) " +// 返回剩余容量"return capacity - currentCount - requested";}/*** 嘗試獲取令牌* @param key 限流鍵* @param capacity 容量* @param period 時間窗口(秒)* @param requested 請求令牌數* @return 是否獲取成功*/public boolean tryAcquire(String key, int capacity, int period, int requested) {long now = System.currentTimeMillis() / 1000;RedisScript<Long> script = RedisScript.of(luaScript, Long.class);Long remainingTokens = redisTemplate.execute(script,Collections.singletonList(key),String.valueOf(capacity),String.valueOf(period),String.valueOf(now),String.valueOf(requested));return remainingTokens != null && remainingTokens >= 0;}
}

使用切面實現API限流:

@Aspect
@Component
public class RateLimitAspect {private final RedisRateLimiter rateLimiter;@Autowiredpublic RateLimitAspect(RedisRateLimiter rateLimiter) {this.rateLimiter = rateLimiter;}@Around("@annotation(rateLimit)")public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {// 獲取請求IPHttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();String ip = getClientIp(request);// 構建限流鍵String key = "rate_limit:" + rateLimit.key() + ":" + ip;// 嘗試獲取令牌boolean allowed = rateLimiter.tryAcquire(key, rateLimit.capacity(), rateLimit.period(), 1);if (allowed) {return joinPoint.proceed();} else {throw new TooManyRequestsException("請求過于頻繁,請稍后再試");}}private String getClientIp(HttpServletRequest request) {String ip = request.getHeader("X-Forwarded-For");if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("Proxy-Client-IP");}if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("WL-Proxy-Client-IP");}if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getRemoteAddr();}// 取第一個IP地址if (ip != null && ip.contains(",")) {ip = ip.split(",")[0].trim();}return ip;}
}@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {String key();           // 限流鍵int capacity() default 10;  // 容量int period() default 60;    // 時間窗口(秒)
}

在控制器中應用限流注解:

@PostMapping("/room")
@RateLimit(key = "create_room", capacity = 5, period = 3600)  // 每小時限制創建5個直播間
public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {// 實現...
}@PostMapping("/room/{roomId}/view")
@RateLimit(key = "view_increment", capacity = 1, period = 5)  // 每5秒限制一次
public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {// 實現...
}

6.4 監控與告警

整合Prometheus和Grafana實現監控:

@Configuration
public class MonitoringConfig {@BeanMeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {return registry -> registry.config().commonTags("application", "live-streaming-service");}
}

添加自定義指標:

@Service
public class LiveStreamMonitoringService {private final MeterRegistry meterRegistry;private final Counter liveStartCounter;private final Counter liveEndCounter;private final Gauge activeStreamGauge;private final Timer streamProcessingTimer;@Autowiredpublic LiveStreamMonitoringService(MeterRegistry meterRegistry) {this.meterRegistry = meterRegistry;// 創建指標this.liveStartCounter = Counter.builder("live.stream.start").description("直播開始計數").register(meterRegistry);this.liveEndCounter = Counter.builder("live.stream.end").description("直播結束計數").register(meterRegistry);this.activeStreamGauge = Gauge.builder("live.stream.active", this::getActiveStreamCount).description("當前活躍直播數").register(meterRegistry);this.streamProcessingTimer = Timer.builder("live.stream.processing").description("直播處理時間").register(meterRegistry);}// 獲取活躍直播數private long getActiveStreamCount() {// 從Redis獲取活躍直播間數量return redisTemplate.opsForSet().size("live:active_rooms");}// 記錄直播開始public void recordLiveStart() {liveStartCounter.increment();}// 記錄直播結束public void recordLiveEnd() {liveEndCounter.increment();}// 記錄處理時間public <T> T recordProcessingTime(Supplier<T> supplier) {return streamProcessingTimer.record(supplier);}// 記錄錯誤public void recordError(String errorType) {meterRegistry.counter("live.stream.errors", "type", errorType).increment();}
}

七、安全性考慮

7.1 推流鑒權

前面已經實現了基于時間戳和簽名的推流鑒權。為了增強安全性,可以添加IP白名單:

@Service
public class StreamSecurityService {@Value("${live.security.ip-whitelist-enabled}")private boolean ipWhitelistEnabled;@Value("#{'${live.security.ip-whitelist:}'.split(',')}")private List<String> ipWhitelist;/*** 驗證IP是否在白名單中*/public boolean isIpAllowed(String ip) {if (!ipWhitelistEnabled) {return true;}if (ipWhitelist.isEmpty()) {return true;}return ipWhitelist.contains(ip) || ipWhitelist.contains("0.0.0.0");}/*** 生成帶有過期時間的推流URL*/public String generateSecureStreamUrl(String baseUrl, String streamKey, long expireSeconds) {long expireTimestamp = System.currentTimeMillis() / 1000 + expireSeconds;String authString = streamKey + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());return baseUrl + "?auth_key=" + authToken + "&expire=" + expireTimestamp;}
}

7.2 播放鑒權

同樣可以為播放URL添加鑒權:

@Service
public class PlayAuthService {@Value("${live.play.auth-enabled}")private boolean authEnabled;@Value("${live.play.auth-key}")private String authKey;@Value("${live.play.auth-expire}")private long authExpire;/*** 生成帶鑒權的播放URL*/public String generateAuthPlayUrl(String baseUrl, String streamKey, Long userId) {if (!authEnabled) {return baseUrl;}long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;String authString = streamKey + "-" + userId + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());return baseUrl + "?auth_key=" + authToken + "&user_id=" + userId + "&expire=" + expireTimestamp;}/*** 驗證播放URL*/public boolean validatePlayUrl(String streamKey, String authToken, String userId, String expireStr) {if (!authEnabled) {return true;}try {long expireTimestamp = Long.parseLong(expireStr);long currentTime = System.currentTimeMillis() / 1000;// 檢查是否過期if (currentTime > expireTimestamp) {return false;}// 驗證tokenString authString = streamKey + "-" + userId + "-" + expireStr + "-" + authKey;String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes());return calculatedToken.equals(authToken);} catch (Exception e) {return false;}}
}

7.3 內容安全

對于用戶生成的內容,需要進行審核和過濾:

@Service
public class ContentSecurityService {@Value("${live.content.sensitive-words-file}")private String sensitiveWordsFile;private Set<String> sensitiveWords = new HashSet<>();@PostConstructpublic void init() {// 加載敏感詞庫try {File file = new File(sensitiveWordsFile);if (file.exists()) {List<String> lines = Files.readAllLines(file.toPath());sensitiveWords.addAll(lines);}} catch (IOException e) {// 如果加載失敗,使用默認敏感詞sensitiveWords.add("敏感詞1");sensitiveWords.add("敏感詞2");}}/*** 過濾敏感詞*/public String filterContent(String content) {if (content == null || content.isEmpty()) {return content;}String filteredContent = content;for (String word : sensitiveWords) {filteredContent = filteredContent.replaceAll(word, "***");}return filteredContent;}/*** 檢查內容是否包含敏感詞*/public boolean containsSensitiveWords(String content) {if (content == null || content.isEmpty()) {return false;}for (String word : sensitiveWords) {if (content.contains(word)) {return true;}}return false;}
}

在WebSocket消息處理中使用:

@MessageMapping("/chat/{roomId}")
public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {// 過濾敏感內容String filteredContent = contentSecurityService.filterContent(message.getContent());message.setContent(filteredContent);// 檢查是否全是敏感詞if (filteredContent.matches("\*+")) {// 記錄違規日志log.warn("用戶發送違規內容: userId={}, content={}", message.getUserId(), message.getContent());return;}// 正常發送消息messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);
}

八、演示說明

為了幫助大家快速理解和測試直播系統,下面提供完整的演示步驟:

8.1 環境準備

首先,需要準備以下環境:

  1. Java 21+ :運行SpringBoot應用
  2. MySQL 5.7+ :存儲直播信息
  3. Redis:緩存和消息通信
  4. SRS:流媒體服務器
  5. FFmpeg:視頻處理工具(錄制直播回放)
  6. OBS Studio:用于測試推流

8.2 安裝SRS

使用Docker可以快速部署SRS服務器:

# 拉取SRS鏡像
docker pull ossrs/srs:4# 運行SRS容器
docker run --name srs -d --restart=always \-p 1935:1935 \-p 1985:1985 \-p 8080:8080 \-v $(pwd)/conf:/usr/local/srs/conf \ossrs/srs:4 objs/srs -c conf/srs.conf

將前面提供的SRS配置文件保存為conf/srs.conf

8.3 直播功能演示流程

8.3.1 創建直播間

通過API創建直播間

curl -X POST http://localhost:8080/api/live/room \-H "Content-Type: application/json" \-d '{"title": "測試直播間","userId": 1,"coverUrl": "https://example.com/cover.jpg"}'

響應示例

{"id": 1,"title": "測試直播間","userId": 1,"coverUrl": "https://example.com/cover.jpg","streamKey": "8f7d6e5c4b3a2f1e0d9c8b7a","streamUrl": "rtmp://localhost:1935/live/8f7d6e5c4b3a2f1e0d9c8b7a","hlsUrl": "http://localhost:8080/hls/8f7d6e5c4b3a2f1e0d9c8b7a.m3u8","flvUrl": "http://localhost:8080/live/8f7d6e5c4b3a2f1e0d9c8b7a.flv","status": 0,"viewCount": 0,"likeCount": 0
}

記錄下streamKeystreamUrl,這將用于推流設置。

8.3.2 推流測試
  1. 打開OBS Studio,設置推流參數:

    • 設置 → 流
    • 服務:自定義
    • 服務器:rtmp://192.168.195.100:1935/live/2feabf98b5ee6bb0c3dbc6cd423084a3?auth_key=9875b710e26914eff9f9aa1cc1df0093&expire=1749220798
    • 流密鑰:剛才獲取的streamKey(例如:8f7d6e5c4b3a2f1e0d9c8b7a)
  2. 添加視頻源(例如攝像頭或屏幕捕獲)

  3. 點擊"開始推流"按鈕開始直播

系統會自動檢測到推流開始,并通過SRS回調更新直播間狀態為"直播中"。

8.3.3 播放測試
  1. 打開前端頁面:http://localhost:8080/play.html?roomId=1
  2. 頁面將自動加載直播流并開始播放
  3. 測試播放:
    • HLS播放:http://192.168.195.100:8080/hls/live/2feabf98b5ee6bb0c3dbc6cd423084a3/2feabf98b5ee6bb0c3dbc6cd423084a3.m3u8
8.3.4 直播互動測試
  1. 多次刷新頁面,查看觀看數是否增加
8.3.5 直播回放測試

開始錄制直播

curl -X POST http://localhost:8080/api/live/room/1/record/start

等待一段時間后,停止錄制

# 使用返回的recordingId
curl -X POST http://localhost:8080/api/live/record/1/stop

查看回放列表

curl http://localhost:8080/api/live/room/1/recordings

使用返回的fileUrl播放回放視頻

8.3.6 結束直播
  1. 在OBS Studio中點擊"停止推流"按鈕
  2. 系統會自動檢測到推流結束,并更新直播間狀態為"直播結束"
  3. 也可以通過API手動結束直播
curl -X POST http://localhost:8080/api/live/room/1/end

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/84299.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/84299.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/84299.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

xcode 各版本真機調試包下載

下載地址 https://github.com/filsv/iOSDeviceSupport 使用方法&#xff1a; 添加到下面路徑中&#xff0c;然后退出重啟xcode /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/DeviceSupport

DL00871-基于深度學習YOLOv11的盲人障礙物目標檢測含完整數據集

基于深度學習YOLOv11的盲人障礙物目標檢測&#xff1a;開啟盲人出行新紀元 在全球范圍內&#xff0c;盲人及視覺障礙者的出行問題一直是社會關注的重點。盡管技術不斷進步&#xff0c;許多城市的無障礙設施依然未能滿足盲人出行的實際需求。尤其是在復雜的城市環境中&#xff…

Python 訓練 day46

知識點回顧&#xff1a; 不同CNN層的特征圖&#xff1a;不同通道的特征圖什么是注意力&#xff1a;注意力家族&#xff0c;類似于動物園&#xff0c;都是不同的模塊&#xff0c;好不好試了才知道。通道注意力&#xff1a;模型的定義和插入的位置通道注意力后的特征圖和熱力圖 作…

TSN交換機正在重構工業網絡,PROFINET和EtherCAT會被取代嗎?

在工業自動化持續演進的今天&#xff0c;通信網絡的角色正變得愈發關鍵。 2025年6月6日&#xff0c;為期三天的華南國際工業博覽會在深圳國際會展中心&#xff08;寶安&#xff09;圓滿落幕。作為國內工業通信領域的技術型企業&#xff0c;光路科技&#xff08;Fiberroad&…

Qwen系列之Qwen3解讀:最強開源模型的細節拆解

文章目錄 1.1分鐘快覽2.模型架構2.1.Dense模型2.2.MoE模型 3.預訓練階段3.1.數據3.2.訓練3.3.評估 4.后訓練階段S1: 長鏈思維冷啟動S2: 推理強化學習S3: 思考模式融合S4: 通用強化學習 5.全家桶中的小模型訓練評估評估數據集評估細節評估效果弱智評估和民間Arena 分析展望 如果…

yolo模型精度提升策略

總結與行動建議 立即行動&#xff1a; 顯著增加6種相似BGA的高質量、多樣化訓練數據&#xff08;2倍以上是合理起點&#xff09;。 實施針對性數據增強&#xff1a; 設計模擬BGA實際成像挑戰&#xff08;反光、模糊、視角變化&#xff09;的增強方案。 升級模型與損失函數&am…

Kafka主題運維全指南:從基礎配置到故障處理

#作者&#xff1a;張桐瑞 文章目錄 主題日常管理1. 修改主題分區。2. 修改主題級別參數。3. 變更副本數。4. 修改主題限速。5.主題分區遷移。6. 常見主題錯誤處理常見錯誤1&#xff1a;主題刪除失敗。常見錯誤2&#xff1a;__consumer_offsets占用太多的磁盤。 主題日常管理 …

使用Docker部署MySQLRedis容器與常見命令

目錄 1. 檢查WSL配置2. 設置WSL版本3. 拉取MySQL鏡像4. 驗證鏡像5. 運行MySQL容器在WSL環境中使用以下命令啟動MySQL容器查看容器/鏡像的完整信息顯式指定宿主機掛載路徑可選&#xff1a;在Windows的cmd中使用以下命令啟動MySQL容器 6. 管理容器啟動已創建的容器查看運行中的容…

01__C++入門

一、C的語法框架 首先學習一門語言&#xff0c;我們需要了解語言的基本框架&#xff0c;這一小節&#xff0c;我們學習C的歷史應用&#xff0c;c和c的區別和c的標準 二、認識C 1、C的歷史 所有的主流C編譯器都支持這個版本的C&#xff08;1998年的版本&#xff09;。 2、C的應…

2024 CKA題庫+詳盡解析| 15、備份還原Etcd

目錄 免費獲取題庫配套 CKA_v1.31_模擬系統 15、 備份還原Etcd 題目&#xff1a; 開始操作: 1&#xff09;、切換集群 2&#xff09;、登錄master并提權 3&#xff09;、備份Etcd現有數據 4&#xff09;、驗證備份數據快照 5&#xff09;、查看節點和Pod狀態 6&am…

Flotherm許可的并發用戶數限制

在電子產品熱設計領域&#xff0c;Flotherm軟件以其卓越的性能和精確的仿真能力而受到廣大用戶的青睞。然而&#xff0c;在使用Flotherm軟件時&#xff0c;了解其許可的并發用戶數限制對于優化資源配置和提升工作效率至關重要。本文將詳細介紹Flotherm軟件許可的并發用戶數限制…

讀取寶塔方法,查找容別名存放位置

可以查到對應方法 根據參數名可知 查找到 得到位置

【1】跨越技術棧鴻溝:字節跳動開源TRAE AI編程IDE的實戰體驗

2024年初&#xff0c;人工智能編程工具領域發生了一次靜默的變革。當字節跳動宣布退出其TRAE項目&#xff08;一款融合大型語言模型能力的云端AI編程IDE&#xff09;時&#xff0c;技術社區曾短暫嘆息。然而這一退場并非終點——通過開源社區的接力&#xff0c;TRAE在WayToAGI等…

git連接本地倉庫以及gitee

參考:gitee創建新倉庫并上傳代碼_gitee新建倉庫導入代碼-CSDN博客 git初始化以及添加git分支 在idea查看master主分支 報錯 原因gitee推送更新失敗問題記錄&#xff1a;remote: error: hook declined to update refs/heads/master-CSDN博客 取消郵箱暴露

pocketflow庫實現guardrail

目錄 代碼代碼解釋1. 系統架構2. 核心組件詳解2.1 LLM調用函數2.2 UserInputNode&#xff08;用戶輸入節點&#xff09;2.3 GuardrailNode&#xff08;安全防護節點&#xff09;2.4 LLMNode&#xff08;LLM處理節點&#xff09; 3. 流程控制機制 示例運行 代碼 from pocketflo…

Fetch API 使用詳解:Bearer Token 與 localStorage 實踐

Fetch API&#xff1a;現代瀏覽器內置的用于發送 HTTP 請求的 API&#xff0c;Bearer Token&#xff1a;一種基于令牌的身份驗證方案&#xff0c;常用于 JWT 認證&#xff0c;localStorage&#xff1a;瀏覽器提供的持久化存儲方案&#xff0c;用于在客戶端存儲數據。 token是我…

Netty自定義協議解析

目錄 自定義協議設計 實現消息解碼器 實現消息編碼器 自定義消息對象 配置ChannelPipeline Netty提供了強大的編解碼器抽象基類,這些基類能夠幫助開發者快速實現自定義協議的解析。 自定義協議設計 在實現自定義協議解析之前,需要明確協議的具體格式。例如,一個簡單的…

馭碼 CodeRider 2.0 產品體驗:智能研發的革新之旅

馭碼 CodeRider 2.0 產品體驗&#xff1a;智能研發的革新之旅 在當今快速發展的軟件開發領域&#xff0c;研發效率與質量始終是開發者和企業關注的核心。面對開發協作流程繁瑣、代碼生成補全不準、代碼審核低效、知識協同困難以及部署成本與靈活性難以平衡等問題&#xff0c;…

NLP學習路線圖(二十六):自注意力機制

一、為何需要你?序列建模的困境 在你出現之前,循環神經網絡(RNN)及其變種LSTM、GRU是處理序列數據(如文本、語音、時間序列)的主流工具。它們按順序逐個處理輸入元素,將歷史信息壓縮在一個隱藏狀態向量中傳遞。 瓶頸顯現: 長程依賴遺忘: 隨著序列增長,早期信息在傳遞…

【渲染】Unity-分析URP的延遲渲染-DeferredShading

我是一名資深游戲開發&#xff0c;小時候喜歡看十萬個為什么 介紹 本文旨在搞清楚延遲渲染在unity下如何實現的&#xff0c;為自己寫延遲渲染打一個基礎&#xff0c;打開從知到行的大門延遲渲染 輸出物體表面信息(rt1, rt2, rt3, …) 著色(rt1, rt2, rt3, …)研究完感覺核心…