Spray.io搭建Rest — 支持WebSocket

Spray.io嘗試 javascript


Spray.io搭建Rest — 支持WebSocket

工程地址:http://git.oschina.net/for-1988/Simples.git
    最近在看spray就是想用它來作WebSocket的後臺,今天就研究了一下spray怎麼支持WebSocket。參考了一些老外的代碼,WebSocket的協議的實現用了 Java-WebSocket這個開源項目。

添加JAR依賴

    在build.sbt文件中添加Java-WebSocket的依賴。我這裏這個包經過sbt下載不了,因此就直接放在工程中引入了。 html

"org.java-websocket"  %   "Java-WebSocket" % "1.3.1"

WebSocket服務

    咱們藉助Java-WebSocket來實現WebSocket的相關消息事件的處理,它爲咱們作了不少的封裝,只需傳入java.net.InetSocketAddress對象便可。而後把不一樣的消息封裝成對象,轉發給對應的actor。這裏都是運用了Akka。 java

object WSocketServer {

  sealed trait WSocketServerMessage

  case class Message(ws: WebSocket, msg: String)
    extends WSocketServerMessage

  case class BufferMessage(ws: WebSocket, buffer: ByteBuffer)
    extends WSocketServerMessage

  case class Open(ws: WebSocket, hs: ClientHandshake)
    extends WSocketServerMessage

  case class Close(ws: WebSocket, code: Int, reason: String, external: Boolean)
    extends WSocketServerMessage

  case class Error(ws: WebSocket, ex: Exception)
    extends WSocketServerMessage

}

class WSocketServer(val port: Int)
  extends WebSocketServer(new InetSocketAddress(port)) {
  private val reactors = Map[String, ActorRef]()

  final def forResource(descriptor: String, reactor: Option[ActorRef]) {
    reactor match {
      case Some(actor) => reactors += ((descriptor, actor))
      case None => reactors -= descriptor
    }
  }

  final override def onMessage(ws: WebSocket, msg: String) {
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSocketServer.Message(ws, msg)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }

  final override def onMessage(ws: WebSocket, buffer: ByteBuffer) {
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSocketServer.BufferMessage(ws, buffer)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }

  final override def onOpen(ws: WebSocket, hs: ClientHandshake) {
    if (null != ws) {
      val actor = reactors.get(ws.getResourceDescriptor)
      actor.get ! WSocketServer.Open(ws, hs)
    }
  }

  final override def onClose(ws: WebSocket, code: Int, reason: String, external: Boolean) {
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSocketServer.Close(ws, code, reason, external)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }

  final override def onError(ws: WebSocket, ex: Exception) {
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSocketServer.Error(ws, ex)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
}

咱們在這個對象中,經過Map來存儲了每個請求地址對應的處理Actor。 react

WebSocket的路由

1.在Routes中添加註冊每個WebSocket請求地址對應的處理Actor git

trait Routes extends RouteConcatenation with StaticRoute with AbstractAkkaSystem {

  val httpServer = system.actorOf(Props(classOf[HttpServer], allRoutes))

  val socketServer = system.actorOf(Props[SocketServer])


  lazy val index = system.actorOf(Props[IndexActor], "index")

  lazy val allRoutes = logRequest(showReq _) {
    new IndexService(index).route ~ staticRoute
  }
  //註冊WebSocket處理Actor
  implicit val wsocketServer: WSocketServer
  wsocketServer.forResource("/ws", Some(index))


  private def showReq(req: HttpRequest) = LogEntry(req.uri, InfoLevel)
}
2.啓動WebSocket服務
object Server extends App with Routes {

  implicit lazy val system = ActorSystem("server-system")

  implicit lazy val wsocketServer = new WSocketServer(Configuration.portWs)
  //啓動WebSocket服務
  wsocketServer.start
  sys.addShutdownHook({
    system.shutdown
    wsocketServer.stop
  })

  IO(Http) ! Http.Bind(httpServer, Configuration.host, port = Configuration.portHttp)
  // IO(Tcp) ! Tcp.Bind(socketServer, new InetSocketAddress(Configuration.host, Configuration.portTcp))
}

WebSocket處理Actor

咱們在以前的IndexActor上加一下對WebSocket消息的處理,這裏作了簡單的接受消息,而後直接回復給客戶端 github

class IndexActor extends Actor with ActorLogging {

  import WSocketServer._

  override def receive = {
    case Open(ws, hs) =>
      ws.send("Hello")
      log.debug("registered monitor for url {}", ws.getResourceDescriptor)
    case Message(ws, msg) =>
      log.debug("url {} received msg '{}'", ws.getResourceDescriptor, msg)
      ws.send("【echo】"+msg)
  }
}

客戶端調用

    咱們修改了以前的index模板,加入了WebSocket web

@(name:String)
<html>
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
</head>
<body>
<h1>Hello @name !!!</h1>
<input id="message"/> <button onclick="javascript:send();">發送</button>
<div id="msg"></div>
</body>

<script>

    var wsurl = "ws://localhost:6696/ws";
    var ws = null;
    if ('WebSocket' in window) {
        ws = new WebSocket(wsurl);
    } else if ('MozWebSocket' in window) {
        ws = new MozWebSocket(wsurl + "?uid=" + uid);
    } else {
        console.error("初始化 Main websocket 對象失敗!");
    }

    ws.onopen = function (event) {
        var msg = "Hi";
        ws.send(msg)
    }
    ws.onmessage = function (event) {
        console.info(event.data);
        var data = new Date();
        document.getElementById("msg").innerHTML += "<h5>"+data.toTimeString()+" : "+event.data+"</h5>";
    }

    function send(){
        var msg = document.getElementById("message").value;
        ws.send(msg);
    }
</script>
</html>
相關文章
相關標籤/搜索