使用JDK的觀察者接口進行消息推送

觀察者模式就是對對象內部的變化進行觀察,當發生改變時作出相應的響應。代碼樣例見 設計模式整理 !java

由於觀察者模式較爲重要,使用頻率較高,JDK早已經提供了內置的觀察者接口以及被觀察者父類。設計模式

JDK中的觀察者接口源碼以下數組

public interface Observer {
    /**
     * 當被觀察者發生變化時,執行的方法
     *
     * @param   o     被觀察者父類,這裏通常要強制轉化成被觀察者子類
     * @param   arg   an argument passed to the <code>notifyObservers</code>
     *                 method.
     */
    void update(Observable o, Object arg);
}

被觀察者父類源碼,咱們能夠看到它使用了Vector的List列表來保存觀察者接口對象,Vector自己是線程安全的,雖然如今已經用的並很少。緩存

public class Observable {
    //被觀察者是否發生了改變
    private boolean changed = false;
    //保存觀察者接口對象的列表
    private Vector<Observer> obs;

    /** Construct an Observable with zero Observers. */

    public Observable() {
        obs = new Vector<>();
    }

    /**
     * 註冊觀察者接口對象,加了顯示鎖來保證線程安全
     *
     * @param   o   an observer to be added.
     * @throws NullPointerException   if the parameter o is null.
     */
    public synchronized void addObserver(Observer o) {
        if (o == null)
            throw new NullPointerException();
        if (!obs.contains(o)) {
            obs.addElement(o);
        }
    }

    /**
     * 移除觀察者接口對象
     * @param   o   the observer to be deleted.
     */
    public synchronized void deleteObserver(Observer o) {
        obs.removeElement(o);
    }

    /**
     * 當被觀察者發生改變時,通知觀察者
     *
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#hasChanged()
     * @see     java.util.Observer#update(java.util.Observable, java.lang.Object)
     */
    public void notifyObservers() {
        notifyObservers(null);
    }

    /**
     * 調用觀察者對象完成響應操做
     *
     * @param   arg   any object.
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#hasChanged()
     * @see     java.util.Observer#update(java.util.Observable, java.lang.Object)
     */
    public void notifyObservers(Object arg) {
        /*
         * a temporary array buffer, used as a snapshot of the state of
         * current Observers.
         */
        Object[] arrLocal;

        synchronized (this) {
            if (!changed)
                return;
            //將觀察者列表轉化成數組
            arrLocal = obs.toArray();
            clearChanged();
        }
        //調用逐個觀察者進行響應
        for (int i = arrLocal.length-1; i>=0; i--)
            ((Observer)arrLocal[i]).update(this, arg);
    }

    /**
     * 清除全部的觀察者接口對象
     */
    public synchronized void deleteObservers() {
        obs.removeAllElements();
    }

    /**
     * 告知被觀察者已經發生了改變,這是一個線程安全的
     */
    protected synchronized void setChanged() {
        changed = true;
    }

    /**
     * 觀察者已經作出反應後恢復被觀察者的改變狀態爲未改變
     *
     * @see     java.util.Observable#notifyObservers()
     * @see     java.util.Observable#notifyObservers(java.lang.Object)
     */
    protected synchronized void clearChanged() {
        changed = false;
    }

    /**
     * 測試被觀察者是否發生了改變
     *
     * @return  <code>true</code> if and only if the <code>setChanged</code>
     *          method has been called more recently than the
     *          <code>clearChanged</code> method on this object;
     *          <code>false</code> otherwise.
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#setChanged()
     */
    public synchronized boolean hasChanged() {
        return changed;
    }

    /**
     * 獲取觀察者的數量
     *
     * @return  the number of observers of this object.
     */
    public synchronized int countObservers() {
        return obs.size();
    }
}

咱們先用一個簡單的樣例來講明如何使用JDK的這一套觀察者模式安全

首先咱們須要一個實際的觀察者來實現觀察者接口app

public class Subscribe implements Observer {
    /**
     * 構造函數,讓被觀察者註冊本身,便於本身對被觀察者進行觀察
     * @param o
     */
    public Subscribe(Observable o) {
        o.addObserver(this);
    }

    /**
     * 被觀察者發生變化時,作出響應
     * @param o
     * @param arg
     */
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("收到通知" + ((Publish)o).getData());
    }
}

被觀察者ide

/**
 * 被觀察者子類
 */
public class Publish extends Observable {
    @Getter
    private String data = "";

    /**
     * 當data屬性發生變化時,通知觀察者作出響應
     * @param data
     */
    public void setData(String data) {
        if (this.data !=null && !this.data.equals(data)) {
            this.data = data;
            setChanged();
        }
        notifyObservers();
    }
}

main方法函數

public class ObserverMain {
    public static void main(String[] args) {
        //建立被觀察者子類對象
        Publish publish = new Publish();
        //建立觀察者對象,並註冊進被觀察者子類中
        new Subscribe(publish);
        //被觀察者發生變化
        publish.setData("開始");
    }
}

