package com.xiaoniu.im.rest;
import com.xiaoniu.im.utils.Runner;
import com.xiaoniu.im.utils.Utils;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * HTTP service that handles every business function other than chat.
 * Created by sweet on 2017/9/26.
 */
public class HttpVerticle extends AbstractVerticle {
// Class-wide logger.
private static final Logger log = LoggerFactory.getLogger(HttpVerticle.class);
// Stand-alone entry point: hands this verticle class to the project's Runner helper.
public static void main(String[] args) {
Runner.runExample(HttpVerticle.class);
}
// Directory containing the MapDB file ("sweet-im.db" is appended in initDB). Hard-coded; must be adjusted per machine.
private static final String filePath = "C:\\xiaoniu_doc\\vertx\\sweet-im\\";
// Port the HTTP server listens on.
private static final Integer port = 8080;
// MapDB handle that owns the persistent HTreeMaps below.
private DB db;
private HTreeMap<String, String> userMap; // registered users: login -> encoded user JSON
private LocalMap<String, Object> userMapCache; // registered users (shared in-memory copy): login -> JsonObject
private HTreeMap<String, String> userNameAndIdMap; // user id -> login, for quick lookup
private LocalMap<String, Object> userNameAndIdMapCache; // user id -> login (shared in-memory copy)
private HTreeMap<String, String> friendMap; // friend relations: user id -> encoded JSON array of friend ids
private LocalMap<String, Object> friendMapCache; // friend relations (shared in-memory copy): user id -> JsonArray
private HTreeMap<String, String> onlineMap; // currently logged-in users: login -> login time
private LocalMap<String, Object> onlineMapCache; // currently logged-in users (shared in-memory copy)
private LocalMap<String, Object> messageMapCache; // private chat history (shared in-memory copy), keyed "<smaller id>-<larger id>"
private LocalMap<String, Object> messageGroupMapCache; // group chat history (shared in-memory copy)
private HTreeMap<String, String> groupMap; // groups: group id -> encoded group JSON
private LocalMap<String, Object> groupMapCache; // groups (shared in-memory copy): group id -> JsonObject
// Event bus used to notify the socket side when a user logs out.
private EventBus eventBus;
/**
 * Opens the MapDB-backed stores, wires the REST routes and starts the HTTP server.
 *
 * @param future completed once the server is listening; failed when the stores cannot be
 *               opened or the port cannot be bound
 */
@Override
public void start(Future<Void> future) throws Exception {
    eventBus = vertx.eventBus();
    // Installed first so that failures during start-up are reported as well, and the
    // throwable is handed to the logger instead of being printed to stderr.
    vertx.exceptionHandler(ex -> log.error("異常處理器: " + ex.getMessage(), ex));
    initDB().setHandler(init -> {
        if (!(init.succeeded() && init.result())) {
            future.fail(init.cause());
            return;
        }
        Router router = Router.router(vertx);
        // Every route needs the request body to be collected first.
        router.route().handler(BodyHandler.create());
        // User registration
        router.post("/register").consumes("application/json").handler(this::register);
        // User login
        router.post("/login").consumes("application/json").handler(this::login);
        // User logout (provisional)
        router.get("/logout").handler(this::logout);
        // Add a friend
        router.post("/api/friend").consumes("application/json").handler(this::addFriend);
        // List a user's friends
        router.get("/api/friends").handler(this::queryFriends);
        // Private chat history between two users
        router.get("/api/message").handler(this::queryMessage);
        // Number of currently logged-in users
        router.get("/api/online").handler(this::queryUsersCount);
        // All registered users
        router.get("/api/users").handler(this::queryUsers);
        // Create a group
        router.post("/api/group").consumes("application/json").handler(this::createGroup);
        // Dump of all shared maps (testing aid)
        router.get("/maps").handler(this::queryAll);
        vertx.createHttpServer().requestHandler(router::accept).listen(port, server -> {
            if (server.succeeded()) {
                log.info("Http服務啓動成功");
                future.complete();
            } else {
                log.error("Http服務啓動失敗," + server.cause().getMessage(), server.cause());
                future.fail(server.cause());
            }
        });
    });
}
/**
 * Testing aid: answers with a snapshot of every shared in-memory map, one JSON field per map.
 */
private void queryAll(RoutingContext ctx) {
    JsonObject snapshot = new JsonObject()
            .put("userMapCache", userMapCache)
            .put("userNameAndIdMapCache", userNameAndIdMapCache)
            .put("friendMapCache", friendMapCache)
            .put("onlineMapCache", onlineMapCache)
            .put("messageMapCache", messageMapCache)
            .put("groupMapCache", groupMapCache)
            .put("messageGroupMapCache", messageGroupMapCache);
    resp(ctx.request(), snapshot);
}
/**
 * Creates a chat group from the JSON body fields {@code groupName}, {@code creater} and
 * {@code members} (an array of user ids).
 *
 * Answers 404 when the body is missing, a field is empty, fewer than two members are given,
 * any member id (or the creator id) is not a registered user id, or the creator is not
 * among the members. On success the group is stored and echoed back.
 */
private void createGroup(RoutingContext ctx) {
    JsonObject bodyAsJson = ctx.getBodyAsJson();
    if (bodyAsJson == null) {
        // No body at all: treat like any other invalid request instead of failing with an NPE.
        respError(ctx.request(), 404, null);
        return;
    }
    String groupName = bodyAsJson.getString("groupName");
    String creater = bodyAsJson.getString("creater");
    JsonArray members = bodyAsJson.getJsonArray("members");
    if (StringUtil.isNullOrEmpty(groupName)
            || StringUtil.isNullOrEmpty(creater)
            || members == null
            || members.size() < 2) {
        respError(ctx.request(), 404, null);
        return;
    }
    boolean createrRegistered = userNameAndIdMapCache.get(creater) != null;
    // A JSON null inside the array must count as "unknown member"; looking it up directly
    // would throw inside the shared map.
    boolean allMembersRegistered = members.stream()
            .allMatch(memberId -> memberId != null && userNameAndIdMapCache.get(memberId) != null);
    boolean createrIsMember = members.contains(creater); // the creator has to be one of the members
    if (!createrRegistered || !allMembersRegistered || !createrIsMember) {
        respError(ctx.request(), 404, null);
        return;
    }
    // TODO check that the members are friends of the creator
    // TODO decide how online / offline creator and members should be handled
    String groupId = Utils.uuid(); // group id
    JsonObject body = new JsonObject();
    body.put("id", groupId);
    body.put("groupName", groupName);
    body.put("creater", creater);
    body.put("createTime", Utils.time());
    body.put("members", members);
    groupMap.put(groupId, body.encode());
    groupMapCache.put(groupId, body);
    resp(ctx.request(), body);
}
/**
 * Returns the sorted private chat history between {@code sender} and {@code receiver}
 * (both query parameters holding login names).
 *
 * The sender must be logged in, the receiver must be registered, and the receiver's id must
 * be in the sender's friend list; otherwise an error response is sent.
 */
private void queryMessage(RoutingContext ctx) {
    HttpServerRequest request = ctx.request();
    String senderName = request.getParam("sender");
    String receiverName = request.getParam("receiver");
    if (StringUtil.isNullOrEmpty(senderName) || StringUtil.isNullOrEmpty(receiverName)) {
        respError(request, 404, null);
        return;
    }
    if (onlineMapCache.get(senderName) == null) {
        respError(request, 500, "請登陸後使用");
        return;
    }
    Object receiverEntry = userMapCache.get(receiverName);
    if (receiverEntry == null) {
        respError(request, 500, "不存在");
        return;
    }
    JsonObject senderJson = (JsonObject) userMapCache.get(senderName);
    JsonObject receiverJson = (JsonObject) receiverEntry;
    String senderId = senderJson.getString("id");
    Object friendEntry = friendMapCache.get(senderId);
    if (friendEntry == null) {
        respError(request, 500, "不存在");
        return;
    }
    String receiverId = receiverJson.getString("id");
    if (!((JsonArray) friendEntry).contains(receiverId)) {
        respError(request, 500, "不是好友關係");
        return;
    }
    // The history key is the two ids joined by "-", smaller id first.
    String msgMapKey;
    if (senderId.compareTo(receiverId) < 0) {
        msgMapKey = senderId + "-" + receiverId;
    } else {
        msgMapKey = receiverId + "-" + senderId;
    }
    Object stored = messageMapCache.get(msgMapKey);
    JsonArray history = stored == null ? new JsonArray() : (JsonArray) stored;
    List<Object> sortMsgs = history.stream().sorted().collect(Collectors.toList());
    resp(request, new JsonObject().put("msg", sortMsgs));
}
/**
 * Returns the friend list of the logged-in user named by the {@code login} query parameter,
 * as a JSON object with the login name as its single key.
 */
private void queryFriends(RoutingContext context) {
    HttpServerRequest request = context.request();
    String login = request.getParam("login");
    if (StringUtil.isNullOrEmpty(login)) {
        respError(request, 404, "用戶不存在");
        return;
    }
    if (onlineMapCache.get(login) == null) {
        respError(request, 500, "請先登陸");
        return;
    }
    JsonObject userJson = (JsonObject) userMapCache.get(login);
    String userId = userJson.getString("id");
    Object friendsObj = friendMapCache.get(userId);
    if (friendsObj == null) {
        respError(request, 500, "請至少添加一個好友");
        return;
    }
    log.info("friendsObj --> " + friendsObj.getClass().getName());
    JsonObject answer = new JsonObject();
    answer.put(login, friendsObj);
    resp(request, answer);
}
/**
 * Returns all registered users keyed by login name.
 *
 * The stored {@code passwd} field is removed from every entry before it is sent, matching
 * what {@code register} does for its own response; the shared map itself is not modified.
 */
private void queryUsers(RoutingContext ctx) {
    JsonObject data = new JsonObject();
    userMapCache.forEach((login, user) -> {
        if (user instanceof JsonObject) {
            // Work on a copy so the cached user record keeps its password.
            JsonObject publicView = ((JsonObject) user).copy();
            publicView.remove("passwd");
            data.put(login, publicView);
        } else {
            data.put(login, user);
        }
    });
    resp(ctx.request(), data);
}
/**
 * Returns the number of currently logged-in users as {@code {"count": n}}.
 */
private void queryUsersCount(RoutingContext ctx) {
    // The map already knows its size; no need to stream (let alone in parallel) over the keys.
    long count = onlineMapCache.size();
    resp(ctx.request(), new JsonObject().put("count", count));
}
/**
 * Logs out the user named by the {@code login} query parameter.
 *
 * Unknown or missing logins get a 404. A user that is not online is answered with success
 * right away. Otherwise the user is removed from the online maps and a "sweet-logout"
 * message is sent over the event bus; the HTTP answer depends on the reply's {@code code}.
 */
private void logout(RoutingContext ctx) {
    String login = ctx.request().getParam("login");
    if (StringUtil.isNullOrEmpty(login)) {
        respError(ctx.request(), 404, null);
        return;
    }
    if (userMapCache.get(login) == null) {
        respError(ctx.request(), 404, null);
        return;
    }
    if (onlineMapCache.get(login) == null) {
        resp(ctx.request(), new JsonObject().put("msg", "成功退出"));
        return;
    }
    onlineMapCache.remove(login);
    onlineMap.remove(login);
    JsonObject sendJson = new JsonObject();
    sendJson.put("loginName", login);
    eventBus.send("sweet-logout", sendJson, res -> {
        if (res.failed()) {
            respError(ctx.request(), 500, res.cause().getMessage());
            return;
        }
        // Be tolerant of an empty or non-JSON reply: treat it like "no data" instead of
        // throwing inside the reply handler and leaving the HTTP request unanswered.
        Object reply = res.result().body();
        String code = reply instanceof JsonObject ? ((JsonObject) reply).getString("code") : null;
        if ("1".equals(code)) {
            resp(ctx.request(), new JsonObject().put("msg", "成功退出"));
        } else {
            resp(ctx.request(), new JsonObject().put("msg", "沒有數據"));
        }
    });
}
/**
 * Logs a user in from the JSON body fields {@code login} and {@code passwd}.
 *
 * Answers 404 when the body or a field is missing, the user is unknown, or the password
 * does not match. On success the login time is recorded in the online maps.
 */
private void login(RoutingContext ctx) {
    JsonObject bodyAsJson = ctx.getBodyAsJson();
    if (bodyAsJson == null) {
        // No body at all: same answer as for missing fields.
        respError(ctx.request(), 404, null);
        return;
    }
    String login = bodyAsJson.getString("login");
    String passwd = bodyAsJson.getString("passwd");
    if (StringUtil.isNullOrEmpty(login) || StringUtil.isNullOrEmpty(passwd)) {
        respError(ctx.request(), 404, null);
        return;
    }
    Object userValue = userMapCache.get(login);
    if (userValue == null) {
        respError(ctx.request(), 404, "用戶名或密碼錯誤");
        return;
    }
    JsonObject userJson = (JsonObject) userValue;
    // NOTE(review): passwords are stored and compared in plain text; hashing is out of
    // scope here but should be introduced together with register().
    // Comparing from the request side keeps a record without "passwd" from throwing.
    if (!passwd.equals(userJson.getString("passwd"))) {
        respError(ctx.request(), 404, "用戶名或密碼錯誤");
        return;
    }
    String time = Utils.time();
    onlineMap.put(login, time);
    onlineMapCache.put(login, time);
    resp(ctx.request(), new JsonObject().put("msg", "登陸成功"));
}
/**
 * Adds the user named by body field {@code add} to the friend list of the user named by
 * body field {@code login}.
 *
 * Both users must be registered and the user to add must currently be online. The relation
 * is stored in one direction only (under the id of {@code login}).
 */
private void addFriend(RoutingContext ctx) {
    JsonObject bodyAsJson = ctx.getBodyAsJson();
    if (bodyAsJson == null) {
        // No body at all: same answer as for missing fields.
        respError(ctx.request(), 404, "找不到該用戶");
        return;
    }
    String login = bodyAsJson.getString("login"); // the acting user
    String addName = bodyAsJson.getString("add"); // the friend to add
    if (StringUtil.isNullOrEmpty(login) || StringUtil.isNullOrEmpty(addName)) {
        respError(ctx.request(), 404, "找不到該用戶");
        return;
    }
    Object loginEntry = userMapCache.get(login);
    Object addEntry = userMapCache.get(addName);
    if (loginEntry == null || addEntry == null) {
        respError(ctx.request(), 404, "找不到該用戶");
        return;
    }
    if (onlineMapCache.get(addName) == null) { // the friend to add has to be online
        respError(ctx.request(), 404, "該用戶不在線");
        return;
    }
    String id = ((JsonObject) loginEntry).getString("id");
    String addUserId = ((JsonObject) addEntry).getString("id");
    // Start from the existing friend list, or an empty one for the first friend.
    Object existing = friendMapCache.get(id);
    JsonArray friends = existing == null ? new JsonArray() : (JsonArray) existing;
    if (friends.contains(addUserId)) {
        respError(ctx.request(), 500, "已添加過好友");
        return;
    }
    friends.add(addUserId);
    friendMap.put(id, friends.encode());
    friendMapCache.put(id, friends);
    resp(ctx.request(), new JsonObject().put("msg", "添加成功"));
}
/**
 * Registers a new user from the JSON body fields {@code login}, {@code name} and
 * {@code passwd}.
 *
 * Answers 404 when the body or a field is missing and 405 when the login is already taken.
 * On success the user is stored and returned without the {@code passwd} field.
 */
private void register(RoutingContext routingContext) {
    JsonObject bodyAsJson = routingContext.getBodyAsJson();
    if (bodyAsJson == null) {
        // No body at all: same answer as for missing fields.
        respError(routingContext.request(), 404, null);
        return;
    }
    String login = bodyAsJson.getString("login");
    String name = bodyAsJson.getString("name");
    String passwd = bodyAsJson.getString("passwd");
    // Log only the login name: the raw body contains the plain-text password.
    log.info("register request, login=" + login);
    if (StringUtil.isNullOrEmpty(login)
            || StringUtil.isNullOrEmpty(name)
            || StringUtil.isNullOrEmpty(passwd)) {
        respError(routingContext.request(), 404, null);
        return;
    }
    if (userMapCache.get(login) != null) {
        respError(routingContext.request(), 405, null);
        return;
    }
    String uuid = Utils.uuid();
    // NOTE(review): the password is persisted in plain text; it should be hashed, which
    // needs a matching change in login().
    JsonObject obj = new JsonObject()
            .put("id", uuid)
            .put("name", name)
            .put("login", login)
            .put("passwd", passwd)
            .put("createTime", Utils.time());
    userMap.put(login, obj.encode());
    userMapCache.put(login, obj);
    userNameAndIdMap.put(uuid, login);
    userNameAndIdMapCache.put(uuid, login);
    // Never send the password back to the client.
    JsonObject copy = obj.copy();
    copy.remove("passwd");
    resp(routingContext.request(), copy);
}
/**
 * Opens the MapDB file, creates or opens every persistent map and copies its content into
 * the matching shared in-memory map.
 *
 * @return a future completed with {@code true} on success, or failed with the exception
 *         that prevented initialisation
 */
private Future<Boolean> initDB() {
    Future<Boolean> initDBFuture = Future.future();
    try {
        db = DBMaker.fileDB(filePath + "sweet-im.db").closeOnJvmShutdown().make();
        // Registered users
        userMap = db.hashMap("user-db", Serializer.STRING, Serializer.STRING).createOrOpen();
        userMapCache = vertx.sharedData().getLocalMap("user-db-cache");
        copyJsonObj(userMap, userMapCache); // load the persisted users into memory
        // Friend relations
        friendMap = db.hashMap("friend-db", Serializer.STRING, Serializer.STRING).createOrOpen();
        friendMapCache = vertx.sharedData().getLocalMap("friend-db-cache");
        copyJsonArray(friendMap, friendMapCache);
        // Currently logged-in users
        onlineMap = db.hashMap("online-db", Serializer.STRING, Serializer.STRING).createOrOpen();
        onlineMapCache = vertx.sharedData().getLocalMap("online-db-cache");
        copyString(onlineMap, onlineMapCache);
        // Private chat history (no persistent map is opened for it here)
        messageMapCache = vertx.sharedData().getLocalMap("message-db-cache");
        // Group chat history (no persistent map is opened for it here)
        messageGroupMapCache = vertx.sharedData().getLocalMap("message-group-db-cache");
        // Groups
        groupMap = db.hashMap("group-db", Serializer.STRING, Serializer.STRING).createOrOpen();
        groupMapCache = vertx.sharedData().getLocalMap("group-db-cache");
        copyJsonObj(groupMap, groupMapCache);
        // User id -> login
        userNameAndIdMap = db.hashMap("username-id-db", Serializer.STRING, Serializer.STRING).createOrOpen();
        userNameAndIdMapCache = vertx.sharedData().getLocalMap("username-id-db-cache");
        copyString(userNameAndIdMap, userNameAndIdMapCache);
        initDBFuture.complete(true);
    } catch (Exception e) {
        log.error("Failed to initialise the MapDB stores: " + e.getMessage(), e);
        // Fail with the exception itself: e.getCause() is frequently null, which would
        // discard the actual reason for the failure.
        initDBFuture.fail(e);
    }
    return initDBFuture;
}
/**
 * Copies every entry of {@code sourceMap} into {@code targetMap}, parsing each value as a
 * JSON object.
 */
private void copyJsonObj(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
        JsonObject parsed = new JsonObject(entry.getValue());
        targetMap.put(entry.getKey(), parsed);
    }
}
/**
 * Copies every entry of {@code sourceMap} into {@code targetMap}, parsing each value as a
 * JSON array.
 */
private void copyJsonArray(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
        JsonArray parsed = new JsonArray(entry.getValue());
        targetMap.put(entry.getKey(), parsed);
    }
}
/**
 * Copies every entry of {@code sourceMap} into {@code targetMap} unchanged.
 */
private void copyString(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
        targetMap.put(entry.getKey(), entry.getValue());
    }
}
/**
 * Sends {@code ret} as a pretty-printed JSON response with the service's standard headers.
 */
private static void resp(HttpServerRequest request, JsonObject ret) {
    String payload = Json.encodePrettily(ret);
    request.response()
            .putHeader("content-type", "application/json;charset=utf-8")
            .putHeader("Access-Control-Allow-Origin", "*")
            .putHeader("Access-Control-Allow-Credentials", "true")
            .putHeader("Content-Disposition", "attachment")
            .end(payload);
}
/**
 * Sends an error response: status {@code code} and a JSON body {@code {"error": error}}
 * ({@code error} may be null), with the service's standard headers.
 */
private static void respError(HttpServerRequest request, int code, String error) {
    JsonObject errorBody = new JsonObject().put("error", error);
    String payload = Json.encodePrettily(errorBody);
    request.response()
            .setStatusCode(code)
            .putHeader("content-type", "application/json;charset=utf-8")
            .putHeader("Access-Control-Allow-Origin", "*")
            .putHeader("Access-Control-Allow-Credentials", "true")
            .putHeader("Content-Disposition", "attachment")
            .end(payload);
}
/**
 * Closes the persistent maps, then commits and closes the database unless it is already
 * closed, and finally signals completion.
 */
@Override
public void stop(Future<Void> stopFuture) throws Exception {
    userMap.close();
    friendMap.close();
    onlineMap.close();
    groupMap.close();
    userNameAndIdMap.close();
    if (db.isClosed()) {
        stopFuture.complete();
        return;
    }
    db.commit();
    db.close();
    stopFuture.complete();
}
}
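// ---- End of HttpVerticle.java; SocketVerticle.java follows ("複製代碼" = "copy code", web-page residue) ----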
package com.xiaoniu.im.socket;
import com.xiaoniu.im.utils.Utils;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Socket communication module; mainly handles chat.
 * Created by sweet on 2017/9/26.
 */
public class SocketVerticle extends AbstractVerticle {
// Class-wide logger.
private static final Logger log = LoggerFactory.getLogger(SocketVerticle.class);
// Directory containing the MapDB file ("sweet-msg-im.db" is appended in initDB). Hard-coded; must be adjusted per machine.
private static final String filePath = "C:\\xiaoniu_doc\\vertx\\sweet-im\\";
// Port the WebSocket server listens on.
private static final Integer port = 8081;
// MapDB handle that owns the persistent message maps below.
private DB db;
private HTreeMap<String, String> messageMap; // private chat history: "<smaller id>-<larger id>" -> encoded JSON array
private LocalMap<String, Object> messageMapCache; // private chat history (shared in-memory copy)
private HTreeMap<String, String> messageGroupMap; // group chat history: group id -> encoded JSON array
private LocalMap<String, Object> messageGroupMapCache; // group chat history (shared in-memory copy)
private LocalMap<String, Object> userMapCache; // registered users (shared in-memory copy)
private LocalMap<String, Object> friendMapCache; // friend relations (shared in-memory copy)
private LocalMap<String, Object> onlineMapCache; // currently logged-in users (shared in-memory copy)
private LocalMap<String, Object> userNameAndIdMapCache; // user id -> login, for quick lookup (in-memory)
private LocalMap<String, Object> groupMapCache; // groups (shared in-memory copy)
private Map<String, ServerWebSocket> socketMap; // one socket connection per user, keyed by user id
private EventBus eventBus; // used to receive logout notifications so the socket can be closed and dropped
/**
 * Loads the chat stores, then starts the WebSocket server, a periodic debug dump of the
 * socket map, and the "sweet-logout" event-bus consumer.
 *
 * @param future completed once the server is listening; failed when the stores cannot be
 *               opened or the port cannot be bound
 */
@Override
public void start(Future<Void> future) throws Exception {
    eventBus = vertx.eventBus();
    initDB().setHandler(res -> {
        if (res.failed()) {
            // Without the stores and shared maps nothing below can work.
            future.fail(res.cause());
            return;
        }
        vertx.createHttpServer().websocketHandler(this::onWebSocket).listen(port, server -> {
            if (server.succeeded()) {
                log.info("socket server 啓動成功 端口8081");
                future.complete();
            } else {
                log.error(server.cause().getMessage(), server.cause());
                future.fail(server.cause());
            }
        });
        // Debug aid: dump the socket map every 10 seconds.
        vertx.setPeriodic(10000, timer -> {
            System.out.println("------------Socket Map Start---------");
            socketMap.forEach((k, v) -> System.out.println("k: " + k + ", v: " + v));
            System.out.println("------------Socket Map End ---------");
        });
        // A user logged out over HTTP: close and drop that user's socket.
        eventBus.consumer("sweet-logout", msg -> {
            JsonObject body = (JsonObject) msg.body();
            String loginName = body.getString("loginName");
            // socketMap is keyed by user id while the message carries the login name,
            // so the name has to be translated before the lookup.
            Object user = loginName == null ? null : userMapCache.get(loginName);
            String userId = user instanceof JsonObject ? ((JsonObject) user).getString("id") : null;
            ServerWebSocket serverWebSocket = userId == null ? null : socketMap.remove(userId);
            JsonObject replyMsg = new JsonObject();
            if (serverWebSocket != null) {
                try {
                    serverWebSocket.close();
                } catch (IllegalStateException ignored) {
                    // The socket was already closed by the peer; nothing left to do.
                }
                replyMsg.put("code", "1"); // code 1: logged out
            } else {
                replyMsg.put("code", "0"); // code 0: no socket for this user
            }
            msg.reply(replyMsg);
        });
    });
}

/** Validates a new WebSocket connection and, when accepted, registers it and its handlers. */
private void onWebSocket(ServerWebSocket serverWebSocket) {
    String path = serverWebSocket.path();
    String query = serverWebSocket.query();
    log.info("path: " + path + ", socket id: " + serverWebSocket.textHandlerID());
    log.info("query: " + query);
    // reject() alone fails the handshake; close() must not be called on a rejected socket.
    if (!"/socket".equals(path) || StringUtil.isNullOrEmpty(query) || !query.startsWith("id=")) {
        serverWebSocket.reject();
        return;
    }
    String userId = query.substring(3); // id of the connecting user
    if (StringUtil.isNullOrEmpty(userId)) {
        serverWebSocket.reject();
        return;
    }
    Object o = userNameAndIdMapCache.get(userId); // login name when the id belongs to a registered user
    if (o == null || StringUtil.isNullOrEmpty(o.toString())) {
        serverWebSocket.reject();
        return;
    }
    String loginName = o.toString();
    if (onlineMapCache.get(loginName) == null) { // the user has to be logged in
        serverWebSocket.reject();
        return;
    }
    // TODO how should a page refresh (reconnect) be handled?
    put(socketMap, userId, serverWebSocket);
    // Drop the entry when this very socket closes, so no stale socket is written to later.
    // The two-argument remove leaves a newer connection of the same user untouched.
    serverWebSocket.closeHandler(closed -> socketMap.remove(userId, serverWebSocket));
    serverWebSocket.handler(buffer -> {
        System.out.println("-------------Message------------");
        System.out.println("收到的buffer : " + buffer);
        System.out.println("-------------Message------------");
        onFrame(serverWebSocket, userId, loginName, buffer.toJsonObject());
    });
    serverWebSocket.exceptionHandler(ex -> log.error(ex.getMessage(), ex));
}

/** Dispatches one decoded frame to private chat or group chat, or reports what is missing. */
private void onFrame(ServerWebSocket serverWebSocket, String userId, String loginName, JsonObject jsonObject) {
    String to = jsonObject.getString("to");           // receiver's login name (private chat)
    String from = jsonObject.getString("from");       // sender's login name
    String msg = jsonObject.getString("msg");
    String groupId = jsonObject.getString("groupId"); // target group (group chat)
    // The sender must be present, logged in, and the owner of this connection.
    if (StringUtil.isNullOrEmpty(from) || !onlineMapCache.containsKey(from) || !from.equals(loginName)) {
        serverWebSocket.writeTextMessage("字段不能爲空");
        return;
    }
    // TODO private messages can currently only be sent to online users
    if (!StringUtil.isNullOrEmpty(to) && msg != null && onlineMapCache.containsKey(to)) {
        sendPrivateMessage(serverWebSocket, userId, from, to, msg);
    } else if (!StringUtil.isNullOrEmpty(groupId) && msg != null && groupMapCache.get(groupId) != null) {
        relayGroupMessage(serverWebSocket, groupId, from, msg);
    } else {
        serverWebSocket.writeTextMessage("字段不能爲空!!!");
    }
}

/** Stores a private message and forwards it to the receiver if both users are mutual friends. */
private void sendPrivateMessage(ServerWebSocket senderSocket, String userId, String from, String to, String msg) {
    Object senderFriends = friendMapCache.get(userId);
    if (senderFriends == null) {
        senderSocket.writeTextMessage("你尚未好友");
        return;
    }
    JsonArray fromFriends = (JsonArray) senderFriends;
    Object receiver = userMapCache.get(to);
    String toUserId = receiver instanceof JsonObject ? ((JsonObject) receiver).getString("id") : null;
    Object receiverFriends = toUserId == null ? null : friendMapCache.get(toUserId);
    if (receiverFriends == null) {
        senderSocket.writeTextMessage("你倆不是好友關係");
        return;
    }
    JsonArray toFriends = (JsonArray) receiverFriends;
    // Both sides must have added each other.
    if (!fromFriends.contains(toUserId) || !toFriends.contains(userId)) {
        senderSocket.writeTextMessage("你倆不是好友關係2");
        return;
    }
    // The history key is the two ids joined by "-", smaller id first.
    String msgMapKey = toUserId.compareTo(userId) < 0 ? toUserId + "-" + userId : userId + "-" + toUserId;
    String msgValue = Utils.time() + "-" + from + "-" + msg;
    Object stored = messageMapCache.get(msgMapKey);
    JsonArray history = stored == null ? new JsonArray() : (JsonArray) stored;
    history.add(msgValue);
    messageMap.put(msgMapKey, history.encode());
    messageMapCache.put(msgMapKey, history);
    // The receiver may be logged in over HTTP without an open socket; the message is
    // already stored, so delivery is simply skipped in that case.
    ServerWebSocket toUserServerWebSocket = socketMap.get(toUserId);
    if (toUserServerWebSocket != null) {
        toUserServerWebSocket.writeTextMessage(msg);
    }
}

/** Checks group membership of the sender and relays the message to the group. */
private void relayGroupMessage(ServerWebSocket senderSocket, String groupId, String from, String msg) {
    log.info("==================羣組聊天==================");
    JsonObject groupJsonObj = (JsonObject) groupMapCache.get(groupId);
    if (!onlineMapCache.containsKey(from)) {
        senderSocket.writeTextMessage("請先登陸");
        return;
    }
    JsonArray members = groupJsonObj.getJsonArray("members"); // member ids
    JsonObject fromUserJson = (JsonObject) userMapCache.get(from);
    if (members.contains(fromUserJson.getString("id"))) {
        sendGroupMessage(groupId, fromUserJson.getString("login"),
                fromUserJson.getString("id"), members, msg);
    } else {
        senderSocket.writeTextMessage("你不是羣組成員");
    }
}
/**
 * Appends a message to the group's chat history and forwards it to every other member
 * that currently has an open socket.
 *
 * @param groupId       group id
 * @param fromUserLogin sender's login name
 * @param fromUserId    sender's user id
 * @param members       ids of the group members
 * @param msg           message text
 */
private void sendGroupMessage(String groupId, String fromUserLogin, String fromUserId, JsonArray members, String msg) {
    String msgValue = Utils.time() + "-" + fromUserLogin + "-" + msg; // history entry to store
    Object stored = messageGroupMapCache.get(groupId);
    JsonArray history = stored == null ? new JsonArray() : (JsonArray) stored;
    history.add(msgValue);
    messageGroupMap.put(groupId, history.encode());
    messageGroupMapCache.put(groupId, history);
    // Everybody except the sender receives the message.
    JsonArray recipients = members.copy();
    recipients.remove(fromUserId);
    recipients.forEach(memberId -> {
        System.out.println("羣發消息 " + Utils.time());
        ServerWebSocket serverWebSocket = socketMap.get(memberId);
        if (serverWebSocket == null) {
            // Member has no open socket; the message stays in the history only.
            return;
        }
        serverWebSocket.writeTextMessage(msg);
    });
}
/**
 * Registers {@code serverWebSocket} as the connection of {@code userId}.
 *
 * When the user already had a connection, the old one is closed and replaced, so the new
 * connection is always the one stored in the map afterwards.
 *
 * @param socketMap       user id -> socket registry
 * @param userId          id of the connecting user
 * @param serverWebSocket the new connection
 */
private void put(Map<String, ServerWebSocket> socketMap, String userId, ServerWebSocket serverWebSocket) {
    // Store the new socket first so the user is never left without an entry.
    ServerWebSocket previous = socketMap.put(userId, serverWebSocket);
    if (previous == null || previous == serverWebSocket) {
        return;
    }
    log.error(" ********* 鏈接存在,清除舊鏈接 ********* ");
    try {
        previous.close();
    } catch (RuntimeException e) {
        // Best effort: the old socket may already be closed.
        log.error("異常捕獲, " + e.getMessage());
    }
}
/**
 * Opens the message database, loads both chat histories into their shared in-memory maps,
 * obtains the shared maps for users, friends, online users, ids and groups, and creates
 * the socket registry.
 *
 * @return a future completed with {@code true} on success, or failed with the exception
 *         that prevented initialisation
 */
private Future<Boolean> initDB() {
    Future<Boolean> initDBFuture = Future.future();
    try {
        // Private chat history
        db = DBMaker.fileDB(filePath + "sweet-msg-im.db").closeOnJvmShutdown().make();
        messageMap = db.hashMap("message-db", Serializer.STRING, Serializer.STRING).createOrOpen();
        messageMapCache = vertx.sharedData().getLocalMap("message-db-cache");
        copyJsonArray(messageMap, messageMapCache);
        // Group chat history
        messageGroupMap = db.hashMap("message-group-db", Serializer.STRING, Serializer.STRING).createOrOpen();
        messageGroupMapCache = vertx.sharedData().getLocalMap("message-group-db-cache");
        copyJsonArray(messageGroupMap, messageGroupMapCache);
        // Registered users
        userMapCache = vertx.sharedData().getLocalMap("user-db-cache");
        // Friend relations
        friendMapCache = vertx.sharedData().getLocalMap("friend-db-cache");
        // Currently logged-in users
        onlineMapCache = vertx.sharedData().getLocalMap("online-db-cache");
        // User id -> login
        userNameAndIdMapCache = vertx.sharedData().getLocalMap("username-id-db-cache");
        // Groups
        groupMapCache = vertx.sharedData().getLocalMap("group-db-cache");
        socketMap = new ConcurrentHashMap<>();
        initDBFuture.complete(true);
    } catch (Exception e) {
        log.error("Failed to initialise the message stores: " + e.getMessage(), e);
        // Fail with the exception itself: e.getCause() is frequently null, which would
        // discard the actual reason for the failure.
        initDBFuture.fail(e);
    }
    return initDBFuture;
}
/**
 * Closes both message maps, then commits and closes the database unless it is already
 * closed, and finally signals completion.
 */
@Override
public void stop(Future<Void> stopFuture) throws Exception {
    messageMap.close();
    messageGroupMap.close();
    if (db.isClosed()) {
        stopFuture.complete();
        return;
    }
    db.commit();
    db.close();
    stopFuture.complete();
}
/**
 * Copies every entry of {@code sourceMap} into {@code targetMap}, parsing each value as a
 * JSON array.
 */
private void copyJsonArray(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
        JsonArray parsed = new JsonArray(entry.getValue());
        targetMap.put(entry.getKey(), parsed);
    }
}
}
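// ---- End of SocketVerticle.java; BootVerticle.java follows ("複製代碼" = "copy code", web-page residue) ----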
package com.xiaoniu.im;
import com.xiaoniu.im.rest.HttpVerticle;
import com.xiaoniu.im.socket.SocketVerticle;
import com.xiaoniu.im.utils.Runner;
import io.vertx.core.*;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
/**
 * Bootstrap verticle: deploys HttpVerticle and, once that has succeeded, SocketVerticle.
 * Created by sweet on 2017/9/26.
 */
public class BootVerticle extends AbstractVerticle{
    private static final Logger log = LoggerFactory.getLogger(BootVerticle.class);

    /** Stand-alone entry point: hands this verticle class to the project's Runner helper. */
    public static void main(String[] args) {
        Runner.runExample(BootVerticle.class);
    }

    /**
     * Deploys the two verticles one after the other and reports the combined outcome.
     *
     * @param startFuture completed when both deployments succeeded, failed with the cause
     *                    of the first deployment that failed
     */
    @Override
    public void start(Future<Void> startFuture) throws Exception {
        vertx.deployVerticle(new HttpVerticle(), httpDeployment -> {
            if (httpDeployment.failed()) {
                startFuture.fail(httpDeployment.cause());
                return;
            }
            // The socket verticle is only deployed after the HTTP verticle is up.
            vertx.deployVerticle(new SocketVerticle(), socketDeployment -> {
                if (socketDeployment.failed()) {
                    startFuture.fail(socketDeployment.cause());
                    return;
                }
                startFuture.complete();
                log.info("所有部署 OK");
            });
        });
    }
}
// ---- End of BootVerticle.java ("複製代碼" = "copy code", web-page residue) ----
使用 MapDB 存儲,需要修改代碼裏的文件存儲路徑。代碼寫得沒那麼精緻,沒有使用配置文件;源碼裏有 readme,寫了如何使用。源碼地址 鏈接:http://pan.baidu.com/s/1o8ysUQQ 密碼:5crm