當下直播技術已經成為各類應用不可或缺的一部分,從社交媒體到在線教育,再到電子商務和游戲領域,直播功能正在被廣泛應用。
本文將介紹如何使用SpringBoot框架構建一個直播流推拉系統。
一、直播技術基礎
1.1 推流與拉流概念
直播系統的核心環節包括推流和拉流:
- 推流(Push) : 指主播將采集的音視頻數據通過特定協議發送到流媒體服務器的過程
- 拉流(Pull) : 指觀眾從流媒體服務器獲取音視頻數據并播放的過程
1.2 常用直播協議
市面上主要的直播協議包括:
協議 | 優勢 | 劣勢 | 適用場景 |
---|---|---|---|
RTMP | 低延遲(1-3秒)、成熟穩定 | 基于Flash、需要額外端口(1935) | 主播推流、低延遲直播 |
HLS | 兼容性好、使用HTTP協議 | 延遲高 | 大規模直播分發、移動端 |
WebRTC | 超低延遲(<1秒)、P2P通信 | 穿透復雜網絡困難、部署復雜 | 實時互動、小規模視頻會議 |
HTTP-FLV | 低延遲、兼容性較好 | 不支持可變碼率 | 大規模觀看、延遲敏感場景 |
SRT | 低延遲、高可靠性 | 生態相對較新 | 專業直播、跨國直播 |
1.3 直播系統架構概述
一個完整的直播系統通常包含以下組件:
- 采集端:負責采集、編碼音視頻數據
- 流媒體服務器:處理音視頻流的轉發、轉碼和分發
- CDN:提供內容分發服務,解決大規模用戶訪問問題
- 播放器:解碼并播放音視頻內容
- 信令服務:管理直播間信息、用戶狀態等
二、系統技術設計
2.1 整體架構
核心組件包括:
- API服務層:基于SpringBoot構建的RESTful API,處理直播間管理、用戶認證等
- 媒體服務器集成層:集成開源流媒體服務器(如SRS)
- 流轉發層:處理媒體流的轉發、轉碼和適配
- 存儲層:用于直播回放和點播
- WebSocket服務:提供直播互動功能
2.2 技術選型
- 后端框架:SpringBoot 3
- 流媒體服務器:SRS (Simple RTMP Server)
- 數據庫:MySQL + Redis
- WebSocket:Spring WebSocket
- 存儲系統:MinIO (對象存儲)
- 前端播放器:Video.js、flv.js、hls.js
三、SpringBoot實現直播服務
3.1 項目依賴配置
首先配置pom.xml
文件,引入必要的依賴:
<dependencies><!-- SpringBoot 核心依賴 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- WebSocket 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- Redis 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- MySQL 支持 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!-- MyBatis Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-spring-boot3-starter</artifactId><version>3.5.5</version></dependency><!-- MinIO 客戶端 --><dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.4.6</version></dependency><!-- HTTP 客戶端 --><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.14</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>
3.2 應用配置
在application.yml
中配置應用參數:
server:port: 8080spring:application:name: live-streaming-service# 數據庫配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/live_streaming?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghaiusername: rootpassword: root# Redis配置redis:host: localhostport: 6379database: 0# MyBatis Plus配置
mybatis-plus:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.example.livestream.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImpl# 流媒體服務器配置
live:srs:server-url: rtmp://192.168.195.100:1935/liveapi-url: http://192.168.195.100:1985/apihttp-flv-url: http://192.168.195.100:8080/livehls-url: http://192.168.195.100:8080/live/hlsrecord:save-path: /data/recordpush:key-check-enabled: trueauth-expire: 86400 # 24小時,單位秒auth-key: your_secret_key# MinIO配置
minio:endpoint: http://localhost:9000access-key: minioadminsecret-key: minioadminbucket: live-recordings
3.3 實體類設計
首先定義直播間實體:
@Data
@TableName("live_room")
public class LiveRoom {@TableId(value = "id", type = IdType.AUTO)private Long id;private String title;private String coverUrl;private Long userId;private String streamKey; // 推流密鑰private String streamUrl; // 推流地址private String hlsUrl; // HLS播放地址private String flvUrl; // HTTP-FLV播放地址private Integer status; // 0:未開播 1:直播中 2:直播結束private Long viewCount; // 觀看人數private Long likeCount; // 點贊數private LocalDateTime startTime; // 開播時間private LocalDateTime endTime; // 結束時間private LocalDateTime createdAt;private LocalDateTime updatedAt;
}
直播流信息實體:
@Data
@TableName("live_stream")
public class LiveStream {@TableId(value = "id", type = IdType.AUTO)private Long id;private Long roomId;private String streamId; // 流IDprivate String protocol; // 協議類型:rtmp/hls/flvprivate Integer bitrate; // 碼率private String resolution; // 分辨率private Integer status; // 0:未啟動 1:活躍 2:已結束private LocalDateTime createdAt;private LocalDateTime updatedAt;
}
直播回放實體:
@Data
@TableName("live_recording")
public class LiveRecording {@TableId(value = "id", type = IdType.AUTO)private Long id;private Long roomId;private String fileName;private String fileUrl;private Long fileSize;private Integer duration; // 時長,單位秒private LocalDateTime startTime;private LocalDateTime endTime;private Integer status; // 0:錄制中 1:錄制完成 2:處理中 3:可用 4:刪除private LocalDateTime createdAt;private LocalDateTime updatedAt;
}
3.4 數據庫表設計
根據實體類,創建對應的數據庫表:
-- 直播間表
CREATE TABLE `live_room` (`id` bigint NOT NULL AUTO_INCREMENT,`title` varchar(255) NOT NULL COMMENT '直播標題',`cover_url` varchar(255) DEFAULT NULL COMMENT '封面URL',`user_id` bigint NOT NULL COMMENT '主播用戶ID',`stream_key` varchar(64) NOT NULL COMMENT '推流密鑰',`stream_url` varchar(255) DEFAULT NULL COMMENT '推流地址',`hls_url` varchar(255) DEFAULT NULL COMMENT 'HLS播放地址',`flv_url` varchar(255) DEFAULT NULL COMMENT 'HTTP-FLV播放地址',`status` tinyint NOT NULL DEFAULT '0' COMMENT '狀態:0未開播 1直播中 2直播結束',`view_count` bigint NOT NULL DEFAULT '0' COMMENT '觀看人數',`like_count` bigint NOT NULL DEFAULT '0' COMMENT '點贊數',`start_time` datetime DEFAULT NULL COMMENT '開播時間',`end_time` datetime DEFAULT NULL COMMENT '結束時間',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uk_stream_key` (`stream_key`),KEY `idx_user_id` (`user_id`),KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播間信息表';-- 直播流表
CREATE TABLE `live_stream` (`id` bigint NOT NULL AUTO_INCREMENT,`room_id` bigint NOT NULL COMMENT '直播間ID',`stream_id` varchar(64) NOT NULL COMMENT '流ID',`protocol` varchar(20) NOT NULL COMMENT '協議類型',`bitrate` int DEFAULT NULL COMMENT '碼率',`resolution` varchar(20) DEFAULT NULL COMMENT '分辨率',`status` tinyint NOT NULL DEFAULT '0' COMMENT '狀態:0未啟動 1活躍 2已結束',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uk_stream_id` (`stream_id`),KEY `idx_room_id` (`room_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播流信息表';-- 直播回放表
CREATE TABLE `live_recording` (`id` bigint NOT NULL AUTO_INCREMENT,`room_id` bigint NOT NULL COMMENT '直播間ID',`file_name` varchar(255) NOT NULL COMMENT '文件名',`file_url` varchar(255) COMMENT '文件URL',`file_size` bigint DEFAULT NULL COMMENT '文件大小(字節)',`duration` int DEFAULT NULL COMMENT '時長(秒)',`start_time` datetime NOT NULL COMMENT '開始時間',`end_time` datetime DEFAULT NULL COMMENT '結束時間',`status` tinyint NOT NULL DEFAULT '0' COMMENT '狀態:0錄制中 1錄制完成 2處理中 3可用 4刪除',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),KEY `idx_room_id` (`room_id`),KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播回放表';
3.5 Mapper接口
使用MyBatis-Plus創建Mapper接口:
@Mapper
public interface LiveRoomMapper extends BaseMapper<LiveRoom> {// 自定義查詢方法@Select("SELECT * FROM live_room WHERE status = 1 ORDER BY view_count DESC LIMIT #{limit}")List<LiveRoom> findHotLiveRooms(@Param("limit") int limit);
}@Mapper
public interface LiveStreamMapper extends BaseMapper<LiveStream> {// 基礎CRUD方法由BaseMapper提供
}@Mapper
public interface LiveRecordingMapper extends BaseMapper<LiveRecording> {// 基礎CRUD方法由BaseMapper提供
}
3.6 服務層實現
3.6.1 直播流服務
@Service
@Slf4j
public class LiveStreamService {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate LiveStreamMapper liveStreamMapper;@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${live.srs.server-url}")private String srsServerUrl;@Value("${live.srs.api-url}")private String srsApiUrl;@Value("${live.srs.http-flv-url}")private String httpFlvUrl;@Value("${live.srs.hls-url}")private String hlsUrl;@Value("${live.push.key-check-enabled}")private boolean keyCheckEnabled;@Value("${live.push.auth-expire}")private long authExpire;@Value("${live.push.auth-key}")private String authKey;private RestTemplate restTemplate = new RestTemplate();/*** 創建直播間*/@Transactionalpublic LiveRoom createLiveRoom(LiveRoom liveRoom) {// 生成推流密鑰String streamKey = generateStreamKey(liveRoom.getUserId());liveRoom.setStreamKey(streamKey);// 構建推流地址String pushUrl = buildPushUrl(streamKey);liveRoom.setStreamUrl(pushUrl);// 構建播放地址liveRoom.setHlsUrl(hlsUrl + "/" + streamKey + ".m3u8");liveRoom.setFlvUrl(httpFlvUrl + "/" + streamKey + ".flv");// 設置初始狀態liveRoom.setStatus(0);liveRoom.setViewCount(0L);liveRoom.setLikeCount(0L);liveRoom.setCreatedAt(LocalDateTime.now());liveRoom.setUpdatedAt(LocalDateTime.now());// 保存到數據庫liveRoomMapper.insert(liveRoom);return liveRoom;}/*** 生成推流密鑰*/private String generateStreamKey(Long userId) {// 生成基于用戶ID和時間戳的唯一密鑰String baseKey = userId + "_" + System.currentTimeMillis();return DigestUtils.md5DigestAsHex(baseKey.getBytes());}/*** 構建推流地址*/private String buildPushUrl(String streamKey) {StringBuilder sb = new StringBuilder(srsServerUrl);sb.append("/").append(streamKey);// 如果啟用了推流驗證if (keyCheckEnabled) {long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;String authString = streamKey + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());sb.append("?auth_key=").append(authToken).append("&expire=").append(expireTimestamp);}return sb.toString();}/*** 開始直播*/@Transactionalpublic LiveRoom startLiveStream(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null) {throw new IllegalArgumentException("直播間不存在");}// 更新直播間狀態為直播中liveRoom.setStatus(1);liveRoom.setStartTime(LocalDateTime.now());liveRoomMapper.updateById(liveRoom);// 創建直播流記錄LiveStream liveStream = new LiveStream();liveStream.setRoomId(roomId);liveStream.setStreamId(liveRoom.getStreamKey());liveStream.setProtocol("rtmp");liveStream.setStatus(1);liveStream.setCreatedAt(LocalDateTime.now());liveStream.setUpdatedAt(LocalDateTime.now());liveStreamMapper.insert(liveStream);// 更新Redis緩存中的活躍直播間redisTemplate.opsForSet().add("live:active_rooms", String.valueOf(roomId));return liveRoom;}/*** 結束直播*/@Transactionalpublic LiveRoom endLiveStream(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {throw new IllegalArgumentException("直播間不存在或未開播");}// 更新直播間狀態為已結束liveRoom.setStatus(2);liveRoom.setEndTime(LocalDateTime.now());liveRoomMapper.updateById(liveRoom);// 更新直播流狀態QueryWrapper<LiveStream> queryWrapper = new QueryWrapper<>();queryWrapper.eq("room_id", roomId).eq("status", 1);LiveStream liveStream = liveStreamMapper.selectOne(queryWrapper);if (liveStream != null) {liveStream.setStatus(2);liveStream.setUpdatedAt(LocalDateTime.now());liveStreamMapper.updateById(liveStream);}// 從Redis中移除活躍直播間redisTemplate.opsForSet().remove("live:active_rooms", String.valueOf(roomId));return liveRoom;}/*** 獲取當前活躍的直播間列表*/public List<LiveRoom> getActiveLiveRooms(int page, int size) {Page<LiveRoom> pageParam = new Page<>(page, size);QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("status", 1).orderByDesc("view_count");return liveRoomMapper.selectPage(pageParam, queryWrapper).getRecords();}/*** 獲取熱門直播間*/public List<LiveRoom> getHotLiveRooms(int limit) {return liveRoomMapper.findHotLiveRooms(limit);}/*** 增加直播間觀看人數*/public void incrementViewCount(Long roomId) {// 使用Redis進行計數String key = "live:room:" + roomId + ":view_count";redisTemplate.opsForValue().increment(key);// 定期同步到數據庫if (Math.random() < 0.1) { // 10%概率同步,減少數據庫壓力String countStr = redisTemplate.opsForValue().get(key);if (countStr != null) {long count = Long.parseLong(countStr);LiveRoom room = new LiveRoom();room.setId(roomId);room.setViewCount(count);liveRoomMapper.updateById(room);}}}/*** 校驗推流密鑰*/public boolean validateStreamKey(String streamKey, String token, String expire) {if (!keyCheckEnabled) {return true;}try {long expireTimestamp = Long.parseLong(expire);long currentTime = System.currentTimeMillis() / 1000;// 檢查是否過期if (currentTime > expireTimestamp) {return false;}// 驗證tokenString authString = streamKey + "-" + expire + "-" + authKey;String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes());return calculatedToken.equals(token);} catch (Exception e) {log.error("驗證推流密鑰異常", e);return false;}}/*** 處理SRS回調 - 流發布*/public void handleStreamPublish(String app, String stream) {try {// 查找對應的直播間QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("stream_key", stream);LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);if (liveRoom != null && liveRoom.getStatus() == 0) {// 更新直播間狀態startLiveStream(liveRoom.getId());log.info("直播流發布成功: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());}} catch (Exception e) {log.error("處理流發布回調異常", e);}}/*** 處理SRS回調 - 流關閉*/public void handleStreamClose(String app, String stream) {try {// 查找對應的直播間QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("stream_key", stream);LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);if (liveRoom != null && liveRoom.getStatus() == 1) {// 更新直播間狀態endLiveStream(liveRoom.getId());log.info("直播流關閉: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());}} catch (Exception e) {log.error("處理流關閉回調異常", e);}}/*** 獲取SRS服務器信息*/public Map<String, Object> getSrsServerInfo() {try {String url = srsApiUrl + "/v1/summaries";ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class);return response.getBody();} catch (Exception e) {log.error("獲取SRS服務器信息異常", e);return Collections.emptyMap();}}
}
3.6.2 直播回放服務
@Service
@Slf4j
public class LiveRecordingService {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate LiveRecordingMapper recordingMapper;@Autowiredprivate MinioClient minioClient;@Value("${minio.bucket}")private String minioBucket;@Value("${live.record.save-path}")private String recordSavePath;/*** 開始錄制直播*/@Transactionalpublic LiveRecording startRecording(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {throw new IllegalArgumentException("直播間不存在或未開播");}// 創建錄制記錄LiveRecording recording = new LiveRecording();recording.setRoomId(roomId);recording.setFileName(liveRoom.getStreamKey() + "_" + System.currentTimeMillis() + ".mp4");recording.setStatus(0); // 錄制中recording.setStartTime(LocalDateTime.now());recording.setCreatedAt(LocalDateTime.now());recording.setUpdatedAt(LocalDateTime.now());recordingMapper.insert(recording);// 異步啟動錄制進程startRecordingProcess(liveRoom, recording);return recording;}/*** 停止錄制直播*/@Transactionalpublic LiveRecording stopRecording(Long recordingId) {LiveRecording recording = recordingMapper.selectById(recordingId);if (recording == null || recording.getStatus() != 0) {throw new IllegalArgumentException("錄制任務不存在或已結束");}// 更新錄制狀態recording.setStatus(1); // 錄制完成recording.setEndTime(LocalDateTime.now());recording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(recording);// 異步停止錄制進程并上傳文件stopRecordingProcess(recording);return recording;}/*** 獲取直播回放列表*/public List<LiveRecording> getRecordings(Long roomId, int page, int size) {Page<LiveRecording> pageParam = new Page<>(page, size);QueryWrapper<LiveRecording> queryWrapper = new QueryWrapper<>();queryWrapper.eq("room_id", roomId).eq("status", 3) // 可用狀態.orderByDesc("start_time");return recordingMapper.selectPage(pageParam, queryWrapper).getRecords();}/*** 啟動錄制進程*/private void startRecordingProcess(LiveRoom liveRoom, LiveRecording recording) {// 使用線程池異步執行CompletableFuture.runAsync(() -> {try {File saveDir = new File(recordSavePath);if (!saveDir.exists()) {saveDir.mkdirs();}String outputPath = recordSavePath + "/" + recording.getFileName();String inputUrl = liveRoom.getFlvUrl();// 使用FFmpeg錄制ProcessBuilder pb = new ProcessBuilder("ffmpeg", "-i", inputUrl,"-c:v", "copy","-c:a", "aac","-strict", "-2",outputPath);Process process = pb.start();// 保存進程ID,以便后續停止String processKey = "live:recording:process:" + recording.getId();Runtime.getRuntime().addShutdownHook(new Thread(() -> {process.destroy();}));log.info("開始錄制直播, roomId={}, recordingId={}", liveRoom.getId(), recording.getId());// 等待進程結束int exitCode = process.waitFor();log.info("錄制進程結束, roomId={}, recordingId={}, exitCode={}", liveRoom.getId(), recording.getId(), exitCode);// 如果是正常結束,則更新狀態并上傳文件if (exitCode == 0) {uploadRecording(recording, new File(outputPath));}} catch (Exception e) {log.error("錄制直播異常", e);// 更新錄制狀態為失敗LiveRecording failedRecording = new LiveRecording();failedRecording.setId(recording.getId());failedRecording.setStatus(4); // 失敗狀態failedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(failedRecording);}});}/*** 停止錄制進程*/private void stopRecordingProcess(LiveRecording recording) {// 這里可以實現停止特定的FFmpeg進程// 在實際實現中,需要保存進程ID并通過操作系統命令停止進程log.info("手動停止錄制, recordingId={}", recording.getId());}/*** 上傳錄制文件到MinIO*/private void uploadRecording(LiveRecording recording, File file) {try {// 設置狀態為處理中LiveRecording processingRecording = new LiveRecording();processingRecording.setId(recording.getId());processingRecording.setStatus(2); // 處理中processingRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(processingRecording);// 獲取文件元數據long fileSize = file.length();// 使用FFmpeg獲取視頻時長String[] cmd = {"ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", file.getAbsolutePath()};Process process = Runtime.getRuntime().exec(cmd);BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));String durationStr = reader.readLine();int duration = (int) Float.parseFloat(durationStr);// 上傳到MinIOString objectName = "recordings/" + recording.getFileName();minioClient.uploadObject(UploadObjectArgs.builder().bucket(minioBucket).object(objectName).filename(file.getAbsolutePath()).contentType("video/mp4").build());// 構建訪問URLString fileUrl = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder().bucket(minioBucket).object(objectName).method(Method.GET).build());// 更新錄制記錄LiveRecording updatedRecording = new LiveRecording();updatedRecording.setId(recording.getId());updatedRecording.setFileUrl(fileUrl);updatedRecording.setFileSize(fileSize);updatedRecording.setDuration(duration);updatedRecording.setStatus(3); // 可用狀態updatedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(updatedRecording);log.info("錄制文件上傳完成, recordingId={}, fileSize={}, duration={}s", recording.getId(), fileSize, duration);// 刪除本地文件file.delete();} catch (Exception e) {log.error("上傳錄制文件異常", e);// 更新錄制狀態為失敗LiveRecording failedRecording = new LiveRecording();failedRecording.setId(recording.getId());failedRecording.setStatus(4); // 失敗狀態failedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(failedRecording);}}
}
3.7 控制器實現
3.7.1 直播控制器
@RestController
@RequestMapping("/api/live")
@Slf4j
public class LiveController {@Autowiredprivate LiveStreamService liveStreamService;@Autowiredprivate LiveRecordingService recordingService;@Autowiredprivate LiveRoomMapper liveRoomMapper;/*** 創建直播間*/@PostMapping("/room")public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {try {LiveRoom createdRoom = liveStreamService.createLiveRoom(liveRoom);return ResponseEntity.ok(createdRoom);} catch (Exception e) {log.error("創建直播間異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取直播間詳情*/@GetMapping("/room/{roomId}")public ResponseEntity<LiveRoom> getLiveRoom(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null) {return ResponseEntity.notFound().build();}return ResponseEntity.ok(liveRoom);} catch (Exception e) {log.error("獲取直播間詳情異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 開始直播*/@PostMapping("/room/{roomId}/start")public ResponseEntity<LiveRoom> startLiveStream(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveStreamService.startLiveStream(roomId);return ResponseEntity.ok(liveRoom);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("開始直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 結束直播*/@PostMapping("/room/{roomId}/end")public ResponseEntity<LiveRoom> endLiveStream(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveStreamService.endLiveStream(roomId);return ResponseEntity.ok(liveRoom);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("結束直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取活躍直播間列表*/@GetMapping("/rooms/active")public ResponseEntity<List<LiveRoom>> getActiveLiveRooms(@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {List<LiveRoom> rooms = liveStreamService.getActiveLiveRooms(page, size);return ResponseEntity.ok(rooms);} catch (Exception e) {log.error("獲取活躍直播間列表異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取熱門直播間*/@GetMapping("/rooms/hot")public ResponseEntity<List<LiveRoom>> getHotLiveRooms(@RequestParam(defaultValue = "10") int limit) {try {List<LiveRoom> rooms = liveStreamService.getHotLiveRooms(limit);return ResponseEntity.ok(rooms);} catch (Exception e) {log.error("獲取熱門直播間異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 增加觀看人數*/@PostMapping("/room/{roomId}/view")public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {try {liveStreamService.incrementViewCount(roomId);return ResponseEntity.ok().build();} catch (Exception e) {log.error("增加觀看人數異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 開始錄制直播*/@PostMapping("/room/{roomId}/record/start")public ResponseEntity<LiveRecording> startRecording(@PathVariable Long roomId) {try {LiveRecording recording = recordingService.startRecording(roomId);return ResponseEntity.ok(recording);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("開始錄制直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 停止錄制直播*/@PostMapping("/record/{recordingId}/stop")public ResponseEntity<LiveRecording> stopRecording(@PathVariable Long recordingId) {try {LiveRecording recording = recordingService.stopRecording(recordingId);return ResponseEntity.ok(recording);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("停止錄制直播異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 獲取直播回放列表*/@GetMapping("/room/{roomId}/recordings")public ResponseEntity<List<LiveRecording>> getRecordings(@PathVariable Long roomId,@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {List<LiveRecording> recordings = recordingService.getRecordings(roomId, page, size);return ResponseEntity.ok(recordings);} catch (Exception e) {log.error("獲取直播回放列表異常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}
}
3.7.2 SRS回調控制器
@RestController
@RequestMapping("/api/srs/callback")
@Slf4j
public class SrsCallbackController {@Autowiredprivate LiveStreamService liveStreamService;/*** 處理SRS on_publish回調* 當推流開始時,SRS會調用此接口*/@PostMapping("/on_publish")public ResponseEntity<Map<String, Object>> onPublish(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_publish回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();String param = callbackDto.getParam();Map<String, String> paramMap = HttpUtil.decodeParamMap(param, StandardCharsets.UTF_8);String token = paramMap.get("auth_key");String expire = paramMap.get("expire");callbackDto.setToken(token);callbackDto.setExpire(expire);// 驗證推流密鑰boolean valid = liveStreamService.validateStreamKey(callbackDto.getStream(), callbackDto.getToken(), callbackDto.getExpire());if (!valid) {result.put("code", 403);result.put("message", "Forbidden");return ResponseEntity.status(HttpStatus.FORBIDDEN).body(result);}// 處理流發布事件liveStreamService.handleStreamPublish(callbackDto.getApp(), callbackDto.getStream());result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_unpublish回調* 當推流結束時,SRS會調用此接口*/@PostMapping("/on_unpublish")public ResponseEntity<Map<String, Object>> onUnpublish(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_unpublish回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());// 處理流關閉事件liveStreamService.handleStreamClose(callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_play回調* 當播放流開始時,SRS會調用此接口*/@PostMapping("/on_play")public ResponseEntity<Map<String, Object>> onPlay(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_play回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_stop回調* 當播放流結束時,SRS會調用此接口*/@PostMapping("/on_stop")public ResponseEntity<Map<String, Object>> onStop(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_stop回調: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 處理SRS on_dvr回調* 當DVR錄制文件關閉時,SRS會調用此接口*/@PostMapping("/on_dvr")public ResponseEntity<Map<String, Object>> onDvr(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_dvr回調: app={}, stream={}, file={}", callbackDto.getApp(), callbackDto.getStream(), callbackDto.getFile());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}
}
3.8 WebSocket實現
3.8.1 WebSocket配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 啟用簡單的消息代理registry.enableSimpleBroker("/topic");// 設置應用程序前綴registry.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注冊STOMP端點registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();}
}
3.8.2 WebSocket控制器
@Controller
public class LiveChatController {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate SimpMessagingTemplate messagingTemplate;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 發送聊天消息*/@MessageMapping("/chat/{roomId}")public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {// 檢查直播間是否存在LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {return;}// 設置消息時間message.setTimestamp(LocalDateTime.now());// 發送消息到訂閱該直播間的所有客戶端messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);}/*** 直播間狀態變更通知*/public void notifyRoomStatusChange(Long roomId, int status) {Map<String, Object> payload = new HashMap<>();payload.put("roomId", roomId);payload.put("status", status);payload.put("timestamp", LocalDateTime.now());messagingTemplate.convertAndSend("/topic/room/" + roomId + "/status", payload);}/*** 發送直播點贊通知*/@MessageMapping("/like/{roomId}")public void sendLike(@DestinationVariable Long roomId, Map<String, Object> payload) {// 檢查直播間是否存在LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {return;}// 增加點贊數String key = "live:room:" + roomId + ":like_count";Long likeCount = redisTemplate.opsForValue().increment(key);// 定期同步到數據庫if (likeCount % 100 == 0) { // 每100個點贊同步一次LiveRoom room = new LiveRoom();room.setId(roomId);room.setLikeCount(likeCount);liveRoomMapper.updateById(room);}// 添加時間戳payload.put("timestamp", LocalDateTime.now());payload.put("likeCount", likeCount);// 發送點贊通知messagingTemplate.convertAndSend("/topic/room/" + roomId + "/like", payload);}@Datapublic static class ChatMessage {private String username;private String userId;private String content;private String avatar;private LocalDateTime timestamp;}
}
四、SRS服務器配置
SRS (Simple RTMP Server) 是一個優秀的開源流媒體服務器,下面是基本配置文件srs.conf
:
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank file;
srs_log_file ./objs/srs.log;
http_api {enabled on;listen 1985;
}
http_server {enabled on;listen 8080;dir ./objs/nginx/html;
}
vhost __defaultVhost__ {hls {enabled on;hls_path ./objs/nginx/html/hls;hls_fragment 10;hls_window 60;}http_remux {enabled on;mount [vhost]/[app]/[stream].flv;}dvr {enabled on;dvr_path ./objs/nginx/html/dvr/[app]/[stream].[timestamp].mp4;dvr_plan segment;dvr_duration 30;}http_hooks {enabled on;on_publish http://192.168.195.1:8080/api/srs/callback/on_publish;on_unpublish http://192.168.195.1:8080/api/srs/callback/on_unpublish;on_play http://192.168.195.1:8080/api/srs/callback/on_play;on_stop http://192.168.195.1:8080/api/srs/callback/on_stop;on_dvr http://192.168.195.1:8080/api/srs/callback/on_dvr;}
}
五、前端播放器實現
5.1 基于Video.js的播放器
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>直播播放器</title><link href="https://vjs.zencdn.net/7.20.3/video-js.css" rel="stylesheet" /><script src="https://vjs.zencdn.net/7.20.3/video.min.js"></script><script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.0/dist/sockjs.min.js"></script><script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script><script src="https://cdn.jsdelivr.net/npm/videojs-contrib-hls@5.15.0/dist/videojs-contrib-hls.min.js"></script><script src="https://cdn.jsdelivr.net/npm/flv.js@1.6.2/dist/flv.min.js"></script><script src="https://cdn.jsdelivr.net/npm/videojs-flvjs@0.2.0/dist/videojs-flvjs.min.js"></script><style>.video-container {max-width: 800px;margin: 0 auto;}.video-js {width: 100%;height: 450px;}.room-info {margin-top: 20px;padding: 15px;background-color: #f8f9fa;border-radius: 5px;}.room-title {font-size: 24px;font-weight: bold;margin-bottom: 10px;}.streamer-info {display: flex;align-items: center;margin-bottom: 10px;}.streamer-avatar {width: 40px;height: 40px;border-radius: 50%;margin-right: 10px;}.streamer-name {font-weight: bold;}.room-stats {display: flex;gap: 20px;color: #666;}</style>
</head>
<body>
<div class="video-container"><video id="live-player" class="video-js vjs-default-skin vjs-big-play-centered" controls preload="auto"><p class="vjs-no-js">To view this video please enable JavaScript, and consider upgrading to a web browser that<a href="https://videojs.com/html5-video-support/" target="_blank">supports HTML5 video</a></p></video><div class="room-info"><div class="room-title" id="room-title">直播間標題</div><div class="streamer-info"><img class="streamer-avatar" id="streamer-avatar" src="https://api.dicebear.com/7.x/avataaars/svg?seed=user1" alt="主播頭像"><span class="streamer-name" id="streamer-name">主播昵稱</span></div><div class="room-stats"><div><i class="icon-eye"></i> <span id="view-count">0</span> 觀看</div><div><i class="icon-heart"></i> <span id="like-count">0</span> 點贊</div></div></div>
</div><script>// 獲取URL參數function getQueryParam(name) {const urlParams = new URLSearchParams(window.location.search);return urlParams.get(name);}// 初始化播放器function initPlayer() {const roomId = getQueryParam('roomId');if (!roomId) {alert('請指定直播間ID');return;}// 獲取直播間信息fetch(`/api/live/room/${roomId}`).then(response => response.json()).then(room => {// 更新頁面信息document.getElementById('room-title').textContent = room.title;document.getElementById('streamer-name').textContent = `主播ID: ${room.userId}`;document.getElementById('view-count').textContent = room.viewCount;document.getElementById('like-count').textContent = room.likeCount;// 判斷直播狀態if (room.status !== 1) {alert('直播未開始或已結束');return;}// 創建播放器const player = videojs('live-player', {autoplay: true,liveui: true,controls: true,preload: 'auto',responsive: true,fluid: true,sources: [/*{src: room.flvUrl,type: 'video/x-flv'},*/ {src: room.hlsUrl,type: 'application/x-mpegURL'}]});// 優先使用FLV.js/*if (flvjs.isSupported()) {player.flvjs({mediaDataSource: {type: 'flv',url: room.flvUrl}});}*/// 播放器錯誤處理player.on('error', function() {console.error('播放器錯誤,嘗試切換播放源');// 嘗試切換到HLSplayer.src({src: room.hlsUrl,type: 'application/x-mpegURL'});});// 統計觀看人數fetch(`/api/live/room/${roomId}/view`, {method: 'POST'});// 連接WebSocket接收直播狀態更新connectWebSocket(roomId);}).catch(error => {console.error('獲取直播間信息失敗:', error);alert('獲取直播間信息失敗');});}// 連接WebSocketfunction connectWebSocket(roomId) {const socket = new SockJS('/ws');const stompClient = Stomp.over(socket);stompClient.connect({}, function(frame) {console.log('Connected to WebSocket');// 訂閱直播間狀態變更stompClient.subscribe(`/topic/room/${roomId}/status`, function(message) {const data = JSON.parse(message.body);if (data.status !== 1) {alert('直播已結束');location.reload();}});// 訂閱點贊更新stompClient.subscribe(`/topic/room/${roomId}/like`, function(message) {const data = JSON.parse(message.body);document.getElementById('like-count').textContent = data.likeCount;});});}// 頁面加載完成后初始化document.addEventListener('DOMContentLoaded', initPlayer);
</script>
</body>
</html>
5.2 推流工具選擇
對于主播端推流,可以選擇以下工具:
- OBS Studio:開源、功能強大的推流軟件
- FFmpeg:命令行工具,適合自動化推流
- WebRTC:瀏覽器直接采集和推流(低延遲但兼容性有限)
此處使用OBS Studio
六、性能優化與擴展
6.1 緩存策略
在高并發場景下,合理使用緩存至關重要:
@Configuration
@EnableCaching
public class CacheConfig {@Beanpublic CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10)).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();// 直播間列表緩存1分鐘cacheConfigurations.put("liveRoomList", config.entryTtl(Duration.ofMinutes(1)));// 直播間詳情緩存5分鐘cacheConfigurations.put("liveRoom", config.entryTtl(Duration.ofMinutes(5)));// 回放列表緩存30分鐘cacheConfigurations.put("recordingList", config.entryTtl(Duration.ofMinutes(30)));return RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(config).withInitialCacheConfigurations(cacheConfigurations).build();}
}
在服務層添加緩存注解:
@Cacheable(value = "liveRoomList", key = "'active_page_' + #page + '_' + #size")
public List<LiveRoom> getActiveLiveRooms(int page, int size) {// 原有實現...
}@Cacheable(value = "liveRoom", key = "#roomId")
public LiveRoom getLiveRoomDetail(Long roomId) {return liveRoomMapper.selectById(roomId);
}@CacheEvict(value = "liveRoom", key = "#roomId")
@Caching(evict = {@CacheEvict(value = "liveRoomList", allEntries = true)
})
public LiveRoom updateLiveRoomStatus(Long roomId, int status) {// 更新邏輯...
}
6.2 負載均衡與集群部署
對于大型直播系統,單機部署無法滿足需求,需要考慮集群部署:
# Docker Compose示例配置
version: '3'services:# 應用服務器集群app1:image: live-streaming-service:latestports:- "8081:8080"environment:- SPRING_PROFILES_ACTIVE=prod- SERVER_PORT=8080- LIVE_SRS_SERVER_URL=rtmp://srs1:1935/livedepends_on:- mysql- redisapp2:image: live-streaming-service:latestports:- "8082:8080"environment:- SPRING_PROFILES_ACTIVE=prod- SERVER_PORT=8080- LIVE_SRS_SERVER_URL=rtmp://srs2:1935/livedepends_on:- mysql- redis# 流媒體服務器集群srs1:image: ossrs/srs:latestports:- "1935:1935"- "1985:1985"- "8080:8080"volumes:- ./srs1.conf:/usr/local/srs/conf/srs.confsrs2:image: ossrs/srs:latestports:- "1936:1935"- "1986:1985"- "8081:8080"volumes:- ./srs2.conf:/usr/local/srs/conf/srs.conf# 負載均衡器nginx:image: nginx:latestports:- "80:80"- "443:443"volumes:- ./nginx/nginx.conf:/etc/nginx/nginx.conf- ./nginx/ssl:/etc/nginx/ssldepends_on:- app1- app2- srs1- srs2# 數據庫和中間件mysql:image: mysql:8.0ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=password- MYSQL_DATABASE=live_streamingvolumes:- mysql-data:/var/lib/mysqlredis:image: redis:6.2ports:- "6379:6379"volumes:- redis-data:/datavolumes:mysql-data:redis-data:
NGINX負載均衡配置:
http {upstream app_servers {# IP哈希負載均衡,確保同一用戶請求發送到同一服務器ip_hash;server app1:8080;server app2:8080;}upstream srs_http_servers {server srs1:8080;server srs2:8080;}server {listen 80;server_name live.example.com;# API請求location /api/ {proxy_pass http://app_servers;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;}# WebSocket連接location /ws {proxy_pass http://app_servers;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}# HLS分發location /hls/ {proxy_pass http://srs_http_servers;proxy_set_header Host $host;}# HTTP-FLV分發location /live/ {proxy_pass http://srs_http_servers;proxy_set_header Host $host;}}
}stream {upstream rtmp_servers {server srs1:1935;server srs2:1935;}server {listen 1935;proxy_pass rtmp_servers;}
}
6.3 流量控制與限流
為防止服務器被惡意攻擊或過載,實現限流機制:
@Configuration
public class RateLimitConfig {@Beanpublic RedisRateLimiter redisRateLimiter(StringRedisTemplate redisTemplate) {return new RedisRateLimiter(redisTemplate);}
}@Component
public class RedisRateLimiter {private final StringRedisTemplate redisTemplate;private final String luaScript;public RedisRateLimiter(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// Lua腳本實現滑動窗口限流this.luaScript = "local key = KEYS[1] " +"local capacity = tonumber(ARGV[1]) " +"local period = tonumber(ARGV[2]) " +"local now = tonumber(ARGV[3]) " +"local requested = tonumber(ARGV[4]) " +// 移除過期的時間戳"redis.call('zremrangebyscore', key, 0, now - period) " +// 獲取當前請求數"local currentCount = redis.call('zcard', key) " +// 如果請求數超過容量,返回0"if currentCount + requested > capacity then " +" return 0 " +"end " +// 添加新請求的時間戳"for i = 1, requested do " +" redis.call('zadd', key, now, now .. i) " +"end " +// 設置過期時間"redis.call('expire', key, period) " +// 返回剩余容量"return capacity - currentCount - requested";}/*** 嘗試獲取令牌* @param key 限流鍵* @param capacity 容量* @param period 時間窗口(秒)* @param requested 請求令牌數* @return 是否獲取成功*/public boolean tryAcquire(String key, int capacity, int period, int requested) {long now = System.currentTimeMillis() / 1000;RedisScript<Long> script = RedisScript.of(luaScript, Long.class);Long remainingTokens = redisTemplate.execute(script,Collections.singletonList(key),String.valueOf(capacity),String.valueOf(period),String.valueOf(now),String.valueOf(requested));return remainingTokens != null && remainingTokens >= 0;}
}
使用切面實現API限流:
@Aspect
@Component
public class RateLimitAspect {private final RedisRateLimiter rateLimiter;@Autowiredpublic RateLimitAspect(RedisRateLimiter rateLimiter) {this.rateLimiter = rateLimiter;}@Around("@annotation(rateLimit)")public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {// 獲取請求IPHttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();String ip = getClientIp(request);// 構建限流鍵String key = "rate_limit:" + rateLimit.key() + ":" + ip;// 嘗試獲取令牌boolean allowed = rateLimiter.tryAcquire(key, rateLimit.capacity(), rateLimit.period(), 1);if (allowed) {return joinPoint.proceed();} else {throw new TooManyRequestsException("請求過于頻繁,請稍后再試");}}private String getClientIp(HttpServletRequest request) {String ip = request.getHeader("X-Forwarded-For");if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("Proxy-Client-IP");}if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("WL-Proxy-Client-IP");}if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getRemoteAddr();}// 取第一個IP地址if (ip != null && ip.contains(",")) {ip = ip.split(",")[0].trim();}return ip;}
}@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {String key(); // 限流鍵int capacity() default 10; // 容量int period() default 60; // 時間窗口(秒)
}
在控制器中應用限流注解:
@PostMapping("/room")
@RateLimit(key = "create_room", capacity = 5, period = 3600) // 每小時限制創建5個直播間
public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {// 實現...
}@PostMapping("/room/{roomId}/view")
@RateLimit(key = "view_increment", capacity = 1, period = 5) // 每5秒限制一次
public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {// 實現...
}
6.4 監控與告警
整合Prometheus和Grafana實現監控:
@Configuration
public class MonitoringConfig {@BeanMeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {return registry -> registry.config().commonTags("application", "live-streaming-service");}
}
添加自定義指標:
@Service
public class LiveStreamMonitoringService {private final MeterRegistry meterRegistry;private final Counter liveStartCounter;private final Counter liveEndCounter;private final Gauge activeStreamGauge;private final Timer streamProcessingTimer;@Autowiredpublic LiveStreamMonitoringService(MeterRegistry meterRegistry) {this.meterRegistry = meterRegistry;// 創建指標this.liveStartCounter = Counter.builder("live.stream.start").description("直播開始計數").register(meterRegistry);this.liveEndCounter = Counter.builder("live.stream.end").description("直播結束計數").register(meterRegistry);this.activeStreamGauge = Gauge.builder("live.stream.active", this::getActiveStreamCount).description("當前活躍直播數").register(meterRegistry);this.streamProcessingTimer = Timer.builder("live.stream.processing").description("直播處理時間").register(meterRegistry);}// 獲取活躍直播數private long getActiveStreamCount() {// 從Redis獲取活躍直播間數量return redisTemplate.opsForSet().size("live:active_rooms");}// 記錄直播開始public void recordLiveStart() {liveStartCounter.increment();}// 記錄直播結束public void recordLiveEnd() {liveEndCounter.increment();}// 記錄處理時間public <T> T recordProcessingTime(Supplier<T> supplier) {return streamProcessingTimer.record(supplier);}// 記錄錯誤public void recordError(String errorType) {meterRegistry.counter("live.stream.errors", "type", errorType).increment();}
}
七、安全性考慮
7.1 推流鑒權
前面已經實現了基于時間戳和簽名的推流鑒權。為了增強安全性,可以添加IP白名單:
@Service
public class StreamSecurityService {@Value("${live.security.ip-whitelist-enabled}")private boolean ipWhitelistEnabled;@Value("#{'${live.security.ip-whitelist:}'.split(',')}")private List<String> ipWhitelist;/*** 驗證IP是否在白名單中*/public boolean isIpAllowed(String ip) {if (!ipWhitelistEnabled) {return true;}if (ipWhitelist.isEmpty()) {return true;}return ipWhitelist.contains(ip) || ipWhitelist.contains("0.0.0.0");}/*** 生成帶有過期時間的推流URL*/public String generateSecureStreamUrl(String baseUrl, String streamKey, long expireSeconds) {long expireTimestamp = System.currentTimeMillis() / 1000 + expireSeconds;String authString = streamKey + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());return baseUrl + "?auth_key=" + authToken + "&expire=" + expireTimestamp;}
}
7.2 播放鑒權
同樣可以為播放URL添加鑒權:
@Service
public class PlayAuthService {@Value("${live.play.auth-enabled}")private boolean authEnabled;@Value("${live.play.auth-key}")private String authKey;@Value("${live.play.auth-expire}")private long authExpire;/*** 生成帶鑒權的播放URL*/public String generateAuthPlayUrl(String baseUrl, String streamKey, Long userId) {if (!authEnabled) {return baseUrl;}long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;String authString = streamKey + "-" + userId + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());return baseUrl + "?auth_key=" + authToken + "&user_id=" + userId + "&expire=" + expireTimestamp;}/*** 驗證播放URL*/public boolean validatePlayUrl(String streamKey, String authToken, String userId, String expireStr) {if (!authEnabled) {return true;}try {long expireTimestamp = Long.parseLong(expireStr);long currentTime = System.currentTimeMillis() / 1000;// 檢查是否過期if (currentTime > expireTimestamp) {return false;}// 驗證tokenString authString = streamKey + "-" + userId + "-" + expireStr + "-" + authKey;String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes());return calculatedToken.equals(authToken);} catch (Exception e) {return false;}}
}
7.3 內容安全
對于用戶生成的內容,需要進行審核和過濾:
@Service
public class ContentSecurityService {@Value("${live.content.sensitive-words-file}")private String sensitiveWordsFile;private Set<String> sensitiveWords = new HashSet<>();@PostConstructpublic void init() {// 加載敏感詞庫try {File file = new File(sensitiveWordsFile);if (file.exists()) {List<String> lines = Files.readAllLines(file.toPath());sensitiveWords.addAll(lines);}} catch (IOException e) {// 如果加載失敗,使用默認敏感詞sensitiveWords.add("敏感詞1");sensitiveWords.add("敏感詞2");}}/*** 過濾敏感詞*/public String filterContent(String content) {if (content == null || content.isEmpty()) {return content;}String filteredContent = content;for (String word : sensitiveWords) {filteredContent = filteredContent.replaceAll(word, "***");}return filteredContent;}/*** 檢查內容是否包含敏感詞*/public boolean containsSensitiveWords(String content) {if (content == null || content.isEmpty()) {return false;}for (String word : sensitiveWords) {if (content.contains(word)) {return true;}}return false;}
}
在WebSocket消息處理中使用:
@MessageMapping("/chat/{roomId}")
public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {// 過濾敏感內容String filteredContent = contentSecurityService.filterContent(message.getContent());message.setContent(filteredContent);// 檢查是否全是敏感詞if (filteredContent.matches("\*+")) {// 記錄違規日志log.warn("用戶發送違規內容: userId={}, content={}", message.getUserId(), message.getContent());return;}// 正常發送消息messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);
}
八、演示說明
為了幫助大家快速理解和測試直播系統,下面提供完整的演示步驟:
8.1 環境準備
首先,需要準備以下環境:
- Java 21+ :運行SpringBoot應用
- MySQL 5.7+ :存儲直播信息
- Redis:緩存和消息通信
- SRS:流媒體服務器
- FFmpeg:視頻處理工具(錄制直播回放)
- OBS Studio:用于測試推流
8.2 安裝SRS
使用Docker可以快速部署SRS服務器:
# 拉取SRS鏡像
docker pull ossrs/srs:4# 運行SRS容器
docker run --name srs -d --restart=always \-p 1935:1935 \-p 1985:1985 \-p 8080:8080 \-v $(pwd)/conf:/usr/local/srs/conf \ossrs/srs:4 objs/srs -c conf/srs.conf
將前面提供的SRS配置文件保存為conf/srs.conf
。
8.3 直播功能演示流程
8.3.1 創建直播間
通過API創建直播間
curl -X POST http://localhost:8080/api/live/room \-H "Content-Type: application/json" \-d '{"title": "測試直播間","userId": 1,"coverUrl": "https://example.com/cover.jpg"}'
響應示例
{"id": 1,"title": "測試直播間","userId": 1,"coverUrl": "https://example.com/cover.jpg","streamKey": "8f7d6e5c4b3a2f1e0d9c8b7a","streamUrl": "rtmp://localhost:1935/live/8f7d6e5c4b3a2f1e0d9c8b7a","hlsUrl": "http://localhost:8080/hls/8f7d6e5c4b3a2f1e0d9c8b7a.m3u8","flvUrl": "http://localhost:8080/live/8f7d6e5c4b3a2f1e0d9c8b7a.flv","status": 0,"viewCount": 0,"likeCount": 0
}
記錄下streamKey
和streamUrl
,這將用于推流設置。
8.3.2 推流測試
-
打開OBS Studio,設置推流參數:
- 設置 → 流
- 服務:自定義
- 服務器:rtmp://192.168.195.100:1935/live/2feabf98b5ee6bb0c3dbc6cd423084a3?auth_key=9875b710e26914eff9f9aa1cc1df0093&expire=1749220798
- 流密鑰:剛才獲取的streamKey(例如:8f7d6e5c4b3a2f1e0d9c8b7a)
-
添加視頻源(例如攝像頭或屏幕捕獲)
-
點擊"開始推流"按鈕開始直播
系統會自動檢測到推流開始,并通過SRS回調更新直播間狀態為"直播中"。
8.3.3 播放測試
- 打開前端頁面:http://localhost:8080/play.html?roomId=1
- 頁面將自動加載直播流并開始播放
- 測試播放:
-
- HLS播放:http://192.168.195.100:8080/hls/live/2feabf98b5ee6bb0c3dbc6cd423084a3/2feabf98b5ee6bb0c3dbc6cd423084a3.m3u8
8.3.4 直播互動測試
- 多次刷新頁面,查看觀看數是否增加
8.3.5 直播回放測試
開始錄制直播
curl -X POST http://localhost:8080/api/live/room/1/record/start
等待一段時間后,停止錄制
# 使用返回的recordingId
curl -X POST http://localhost:8080/api/live/record/1/stop
查看回放列表
curl http://localhost:8080/api/live/room/1/recordings
使用返回的fileUrl播放回放視頻
8.3.6 結束直播
- 在OBS Studio中點擊"停止推流"按鈕
- 系統會自動檢測到推流結束,并更新直播間狀態為"直播結束"
- 也可以通過API手動結束直播
curl -X POST http://localhost:8080/api/live/room/1/end