Web項目(四)————異步隊列的實現

目錄:java

1.概述web

2.JedisAdapterredis

3.EventModelspring

4.EventHandlerapache

5.LikeHandlerjson

6.EventProducer服務器

7.EventConsumerapp

 

1.概述異步

在一個網站中,一個業務發生的同時,還有一些後續業務須要發生。async

好比點贊,除了完成點贊功能外,還有一系列,好比提醒被點讚的用戶等等,爲了可以實現這些操做而且不拖慢單純點贊功能的實現,咱們將這些使用異步隊列實現。

處理流程以下圖:

Biz爲業務部門,理解爲點讚的實現,也就是在實現點讚的同時經過EventProducer發送一個事件;

這個事件進入隊列等待,隊列另外一頭,有一個EventConsumer,不斷消費事件;

EventConsumer下面有不少EventHandler,只要EventHandler發現本身須要處理的事件類型,就會進行相應的操做。

優勢:①後續業務的實現,不會拖慢主業務。②若是後續業務的服務器掛掉,只要重啓,繼續從優先隊列消費事件便可。

2.JedisAdapter

在jedisAdapter中添加以下方法

public void setObject(String key, Object obj) {//將一個對象轉換爲一個jsoon串存入set中
        set(key, JSON.toJSONString(obj));
}


public <T> T getObject(String key, Class<T> clazz) {//從set中取出json串,並轉換爲相應object
        String value = get(key);
        if (value != null) {
            return JSON.parseObject(value, clazz);
        }
        return null;
}

3.EventModel

即發送的隊列的事件模型,只有一些基本屬性和get、set方法。

其中一些set的return 設置爲this,是由於方便連續set多個屬性。

package com.nowcoder.async;

import java.awt.*;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by nowcoder on 2016/7/14.
 */
public class EventModel {
    private EventType type;
    private int actorId;
    private int entityId;
    private int entityType;
    private int entityOwnerId;
    private Map<String, String> exts = new HashMap<>();

    public Map<String, String> getExts() {
        return exts;
    }
    public EventModel() {

    }
    public EventModel(EventType type) {
        this.type = type;
    }

    public String getExt(String name) {
        return exts.get(name);
    }

    public EventModel setExt(String name, String value) {
        exts.put(name, value);
        return this;
    }

    public EventType getType() {
        return type;
    }

    public EventModel setType(EventType type) {
        this.type = type;
        return this;
    }

    public int getActorId() {
        return actorId;
    }

    public EventModel setActorId(int actorId) {
        this.actorId = actorId;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public EventModel setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public EventModel setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityOwnerId() {
        return entityOwnerId;
    }

    public EventModel setEntityOwnerId(int entityOwnerId) {
        this.entityOwnerId = entityOwnerId;
        return this;
    }
}

EventType

package com.nowcoder.async;

/**
 * Created by nowcoder on 2016/7/14.
 */
public enum EventType {//枚舉類
    LIKE(0),
    COMMENT(1),
    LOGIN(2),
    MAIL(3);

    private int value;
    EventType(int value) {
        this.value = value;
    }
    public int getValue() {
        return value;
    }
}

4.EventHandler

設計爲一個接口,handler都實現此接口。

package com.nowcoder.async;

import java.util.List;

/**
 * Created by nowcoder on 2016/7/14.
 */
public interface EventHandler {
    void doHandle(EventModel model);//處理此事件
    List<EventType> getSupportEventTypes();//添加監視的事件類型
}

5.LikeHandler

實現EventHandler接口。

package com.nowcoder.async.handler;

import com.nowcoder.async.EventHandler;
import com.nowcoder.async.EventModel;
import com.nowcoder.async.EventType;
import com.nowcoder.model.Message;
import com.nowcoder.model.User;
import com.nowcoder.service.MessageService;
import com.nowcoder.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * Created by nowcoder on 2016/7/14.
 */
@Component
public class LikeHandler implements EventHandler {
    @Autowired
    MessageService messageService;

