#前言 大學的學習時光臨近尾聲,感嘆時光匆匆,三年一晃而過。同窗們都忙着找工做,我也在這裏拋一份簡歷吧,歡迎各位老闆和獵手誠邀。咱們進入正題。直播行業是當前火熱的行業,誰都想從中分得一杯羹,直播養活了一大批人,一個平臺主播粗略估計就有幾千號人,可是實時在線觀看量有的竟然到了驚人的百萬級別,特別是遊戲主播,可想而知,直播間是一個磁鐵式的廣告傳播媒介,也難怪這麼多巨頭公司都搶着作直播。我不太清楚直播行業技術有多深,畢竟本身沒作過,可是我們能夠本身實現一個知足幾百號人同時觀看的直播間呀。css
手機端效果html
這個場景很熟悉吧~~ 經過obs推流軟件來推流。前端
![]vue
戶外直播,經過yasea手機端推流軟件,使用手機攝像頭推流。 java
電腦端效果mysql
播放香港衛視react
直播畫面linux
項目分爲三個部分:webpack
客戶端
直播間視頻拉流、播放和聊天室,炫酷的彈幕以及直播間信息ios
服務端
處理直播間、用戶的數據業務,聊天室消息的處理
服務器部署
視頻服務器和web服務器
移動客戶端
電腦端客戶端
服務端
服務器部署
###直播間主要涉及到兩個主要功能:第一是視頻直播、第二是聊天室。這兩個都是很是講究實時性。
說到直播咱們先了解下幾個經常使用的直播流協議,看了挺多的流媒體協議文章博客,但都是很是粗略,這裏有個比較詳細的 流媒體協議介紹,若是想詳細瞭解協議內容估計去要看看專業書籍了。這裏咱們用到的只是rtmp和hls,實踐後發現:rtmp只可以在電腦端播放,hls只可以在手機端播放。並且rtmp是至關快的儘管沒有rtsp那麼快,延遲只有幾秒,我測試的就差很少2-5秒,可是hls大概有10幾秒。因此若是你體驗過demo,就會發現手機延遲比較多。
直播的流程: 直播分爲推流和拉流兩個過程,那麼流推向哪裏,拉流又從哪裏拉取呢?那固然須要視頻服務器啦,千萬不要覺得視頻直播服務器很複雜,其實在nginx服務器中一切都變得簡單。後面我會講解如何部署Nginx服務器並配置視頻模塊(nginx-rtmp-module).
首先主播經過推流軟件,好比OBS Studio推流軟件,這個是比較專業級別的,不少直播平臺的推薦主播使用這個軟件來推送視頻流,這裏我也推薦一個開源的安卓端推流工具Yasea,下載地址,文件很小,可是很強大。 直播內容推送到服務器後,就能夠在服務器端使用視頻編碼工具進行轉碼了,能夠轉換成各類高清,標清,超清的分辨率視頻,也就是爲何咱們在各個視頻網站均可以選擇視頻清晰度。這裏咱們沒有轉碼,只是經過前端視頻播放器(video.js)來拉取視頻.這樣整個視頻推流拉流過程就完成了。
直播間裏面的聊天室跟咱們的羣聊天差很少,只不過它變成了web端,web端的即時通訊方案有不少,這裏咱們選擇websocket協議來與服務端通訊,websocket是基於http之上的傳輸協議,客戶端向服務端發送http請求,並攜帶Upgrade:websocket升級頭信息表示轉換websocket協議,經過與服務端握手成功後就能夠創建tcp通道,由此來傳遞消息,它與http最大的差異就是,服務端能夠主動向客戶端發送消息。
既然創建了消息通道,那咱們就須要往通道里發消息,可是總得須要一個東西來管控消息該發給誰吧,要否則全亂套了,因此咱們選擇了消息中間件RabbitMQ.使用它來負責消息的路由去向。
#移動客戶端實操 源碼地址 ##工程結構
|—— build 構建服務和webpack配置 |—— congfig 項目不一樣環境的配置 |—— dist build生成生產目錄 |—— static 靜態資源 |—— package.json 項目配置文件 |—— src 開發源代碼目錄 |—— api 經過axios導出的api目錄 |—— components 頁面和組件 |—— public 公有組件 |—— vuex 全局狀態 |—— main.js 應用啓動配置點
##功能模塊
拉取服務器的直播視頻流(hls)並播放直播畫面
與服務端建立websocket鏈接,收發聊天室消息
經過websocket獲取消息併發送到彈幕
經過websocket實時更新在線用戶
結合服務端獲取訪問歷史記錄
問題反饋模塊
##效果圖 ##項目說明 請參考源碼 #服務端實操 源碼地址
因爲我的比較喜歡接觸新的東西,因此後端選擇了springboot,前端選擇了Vue.js年輕人嘛總得跟上潮流。SpringBoot實踐事後發現真的太省心了,不用再理會各類配置文件,全自動化裝配。 這裏貼一下pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hushangjie</groupId> <artifactId>rtmp-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rtmp-demo</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-actuator-docs</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency>--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <!--非嚴格模式解析HTML5--> <dependency> <groupId>net.sourceforge.nekohtml</groupId> <artifactId>nekohtml</artifactId> <version>1.9.22</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- 打包成war時能夠移除嵌入式tomcat插件 --> <!--<exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions>--> </dependency> <!--<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency>--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.webjars</groupId> <artifactId>vue</artifactId> <version>2.1.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.9.2</version> </dependency> <!-- RabbitMQ相關配置--> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>2.0.8.RELEASE</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-net</artifactId> <version>2.0.8.RELEASE</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <fork>true</fork> </configuration> </plugin> </plugins> </build> </project>
application.properties文件
spring.datasource.url=jdbc:mysql://host:3306/database?characterEncoding=utf8&useSSL=false spring.datasource.username=username spring.datasource.password=password spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.thymeleaf.mode=LEGACYHTML5 server.port=8085 # REDIS (RedisProperties) # Redis數據庫索引(默認爲0) spring.redis.database=0 # Redis服務器地址 spring.redis.host=127.0.0.1 # Redis服務器鏈接端口 spring.redis.port=6379 # Redis服務器鏈接密碼(默認爲空) spring.redis.password= # 鏈接池最大鏈接數(使用負值表示沒有限制) spring.redis.pool.max-active=8 # 鏈接池最大阻塞等待時間(使用負值表示沒有限制) spring.redis.pool.max-wait=-1 # 鏈接池中的最大空閒鏈接 spring.redis.pool.max-idle=8 # 鏈接池中的最小空閒鏈接 spring.redis.pool.min-idle=0 # 鏈接超時時間(毫秒) spring.redis.timeout=0
##websocket配置
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { //攔截器注入service失敗解決辦法 @Bean public MyChannelInterceptor myChannelInterceptor(){ return new MyChannelInterceptor(); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //添加訪問域名限制能夠防止跨域socket鏈接 //setAllowedOrigins("http://localhost:8085") registry.addEndpoint("/live").setAllowedOrigins("*").addInterceptors(new HandShkeInceptor()).withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { /*.enableSimpleBroker("/topic","/queue");*/ //假如須要第三方消息代理,好比rabitMQ,activeMq,在這裏配置 registry.setApplicationDestinationPrefixes("/demo") .enableStompBrokerRelay("/topic","/queue") .setRelayHost("127.0.0.1") .setRelayPort(61613) .setClientLogin("guest") .setClientPasscode("guest") .setSystemLogin("guest") .setSystemPasscode("guest") .setSystemHeartbeatSendInterval(5000) .setSystemHeartbeatReceiveInterval(4000); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { ChannelRegistration channelRegistration = registration.setInterceptors(myChannelInterceptor()); super.configureClientInboundChannel(registration); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { super.configureClientOutboundChannel(registration); } }
配置類繼承了消息代理配置類,意味着咱們將使用消息代理rabbitmq.使用registerStompEndpoints方法註冊一個websocket終端鏈接。這裏咱們須要瞭解兩個東西,第一個是stomp和sockjs,sockjs是啥呢,其實它是對於websocket的封裝,由於若是單純使用websocket的話效率會很是低,咱們須要的編碼量也會增多,並且若是瀏覽器不支持websocket,sockjs會自動降級爲輪詢策略,並模擬websocket,保證客戶端和服務端能夠通訊。 stomp有是什麼看這裏
stomp是一種簡單(流)文本定向消息協議,它提供了一個可互操做的鏈接格式,容許STOMP客戶端與任意STOMP消息代理(Broker)進行交互,也就是咱們上面的RabbbitMQ,它就是一個消息代理。 咱們能夠經過configureMessageBroker來配置消息代理,須要注意的是咱們將要部署的服務器也應該要有RabbitMQ,由於它是一箇中間件,安裝很是容易,這裏就不說明了。這裏咱們配置了「/topic,/queue」兩個代理轉播策略,就是說客戶端訂閱了前綴爲「/topic,/queue」頻道都會經過消息代理(RabbitMQ)來轉發。跟spring沒啥關係啦,徹底解耦。
##websocke如何保證安全
一開始接觸 stomp的時候一直有個問題困擾我,客戶端只要與服務端經過websocket創建了鏈接,那麼他就能夠訂閱任何內容,意味着能夠接受任何消息,這樣豈不是亂了套啦,因而我翻閱了大量博客文章,不少都是官方的例子並無解決實際問題。通過琢磨,其實websocket是要考慮安全性的。具體在如下幾個方面
對於跨域問題,咱們能夠經過setAllowedOrigins方法來設置可鏈接的域名,防止跨站鏈接。
對於站內用戶是否容許鏈接咱們能夠以下配置
public class HandShkeInceptor extends HttpSessionHandshakeInterceptor { private static final Set<UserEntity> ONLINE_USERS = new HashSet<>(); @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { System.out.println("握手前"+request.getURI()); //http協議轉換websoket協議進行前,一般這個攔截器能夠用來判斷用戶合法性等 //鑑別用戶 if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; //這句話很重要若是getSession(true)會致使移動端沒法握手成功 //request.getSession(true):若存在會話則返回該會話,不然新建一個會話。 //request.getSession(false):若存在會話則返回該會話,不然返回NULL //HttpSession session = servletRequest.getServletRequest().getSession(false); HttpSession session = servletRequest.getServletRequest().getSession(); UserEntity user = (UserEntity) session.getAttribute("user"); if (user != null) { //這裏只使用簡單的session來存儲用戶,若是使用了springsecurity能夠直接使用principal return super.beforeHandshake(request, response, wsHandler, attributes); }else { System.out.println("用戶未登陸,握手失敗!"); return false; } } return false; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { //握手成功後,一般用來註冊用戶信息 System.out.println("握手後"); super.afterHandshake(request, response, wsHandler, ex); } }
HttpSessionHandshakeInterceptor 這個攔截器用來管理握手和握手後的事情,咱們能夠經過請求信息,好比token、或者session判用戶是否能夠鏈接,這樣就可以防範非法用戶。
那如何限制用戶只能訂閱指定內容呢?咱們接着往下看
public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); //檢測用戶訂閱內容(防止用戶訂閱不合法頻道) if (StompCommand.SUBSCRIBE.equals(command)) { //從數據庫獲取用戶訂閱頻道進行對比(這裏爲了演示直接使用set集合代替) Set<String> subedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); if (subedChannelInDB.contains(accessor.getDestination())) { //該用戶訂閱的頻道合法 return super.preSend(message, channel); } else { //該用戶訂閱的頻道不合法直接返回null前端用戶就接受不到該頻道信息。 return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); //檢測用戶是否鏈接成功,蒐集在線的用戶信息若是數據量過大咱們能夠選擇使用緩存數據庫好比redis, //這裏因爲須要頻繁的刪除和增長集合內容,咱們選擇set集合來存儲在線用戶 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user != null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); //經過websocket實時返回在線人數 this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } //若是用戶斷開鏈接,刪除用戶信息 if (StompCommand.DISCONNECT.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user != null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); } }
在stomp裏面,Channel信道就是消息傳送的通道,客戶端與服務端創建了鏈接就至關於創建了通道,之後的信息就是經過這個通道來傳輸。全部的消息都有消息頭,被封裝在了spring 的messag接口中,好比創建鏈接時候消息頭就含有CONNECT,固然還有一些其餘的信息。客戶端訂閱的時候也有訂閱頭信息SUBSCRIBE,那麼我是否是能夠在這個攔截器ChannelInterceptorAdapter 中攔截每一個人的訂閱信息,而後與數據庫的信息做比對,最後決定這個用戶是否能夠訂閱這個頻道的信息呢,對的,這是個人想法,按照這樣的思路,作單聊不是迎刃而解了嗎。 那客戶端經過websocket發送的消息如何到達訂閱者手中呢,按照rabbitmq的規則,訂閱者屬於消費者,發送消息的一方屬於生產者,生產者經過websocket把消息發送到服務端,服務端經過轉發給消息代理(rabbitmq),消息代理負責存儲消息,管理髮送規則,推送消息給訂閱者,看下面的代碼
@MessageMapping(value = "/chat") @SendTo("/topic/group") public MsgEntity testWst(String message , @Header(value = "simpSessionAttributes") Map<String,Object> session){ UserEntity user = (UserEntity) session.get("user"); String username = user.getRandomName(); MsgEntity msg = new MsgEntity(); msg.setCreator(username); msg.setsTime(Calendar.getInstance()); msg.setMsgBody(message); return msg; }
@MessageMapping看起來跟springmvc方法特別像,它便可以用在類級別上也能夠用在方法級別上 當發送者往‘/chat’發送消息後,服務端接受到消息,再發送給「/topic/group」的訂閱者,@SendTo就是發送給誰,這裏須要注意的有,若是咱們沒有配置消息代理,只使用了enableSimpleBroker("/topic","/queue")簡單消息代理,那麼就是直接發送到消息訂閱者,若是配置了消息代理,那還要經過消息代理,由它來轉發。
若是咱們想在服務端隨時發送消息,而不是在客戶端發送(這樣的場景很常見,好比發送全局通知),可使用SimpMessagingTemplate類,經過注入該bean,在合適的業務場景中發送消息。
直播間常常須要統計數據,好比實時在線人數,訪問量,貢獻排行榜,訂閱量。我選擇的方案是使用redis來計數,儘管這個demo可能不會太多人訪問,可是個人目的是學習如何使用redis 先看springboot中redis的配置
@Configuration public class RedisConfig extends CachingConfigurerSupport{ /** * 生成key的策略 * * @return */ @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); } }; } /** * 管理緩存 * * @param redisTemplate * @return */ @SuppressWarnings("rawtypes") @Bean public CacheManager cacheManager(RedisTemplate redisTemplate) { RedisCacheManager rcm = new RedisCacheManager(redisTemplate); //設置緩存過時時間 // rcm.setDefaultExpiration(60);//秒 //設置value的過時時間 Map<String,Long> map=new HashMap(); map.put("test",60L); rcm.setExpires(map); return rcm; } /** * RedisTemplate配置 * @param factory * @return */ @Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer);//若是key是String 須要配置一下StringSerializer,否則key會亂碼 /XX/XX template.afterPropertiesSet(); //template.setStringSerializer(); return template; } }
redis數據統計Dao的實現
@Repository public class StatDao { @Autowired RedisTemplate redisTemplate; public void pushOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().add("OnlineUser",userEntity); } public void popOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().remove("OnlineUser" ,userEntity); } public Set getAllUserOnline(){ return redisTemplate.opsForSet().members("OnlineUser"); } public void pushGuestHistory(Guest guest){ //最多存儲指定個數的訪客 if (redisTemplate.opsForList().size("Guest") == 200l){ redisTemplate.opsForList().rightPop("Guest"); } redisTemplate.opsForList().leftPush("Guest",guest); } public List getGuestHistory(){ return redisTemplate.opsForList().range("Guest",0,-1); } }
Dao層很是簡單,由於咱們只須要統計在線人數和訪客。可是在線人數是實時更新的,既然咱們使用了websocket實時數據更新就很是容易了,前面咱們講過,經過信道攔截器能夠攔截鏈接,訂閱,斷開鏈接等等事件信息,因此咱們就能夠當用戶鏈接時存儲在線用戶,經過websocket返回在線用戶信息。
public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); //檢測用戶訂閱內容(防止用戶訂閱不合法頻道) if (StompCommand.SUBSCRIBE.equals(command)) { //從數據庫獲取用戶訂閱頻道進行對比(這裏爲了演示直接使用set集合代替) Set<String> subedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); if (subedChannelInDB.contains(accessor.getDestination())) { //該用戶訂閱的頻道合法 return super.preSend(message, channel); } else { //該用戶訂閱的頻道不合法直接返回null前端用戶就接受不到該頻道信息。 return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); //檢測用戶是否鏈接成功,蒐集在線的用戶信息若是數據量過大咱們能夠選擇使用緩存數據庫好比redis, //這裏因爲須要頻繁的刪除和增長集合內容,咱們選擇set集合來存儲在線用戶 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user != null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); //經過websocket實時返回在線人數 this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } //若是用戶斷開鏈接,刪除用戶信息 if (StompCommand.DISCONNECT.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user != null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); } }
因爲這個項目有移動端和電腦端,因此須要根據請求代理UserAgent來判斷客戶端屬於哪種類型。這個工具類在源碼上有。我就不貼了。 #服務器部署 說了這麼多即時通訊,卻沒發現視頻直播。不要着急咱們立刻進入視頻環節。文章開頭就說明了幾種媒體流協議,這裏不講解詳細的協議流程,只須要知道,咱們是經過推流軟件採集視頻信息,如何採集也不是咱們關注的。採集到信息後經過軟件來推送到指定的服務器,以下圖
obs推流設置 yasea手機端推流設置
紅色部分是服務器開放的獲取流接口。 ##Nginx-rtmp-module配置 視頻服務器有不少,也支持不少媒體流協議。這裏咱們選擇nginx-rtmp-module來作視頻服務,接下來咱們須要在linux下安裝nginx,並安裝rtmp模塊。本人也是linux初學者,一步步摸索着把服務器搭建好,據說tomcat和nginx很配哦,因此做爲免費開源的固然首選這兩個。 接下來須要在linux安裝一下軟件和服務。
安裝步驟我就不說了,你們搜索一下啦,這裏貼一下nginx.conf文件配置
rtmp { server { listen 1935; chunk_size 4096; application video { play /yjdata/www/www/video; } application live { live on; hls on; hls_path /yjdata/www/www/live/hls/; hls_fragment 5s; } } }
上面代碼是配置rtmp模塊, play /yjdata/www/www/video 指的是配置點播模塊,能夠直接播放/yjdata/www/www/video路徑下的視頻。hls_path制定hls分塊存放路徑,由於hls是經過獲取到推送的視頻流信息,分塊存儲在服務器。因此它的延時比rtmp要更高。
server { listen 80; server_name localhost; #charset koi8-r; index index.jsp index.html; root /yjdata/www/www; #access_log logs/host.access.log main; location / { proxy_pass http://127.0.0.1:8080; } location ~ .*\.(gif|jpg|jpeg|png|bmp|swf|js|css|docx|pdf|doc|ppt|html|properties)$ { expires 30d; root /yjdata/www/www/static/; } location /hls { types { application/vnd.apple.mpegurl m3u8; #application/x-mpegURL; video/mp2t ts; } alias /yjdata/www/www/live/hls/; expires -1; add_header Cache-Control no-cache; } location /stat { rtmp_stat all; rtmp_stat_stylesheet stat.xsl; } location /stat.xsl { root /soft/nginx/nginx-rtmp-module/; }
上面配置了location 指向/hls,別名是/yjdata/www/www/live/hls/,因此能夠在前端直接經過域名+/hls/+文件名.m3u8獲取直播視頻。 關於nginx的配置還有不少,我也在學習當中。總而言之nginx很是強大。 #總結 經過從前端=>後臺=>服務器,整個流程走下來仍是須要花不少心思。可是收穫也是不少。本人將從大學出來,初出茅廬,文章錯誤之處,盡請指正。本人郵箱979783618@qq.com