觀察者模式就是對對象內部的變化進行觀察,當發生改變時作出相應的響應。代碼樣例見 設計模式整理 !java
由於觀察者模式較爲重要,使用頻率較高,JDK早已經提供了內置的觀察者接口以及被觀察者父類。設計模式
JDK中的觀察者接口源碼以下數組
public interface Observer { /** * 當被觀察者發生變化時,執行的方法 * * @param o 被觀察者父類,這裏通常要強制轉化成被觀察者子類 * @param arg an argument passed to the <code>notifyObservers</code> * method. */ void update(Observable o, Object arg); }
被觀察者父類源碼,咱們能夠看到它使用了Vector的List列表來保存觀察者接口對象,Vector自己是線程安全的,雖然如今已經用的並很少。緩存
public class Observable { //被觀察者是否發生了改變 private boolean changed = false; //保存觀察者接口對象的列表 private Vector<Observer> obs; /** Construct an Observable with zero Observers. */ public Observable() { obs = new Vector<>(); } /** * 註冊觀察者接口對象,加了顯示鎖來保證線程安全 * * @param o an observer to be added. * @throws NullPointerException if the parameter o is null. */ public synchronized void addObserver(Observer o) { if (o == null) throw new NullPointerException(); if (!obs.contains(o)) { obs.addElement(o); } } /** * 移除觀察者接口對象 * @param o the observer to be deleted. */ public synchronized void deleteObserver(Observer o) { obs.removeElement(o); } /** * 當被觀察者發生改變時,通知觀察者 * * @see java.util.Observable#clearChanged() * @see java.util.Observable#hasChanged() * @see java.util.Observer#update(java.util.Observable, java.lang.Object) */ public void notifyObservers() { notifyObservers(null); } /** * 調用觀察者對象完成響應操做 * * @param arg any object. * @see java.util.Observable#clearChanged() * @see java.util.Observable#hasChanged() * @see java.util.Observer#update(java.util.Observable, java.lang.Object) */ public void notifyObservers(Object arg) { /* * a temporary array buffer, used as a snapshot of the state of * current Observers. */ Object[] arrLocal; synchronized (this) { if (!changed) return; //將觀察者列表轉化成數組 arrLocal = obs.toArray(); clearChanged(); } //調用逐個觀察者進行響應 for (int i = arrLocal.length-1; i>=0; i--) ((Observer)arrLocal[i]).update(this, arg); } /** * 清除全部的觀察者接口對象 */ public synchronized void deleteObservers() { obs.removeAllElements(); } /** * 告知被觀察者已經發生了改變,這是一個線程安全的 */ protected synchronized void setChanged() { changed = true; } /** * 觀察者已經作出反應後恢復被觀察者的改變狀態爲未改變 * * @see java.util.Observable#notifyObservers() * @see java.util.Observable#notifyObservers(java.lang.Object) */ protected synchronized void clearChanged() { changed = false; } /** * 測試被觀察者是否發生了改變 * * @return <code>true</code> if and only if the <code>setChanged</code> * method has been called more recently than the * <code>clearChanged</code> method on this object; * <code>false</code> otherwise. * @see java.util.Observable#clearChanged() * @see java.util.Observable#setChanged() */ public synchronized boolean hasChanged() { return changed; } /** * 獲取觀察者的數量 * * @return the number of observers of this object. */ public synchronized int countObservers() { return obs.size(); } }
咱們先用一個簡單的樣例來講明如何使用JDK的這一套觀察者模式安全
首先咱們須要一個實際的觀察者來實現觀察者接口app
public class Subscribe implements Observer { /** * 構造函數,讓被觀察者註冊本身,便於本身對被觀察者進行觀察 * @param o */ public Subscribe(Observable o) { o.addObserver(this); } /** * 被觀察者發生變化時,作出響應 * @param o * @param arg */ @Override public void update(Observable o, Object arg) { System.out.println("收到通知" + ((Publish)o).getData()); } }
被觀察者ide
/** * 被觀察者子類 */ public class Publish extends Observable { @Getter private String data = ""; /** * 當data屬性發生變化時,通知觀察者作出響應 * @param data */ public void setData(String data) { if (this.data !=null && !this.data.equals(data)) { this.data = data; setChanged(); } notifyObservers(); } }
main方法函數
public class ObserverMain { public static void main(String[] args) { //建立被觀察者子類對象 Publish publish = new Publish(); //建立觀察者對象,並註冊進被觀察者子類中 new Subscribe(publish); //被觀察者發生變化 publish.setData("開始"); } }
運行結果測試
收到通知開始ui
這是一個相對簡單的樣例,通常咱們會使用觀察者模式來進行MQ消息隊列的發送。
以RabbitMQ爲例,有一個門店的隊列,交換機
/** * rabbitmq配置 * */ @Configuration public class RabbitmqConfig { public static final String STORE_QUEUE = "store"; @Bean public Queue storeQueue() { return new Queue(STORE_QUEUE); } @Bean public TopicExchange providerExchange() { return new TopicExchange(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER); } @Bean public Binding bingingStoreToProvider(){ return BindingBuilder.bind(storeQueue()).to(providerExchange()) .with(ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD); } }
重寫消息生產者
@Slf4j @Component public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) { log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content)); } /** * 確認後回調: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.info("send ack fail, cause = " + cause); } else { log.info("send ack success"); } } /** * 失敗後return回調: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); } /** * 對消息對象進行二進制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); } }
咱們的觀察者類,對門店服務的服務集合新增服務對象進行觀察
/** * 服務新增觀察者 */ public class ServiceObserver implements Observer { private MessageSender sender = SpringBootUtil.getBean(MessageSender.class); public ServiceObserver(Observable o) { o.addObserver(this); } @Override public void update(Observable o, Object arg) { CompletableFuture.runAsync(() -> this.sender.send(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER, ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD, ((ProviderServiceLevel) o).getServiceProviders().poll()) ); } }
具體的被觀察者類爲門店服務的分類
public class ProviderServiceLevel extends Observable implements Provider
它有一個服務的隊列
@Getter private Queue<Provider> serviceProviders = new ConcurrentLinkedQueue<>();
也有一個服務的列表
@Getter private List<Provider> serviceListProviders = new CopyOnWriteArrayList<>();
服務分類添加服務對象的方法,你們能夠思考一下爲何使用隊列,而不是直接使用列表在觀察者中取出服務對象
@Override public boolean addProvider(Provider provider) { ServiceDao serviceDao = SpringBootUtil.getBean(ServiceDao.class); //若是添加的是服務分類 if (provider instanceof ProviderServiceLevel) { ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderServiceLevel) provider).getId()); ((ProviderServiceLevel) provider).setLevel(2); serviceDao.addLevelToLevel(paramLevel); //若是添加的是服務 }else if (provider instanceof ProviderService) { ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderService) provider).getService().getId()); serviceDao.addServiceToLevel(paramLevel); this.serviceListProviders.add(provider); //將添加的服務入隊列 this.serviceProviders.add(provider); setChanged(); //通知觀察者取出隊列服務對象,進行MQ的發送 notifyObservers(); return true; } return this.serviceListProviders.add(provider); }
最後在訪問的controller中會放三個緩存
//服務分類緩存 private Map<Long,Provider> cacheLevelProvider = new ConcurrentHashMap<>(); //門店緩存 private Map<Long,Provider> cacheStoreProvider = new ConcurrentHashMap<>(); //觀察者緩存 private Map<Long,ServiceObserver> observerMap = new ConcurrentHashMap<>();
具體的添加服務方法以下
/** * 新增商家服務 * @param providerService * @return */ @Transactional @SuppressWarnings("uncheck") @PostMapping("/serviceprovider-anon/newproviderservice") public Result<String> newProviderService(@RequestParam("storeid") Long storeId, @RequestParam("servicelevelid") Long serviceLevelId, @RequestBody ProviderService providerService) { Provider serviceLevelProvider; Provider service = ProviderFactory.createProviderService(providerService, true); if (cacheLevelProvider.containsKey(serviceLevelId)) { serviceLevelProvider = this.cacheLevelProvider.get(serviceLevelId); }else { serviceLevelProvider = this.levelProvider.findProvider(serviceLevelId); this.cacheLevelProvider.put(serviceLevelId,serviceLevelProvider); } //此處若是不將觀察者對象加入緩存,就會不斷建立觀察者,其實咱們只須要一個觀察者 if (!observerMap.containsKey(serviceLevelId)) { observerMap.put(serviceLevelId,new ServiceObserver((ProviderServiceLevel) serviceLevelProvider)); } //服務分類添加了服務對象,就會發生被觀察者狀態的改變,使觀察者作出響應 serviceLevelProvider.addProvider(service); Provider storeProvider; if (cacheStoreProvider.containsKey(storeId)) { storeProvider = cacheStoreProvider.get(storeId); }else { storeProvider = this.storeProvider.findProvider(storeId); cacheStoreProvider.put(storeId,storeProvider); } storeProvider.addProvider(service); return Result.success("添加商家服務成功"); }