    @Autowired
    UserService userService;

    @Override
    public void doHandle(EventModel model) {
        Message message = new Message();
        User user = userService.getUser(model.getActorId());
        message.setToId(model.getEntityOwnerId());
        message.setContent("用戶" + user.getName() +
                " 讚了你的資訊,http://127.0.0.1:8080/news/"
                + String.valueOf(model.getEntityId()));
        // SYSTEM ACCOUNT
        message.setFromId(3);
        message.setCreatedDate(new Date());
        messageService.addMessage(message);
    }

    @Override
    public List<EventType> getSupportEventTypes() {//只關注「like」類型的事件
        return Arrays.asList(EventType.LIKE);
    }
}

6.EventProducer

負責將事件添加到隊列中。

package com.nowcoder.async;

import com.alibaba.fastjson.JSONObject;
import com.nowcoder.util.JedisAdapter;
import com.nowcoder.util.RedisKeyUtil;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.ExceptionHandler;

/**
 * Created by nowcoder on 2016/7/14.
 */
@Service
public class EventProducer {

    @Autowired
    JedisAdapter jedisAdapter;

    public boolean fireEvent(EventModel eventModel) {
        try {
            String json = JSONObject.toJSONString(eventModel);//生成value
            String key = RedisKeyUtil.getEventQueueKey();//生成key
            jedisAdapter.lpush(key, json);//將事件添加到隊列中
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

7.EventConsumer

負責從隊列中消費事件。

package com.nowcoder.async;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nowcoder.async.handler.LoginExceptionHandler;
import com.nowcoder.util.JedisAdapter;
import com.nowcoder.util.RedisKeyUtil;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by nowcoder on 2016/7/14.
 */
@Service
public class EventConsumer implements InitializingBean, ApplicationContextAware {

//InitializingBean  經過實現此接口的afterPropertiesSet()方法記錄哪些Event須要哪些handler來處理
//ApplicationContextAware  經過實現此接口的setApplicationContext方法獲取上下文

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    //用來存儲一個事件類型對應的全部的eventhandler,下次有該事件產生時,便可直接調用對應的list
    private Map<EventType, List<EventHandler>> config = new HashMap<>();

    private ApplicationContext applicationContext;

    @Autowired
    private JedisAdapter jedisAdapter;

    @Override
    public void afterPropertiesSet() throws Exception {//記錄哪些Event須要哪些handler來處理
        //找出上下文中全部實現了EventHandler接口的類,存入beans
        Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
        if (beans != null) {
            //遍歷全部的handler,將他們存入他們所監聽的eventType對應的list中
            for (Map.Entry<String, EventHandler> entry : beans.entrySet()) {
                List<EventType> eventTypes = entry.getValue().getSupportEventTypes();//查看事件的監視事件
                for (EventType type : eventTypes) {
                    if (!config.containsKey(type)) {
                        config.put(type, new ArrayList<EventHandler>());
                    }

                    // 註冊每一個事件的處理函數
                    config.get(type).add(entry.getValue());
                }
            }
        }

        // 啓動線程去消費事件
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // 從隊列一直消費
                while (true) {
                    String key = RedisKeyUtil.getEventQueueKey();
                    List<String> messages = jedisAdapter.brpop(0, key);//從redis的隊列中取出事件,並存入list中
                    // 第一個元素是隊列名字,跳過
                    for (String message : messages) {
                        if (message.equals(key)) {
                            continue;
                        }

                        EventModel eventModel = JSON.parseObject(message, EventModel.class);
                        // 找到這個事件的處理handler列表
                        if (!config.containsKey(eventModel.getType())) {
                            logger.error("不能識別的事件");
                            continue;
                        }

                        for (EventHandler handler : config.get(eventModel.getType())) {//處理他的全部的handler
                            handler.doHandle(eventModel);
                        }
                    }
                }
            }
        });
        thread.start();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
相關文章
相關標籤/搜索