手寫MQ框架(三)-客戶端實現

1、背景

書接手寫MQ框架(二)-服務端實現  ,前面介紹了服務端的實現。可是具體使用框架過程當中,用戶確定是以客戶端的形式跟服務端打交道的。客戶端的好壞直接影響了框架使用的便利性。html

雖然框架目前是經過web的形式提供功能的,可是某的目標實際上是經過socket實現,因此不只須要有客戶端,還要包裝一下,讓用戶在使用過程當中不須要關心服務端是如何實現的。java

簡單來講,就是客戶端使用必須方便。git

2、客戶端實現

一、HttpUtil

目前客戶端的核心功能是HttpUtil這個類,使用httpClient實現的,主要是爲了請求服務端。web

具體實現以下:數據庫

package com.shuimutong.gmq.client.util;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shuimutong.gmq.client.bean.HttpResponseBean;
import com.shuimutong.gutil.common.GUtilCommonUtil;

/**
 * http請求工具類
 * @ClassName: HttpUtil
 * @Description:(這裏用一句話描述這個類的做用)
 * @author: 水木桶
 * @date: 2019年10月29日 下午9:43:54
 * @Copyright: 2019 [水木桶] All rights reserved.
 */
public class HttpUtil {
    private final static Logger log = LoggerFactory.getLogger(HttpUtil.class);
    private static CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal();
    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    HTTP_CLIENT.close();
                } catch (IOException e) {
                    log.error("HTTP_CLIENT-closeException", e);
                }
            }
        });
    }

    /**
     * get請求
     * 
     * @param url
     * @return
     * @throws IOException
     */
    public static HttpResponseBean get(String url) throws IOException {
        HttpResponseBean responseBean = null;
        HttpGet httpGet = new HttpGet(url);
        CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
        try {
            HttpEntity httpEntity = res.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(res.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            res.close();
        }
        return responseBean;
    }
    
    /**
     * 帶參數的get請求
     * @param url
     * @param requsetParams
     * @return
     * @throws IOException
     * @throws URISyntaxException
     */
    public static HttpResponseBean get(String url, Map<String, String> requsetParams) throws IOException {
        HttpResponseBean responseBean = null;
        HttpGet httpGet;
        try {
            URIBuilder uriBuilder = new URIBuilder(url);
            if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
                List<NameValuePair> nvps = new ArrayList<NameValuePair>();
                requsetParams.forEach((k,v) -> {
                    nvps.add(new BasicNameValuePair(k, v));
                });
                uriBuilder.setParameters(nvps);
            }
            httpGet = new HttpGet(uriBuilder.build());
        } catch (Exception e) {
            throw new IOException(e);
        }
        CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
        try {
            HttpEntity httpEntity = res.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(res.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            res.close();
        }
        return responseBean;
    }

    /**
     * post請求
     * @param url
     * @param requsetParams
     * @return
     * @throws IOException
     */
    public static HttpResponseBean post(String url, Map<String, String> requsetParams) throws IOException {
        HttpResponseBean responseBean = null;
        HttpPost httpPost = new HttpPost(url);
        if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
            List<NameValuePair> nvps = new ArrayList<NameValuePair>();
            requsetParams.forEach((k,v) -> {
                nvps.add(new BasicNameValuePair(k, v));
            });
            httpPost.setEntity(new UrlEncodedFormEntity(nvps));
        }
        CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost);
        try {
            HttpEntity httpEntity = response.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(response.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            response.close();
        }
        return responseBean;
    }
}

 

封裝了get請求和post請求,封裝了響應結果。apache

加了一個鉤子,在jvm關閉時可以主動關閉建立的資源。緩存

二、訂閱消息、生產消息

這兩部分主要就是調用上面的HttpUtil,而後將結果包裝一下。mvc

具體代碼請參考前文的git。框架

三、實例管理

爲了使得用戶不須要關心具體實現,因此建了實例管理類。jvm

package com.shuimutong.gmq.client.util;

import com.shuimutong.gmq.client.cache.CommonObjCache;
import com.shuimutong.gmq.client.cache.impl.CommonObjCacheImpl;
import com.shuimutong.gmq.client.consumer.GmqConsumer;
import com.shuimutong.gmq.client.producer.GmqProducer;

public class GmqInstanceManage {
    public static GmqProducer getGmqProducer(String gmqServerUrl) {
        return new GmqProducer(gmqServerUrl);
    }
    
    public static GmqConsumer getGmqConsumer(String gmqServerUrl) {
        return new GmqConsumer(gmqServerUrl);
    }
    
    public static CommonObjCache getCommonCache(String serverUrl) {
        return new CommonObjCacheImpl(serverUrl);
    }
}

 

主要是爲了封裝變化。由於以後再迭代的話,實例的具體實現確定不是目前這麼簡單,因此要儘可能讓使用者少關心具體實現。

使用時關心的越多,後續項目迭代確定越困難。

3、使用示例

一、生產消息

@Test
    public void produceMsg() {
        GmqProducer producer = GmqInstanceManage.getGmqProducer(gmqServerUrl);
        for(int i=0; i<5; i++) {
            String message = "message:" + i;
            try {
                SendMqResult res = producer.sendMq(topic, message);
                System.out.println(res.getRes());
            } catch (SendMqException e) {
                e.printStackTrace();
            }
        }
    }

 

二、消費消息

主要思路是:消費消息以前,先查詢當前已經消費到了哪條消息。消息消費以後,將消費的編號存入緩存。

典型的主動拉消息,消息是否消費由本身負責的模式。

實現以下:

@Test
    public void comsumerMsgByCache() {
        GmqConsumer comsumer = GmqInstanceManage.getGmqConsumer(gmqServerUrl);
        CommonObjCache commonCache = GmqInstanceManage.getCommonCache(gmqServerUrl);
        String gmqSign = "gmq_consumer_id";
        long consumerId = 0;
        int size = 2;
        for(int i=0; i<5; i++) {
            try {
                CacheObj cacheId = commonCache.getById(gmqSign);
                if(cacheId != null) {
                    consumerId = Long.parseLong(cacheId.getContent());
                }
                
                List<MqContent> res = comsumer.getMq(topic, consumerId, size);
                for(MqContent mq : res) {
                    System.out.println(JSONObject.toJSONString(mq));
                    if(mq.getId() > consumerId) {
                        consumerId = mq.getId();
                    }
                }
                commonCache.save(gmqSign, String.valueOf(consumerId));
                System.out.println("保存consumerId:" + consumerId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

 

4、總結

gmq的第一版至今已經完成,固然這只是開始。

後續計劃先將gmvc框架替換掉,直接使用netty進行通訊。

而後把消息存到數據庫改成存到磁盤上。

而後就是服務的高可用改造。

屆時歡迎指導。

第2版設計、開發中……

相關文章
相關標籤/搜索