運行結果測試

收到通知開始ui

這是一個相對簡單的樣例,通常咱們會使用觀察者模式來進行MQ消息隊列的發送。

以RabbitMQ爲例,有一個門店的隊列,交換機

/**
 * rabbitmq配置
 *
 */
@Configuration
public class RabbitmqConfig {
   public static final String STORE_QUEUE = "store";
   @Bean
   public Queue storeQueue() {
      return new Queue(STORE_QUEUE);
   }


   @Bean
   public TopicExchange providerExchange() {
      return new TopicExchange(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER);
   }

   @Bean
   public Binding bingingStoreToProvider(){
      return BindingBuilder.bind(storeQueue()).to(providerExchange())
            .with(ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD);
   }
}

重寫消息生產者

@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content));
    }

    /**
     * 確認後回調:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失敗後return回調:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 對消息對象進行二進制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

咱們的觀察者類,對門店服務的服務集合新增服務對象進行觀察

/**
 * 服務新增觀察者
 */
public class ServiceObserver implements Observer {
    private MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
    public ServiceObserver(Observable o) {
        o.addObserver(this);
    }
    @Override
    public void update(Observable o, Object arg) {
        CompletableFuture.runAsync(() ->
            this.sender.send(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER,
                ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD,
                ((ProviderServiceLevel) o).getServiceProviders().poll())
        );
    }
}

具體的被觀察者類爲門店服務的分類

public class ProviderServiceLevel extends Observable implements Provider

它有一個服務的隊列

@Getter
private Queue<Provider> serviceProviders = new ConcurrentLinkedQueue<>();

也有一個服務的列表

@Getter
private List<Provider> serviceListProviders = new CopyOnWriteArrayList<>();

服務分類添加服務對象的方法,你們能夠思考一下爲何使用隊列,而不是直接使用列表在觀察者中取出服務對象

@Override
public boolean addProvider(Provider provider) {
    ServiceDao serviceDao = SpringBootUtil.getBean(ServiceDao.class);
    //若是添加的是服務分類
    if (provider instanceof ProviderServiceLevel) {
        ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderServiceLevel) provider).getId());
        ((ProviderServiceLevel) provider).setLevel(2);
        serviceDao.addLevelToLevel(paramLevel);
    //若是添加的是服務    
    }else if (provider instanceof ProviderService) {
        ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderService) provider).getService().getId());
        serviceDao.addServiceToLevel(paramLevel);
        this.serviceListProviders.add(provider);
        //將添加的服務入隊列
        this.serviceProviders.add(provider);
        setChanged();
        //通知觀察者取出隊列服務對象,進行MQ的發送
        notifyObservers();
        return true;
    }
    return this.serviceListProviders.add(provider);
}

最後在訪問的controller中會放三個緩存

//服務分類緩存
private Map<Long,Provider> cacheLevelProvider = new ConcurrentHashMap<>();
//門店緩存
private Map<Long,Provider> cacheStoreProvider = new ConcurrentHashMap<>();
//觀察者緩存
private Map<Long,ServiceObserver> observerMap = new ConcurrentHashMap<>();

具體的添加服務方法以下

/**
 * 新增商家服務
 * @param providerService
 * @return
 */
@Transactional
@SuppressWarnings("uncheck")
@PostMapping("/serviceprovider-anon/newproviderservice")
public Result<String> newProviderService(@RequestParam("storeid") Long storeId,
                                         @RequestParam("servicelevelid") Long serviceLevelId,
                                         @RequestBody ProviderService providerService) {
    Provider serviceLevelProvider;
    Provider service = ProviderFactory.createProviderService(providerService, true);
    if (cacheLevelProvider.containsKey(serviceLevelId)) {
        serviceLevelProvider = this.cacheLevelProvider.get(serviceLevelId);
    }else {
        serviceLevelProvider = this.levelProvider.findProvider(serviceLevelId);
        this.cacheLevelProvider.put(serviceLevelId,serviceLevelProvider);
    }
    //此處若是不將觀察者對象加入緩存,就會不斷建立觀察者,其實咱們只須要一個觀察者
    if (!observerMap.containsKey(serviceLevelId)) {
        observerMap.put(serviceLevelId,new ServiceObserver((ProviderServiceLevel) serviceLevelProvider));
    }
    //服務分類添加了服務對象,就會發生被觀察者狀態的改變,使觀察者作出響應
    serviceLevelProvider.addProvider(service);
    Provider storeProvider;
    if (cacheStoreProvider.containsKey(storeId)) {
        storeProvider = cacheStoreProvider.get(storeId);
    }else {
        storeProvider = this.storeProvider.findProvider(storeId);
        cacheStoreProvider.put(storeId,storeProvider);
    }
    storeProvider.addProvider(service);
    return Result.success("添加商家服務成功");
}
相關文章
相關標籤/搜索