【分佈式事務】spring cloud集成lcn解決分佈式事務

參考地址:https://blog.csdn.net/u010882691/article/details/82256587html

參考地址:http://www.javashuo.com/article/p-foggtcdt-mr.htmljava

參考地址:https://blog.csdn.net/small_to_large/article/details/77836672 Spring Cloud Ribbon和Spring Cloud Feignnode

參考地址:http://www.javashuo.com/article/p-nucprzrj-bg.htmlgit

 

事務等級:https://blog.csdn.net/gududedabai/article/details/82993700github

目前,在Spring cloud 中服務之間經過restful方式調用有兩種方式 
- restTemplate+Ribbon 
- feignweb

從實踐上看,採用feign的方式更優雅(feign內部也使用了ribbon作負載均衡)。redis

zuul也有負載均衡的功能,它是針對外部請求作負載,那客戶端ribbon的負載均衡又是怎麼一回事?spring

客戶端ribbon的負載均衡,解決的是服務發起方(在Eureka註冊的服務)對被調用的服務的負載,好比咱們查詢商品服務要調用顯示庫存和商品明細服務,經過商品服務的接口將兩個服務組合,能夠減小外部應用的請求,好比手機App發起一次請求便可,能夠節省網絡帶寬,也更省電。json

ribbon是對服務之間調用作負載,是服務之間的負載均衡,zuul是能夠對外部請求作負載均衡。

api

參考地址:https://blog.csdn.net/jrn1012/article/details/77837658/

由於LCN實現分佈式事務的回滾,須要在服務內部 微服務之間的 負載均衡的 請求操做,故而須要在配置文件中加上ribbon的相關配置,它不與使用feign衝突!!!

 

lcn使用spring boot2.0 報錯解決方案:https://www.jianshu.com/p/453741e0f28f

lcn集成到本身到本身的spring cloud項目中:http://www.javashuo.com/article/p-pfdcwsbl-mt.html

 

參考使用步驟1:

https://m.wang1314.com/doc/webapp/topic/20308073.html

 

 修改LCN ,集成spring boot2.0

注意:LCN 4.1.0版本 目前不支持spring boot 2.x的版本,因此須要進行更改!!

【【由於我已經更改完成,打包了jar了,jar能夠在百度網盤下載,而後直接走這一步的上傳第三方jar包到本地maven倉庫,而後在項目中直接引用便可】】

第一步:

先在lcn官網【http://www.txlcn.org/】 找到GitHub 地址【https://github.com/codingapi/tx-lcn】,拷下全部的源碼

 

 

第二步:

解壓下載的zip,放置在一個目錄下,用IDEA打開【注意打開父層項目】

 

 導入完整的jar包,而後下面就要開始更改源碼中不支持spring boot 2.X的部分

 

第三步:

修改

transaction-springcloud 項目下com.codingapi.tx.springcloud.listener包中的ServerListener.java

源碼更改成:

package com.codingapi.tx.springcloud.listener;

import com.codingapi.tx.listener.service.InitService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @Component註解會自動掃描配置文件中的server.port值
 */
@Component
public class ServerListener implements ApplicationListener<ApplicationEvent> {

    private Logger logger = LoggerFactory.getLogger(ServerListener.class);

    private int serverPort;

    @Value("${server.port}")
    private String port;

    @Autowired
    private InitService initService;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        //        logger.info("onApplicationEvent -> onApplicationEvent. "+event.getEmbeddedServletContainer());
        //        this.serverPort = event.getEmbeddedServletContainer().getPort();
        //TODO Spring boot 2.0.0沒有EmbeddedServletContainerInitializedEvent 此處寫死;modify by young
        this.serverPort = Integer.parseInt(port);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // 若鏈接不上txmanager start()方法將阻塞
                initService.start();
            }
        });
        thread.setName("TxInit-thread");
        thread.start();
    }

    public int getPort() {
        return this.serverPort;
    }

    public void setServerPort(int serverPort) {
        this.serverPort = serverPort;
    }
}
View Code

 

 第四步:

修改tx-manager項目下com.codingapi.tm.listener包中的ApplicationStartListener.java

package com.codingapi.tm.listener;

import com.codingapi.tm.Constants;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Component
public class ApplicationStartListener implements ApplicationListener<ApplicationEvent> {


    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        //TODO Spring boot 2.0.0沒有EmbeddedServletContainerInitializedEvent 此處寫死;modify by young
//        int serverPort = event.getEmbeddedServletContainer().getPort();
        String ip = getIp();
        Constants.address = ip+":48888";//寫死端口號,反正TxManager端口也是配置文件配好的(●′ω`●)
    }

    private String getIp(){
        String host = null;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return host;
    }
}
View Code

 

 

第五步:

修改

tx-manager項目下com.codingapi.tm.manager.service.impl包中MicroServiceImpl.java類的getState()方法

package com.codingapi.tm.manager.service.impl;

import com.codingapi.tm.Constants;
import com.codingapi.tm.config.ConfigReader;
import com.codingapi.tm.framework.utils.SocketManager;
import com.codingapi.tm.manager.service.MicroService;
import com.codingapi.tm.model.TxServer;
import com.codingapi.tm.model.TxState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * create by lorne on 2017/11/11
 */
@Service
public class MicroServiceImpl implements MicroService {


    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private ConfigReader configReader;


    @Autowired
    private DiscoveryClient discoveryClient;



    private boolean isIp(String ipAddress) {
        String ip = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}";
        Pattern pattern = Pattern.compile(ip);
        Matcher matcher = pattern.matcher(ipAddress);
        return matcher.matches();
    }



    @Override
    public TxState getState() {
        TxState state = new TxState();
        String ipAddress = "";
        //TODO Spring boot 2.0.0沒有discoveryClient.getLocalServiceInstance() 用InetAddress獲取host;modify by young
        //String ipAddress = discoveryClient.getLocalServiceInstance().getHost();
        try {
            ipAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (!isIp(ipAddress)) {
            ipAddress = "127.0.0.1";
        }
        state.setIp(ipAddress);
        state.setPort(Constants.socketPort);
        state.setMaxConnection(SocketManager.getInstance().getMaxConnection());
        state.setNowConnection(SocketManager.getInstance().getNowConnection());
        state.setRedisSaveMaxTime(configReader.getRedisSaveMaxTime());
        state.setTransactionNettyDelayTime(configReader.getTransactionNettyDelayTime());
        state.setTransactionNettyHeartTime(configReader.getTransactionNettyHeartTime());
        state.setNotifyUrl(configReader.getCompensateNotifyUrl());
        state.setCompensate(configReader.isCompensateAuto());
        state.setCompensateTryTime(configReader.getCompensateTryTime());
        state.setCompensateMaxWaitTime(configReader.getCompensateMaxWaitTime());
        state.setSlbList(getServices());
        return state;
    }


    private List<String> getServices(){
        List<String> urls = new ArrayList<>();
        List<ServiceInstance>  serviceInstances = discoveryClient.getInstances(tmKey);
        for (ServiceInstance instanceInfo : serviceInstances) {
            urls.add(instanceInfo.getUri().toASCIIString());
        }
        return urls;
    }

    @Override
    public TxServer getServer() {
        List<String> urls= getServices();
        List<TxState> states = new ArrayList<>();
        for(String url:urls){
            try {
                TxState state = restTemplate.getForObject(url + "/tx/manager/state", TxState.class);
                states.add(state);
            } catch (Exception e) {
            }

        }
        if(states.size()<=1) {
            TxState state = getState();
            if (state.getMaxConnection() > state.getNowConnection()) {
                return TxServer.format(state);
            } else {
                return null;
            }
        }else{
            //找默認數據
            TxState state = getDefault(states,0);
            if (state == null) {
                //沒有知足的默認數據
                return null;
            }
            return TxServer.format(state);
        }
    }

    private TxState getDefault(List<TxState> states, int index) {
        TxState state = states.get(index);
        if (state.getMaxConnection() == state.getNowConnection()) {
            index++;
            if (states.size() - 1 >= index) {
                return getDefault(states, index);
            } else {
                return null;
            }
        } else {
            return state;
        }
    }

}
View Code

 

 

第六步:

 修改

tx-client下的com.codingapi.tx.aop.service.impl下的TransactionServerFactoryServiceImpl.java

 

修改這一截代碼:

//分佈式事務已經開啓,業務進行中 **/
        if (info.getTxTransactionLocal() != null || StringUtils.isNotEmpty(info.getTxGroupId())) {
            //檢查socket通信是否正常 (第一次執行時啓動txRunningTransactionServer的業務處理控制,而後嵌套調用其餘事務的業務方法時都併到txInServiceTransactionServer業務處理下)
            if (SocketManager.getInstance().isNetState()) {
                if (info.getTxTransactionLocal() != null) {
                    return txDefaultTransactionServer;
                } else {
//                    if(transactionControl.isNoTransactionOperation() // 表示整個應用沒有獲取過DB鏈接
//                        || info.getTransaction().readOnly()) { //無事務業務的操做
//                        return txRunningNoTransactionServer;
//                    }else {
//                        return txRunningTransactionServer;
//                    }
                    if(!transactionControl.isNoTransactionOperation()) { //TODO 有事務業務的操做 
                        return txRunningTransactionServer;
                    }else {
                        return txRunningNoTransactionServer;
                    }
                }
            } else {
                logger.warn("tx-manager not connected.");
                return txDefaultTransactionServer;
            }
        }
        //分佈式事務處理邏輯*結束***********/
View Code

 

 

第七步:

如今都更改完成了,而後須要將全部的項目打包,注意我將本組項目中全部pom文件中全部的的4.2.0-SNAPSHOT 都更改爲了4.2.0,注意  是全部

改爲了

 

而後點擊右側maven插件對每個ms挨個進行打包【打包可能報錯,解決方案:http://www.javashuo.com/article/p-zjpiyyqs-by.html   http://www.javashuo.com/article/p-dnoesbhv-r.html

 

 

按照報錯後的兩個解決方案,進行打包完成後,能夠看到

 

這些ms都已經打包完成了

 

 

第八步:

最後須要將全部修改完成的打包好的jar上傳到本身本地的maven倉庫中 【操做地址:http://www.javashuo.com/article/p-qmvjxlam-e.html 最下方能夠進行第三方jar包上傳到本身的maven倉庫中】

【jar能夠在百度網盤下載,而後直接走這一步的上傳第三方jar包到本地maven倉庫,而後在項目中直接引用便可】【最新jar的使用參見http://www.javashuo.com/article/p-qmvjxlam-e.html最下方】

 【這裏把上傳第三方jar到本地倉庫的命令給出來,也就是這兩個jar】

mvn deploy:deploy-file -DgroupId=com.codingapi -DartifactId=transaction-springcloud -Dversion=4.2.0 -Dpackaging=jar -Dfile=D:\document\IdeaProjects\myTestDocument\jar\transaction-springcloud-4.2.0.jar -Durl=http://localhost:8081/repository/myself_hosted/ -DrepositoryId=myself_hosted


mvn deploy:deploy-file -DgroupId=com.codingapi -DartifactId=tx-plugins-db -Dversion=4.2.0 -Dpackaging=jar -Dfile=D:\document\IdeaProjects\myTestDocument\jar\tx-plugins-db-4.2.0.jar -Durl=http://localhost:8081/repository/myself_hosted/ -DrepositoryId=myself_hosted

 

上傳完了之後的位置以下:

 

 

注意 這裏的版本都修改爲了4.2.0

因此在引用的時候,在服務中引用的版本是4.2.0

 

 此次啓動引用了這兩個jar的spring boot2.0的微服務,就能夠成功了 

 

 

 

將tx-Manager服務加入項目組,並啓用【此時是eureka服務已經啓動的狀況下了,也就是說,微服務組引入修改之後的LCN jar依賴,已經能夠成功啓動的狀況下】

1.同上面的第一步同樣,進入官網,進入GitHub,拷貝全部源碼

先在lcn官網【http://www.txlcn.org/】 找到GitHub 地址【https://github.com/codingapi/tx-lcn】,拷下全部的源碼

 

2.解壓縮,取出tx-manager服務,拷貝至項目組根目錄下

 

 3.將tx-Manager修改成本微服務組能夠識別的子模塊module

導入更新

 

4.更改tx-manager的application.properties,修改eureka的配置和redis的相關配置

#######################################txmanager-start#################################################
#服務端口
server.port=7000

#tx-manager不得修改
spring.application.name=tx-manager

spring.mvc.static-path-pattern=/**
spring.resources.static-locations=classpath:/static/
#######################################txmanager-end#################################################


#zookeeper地址
#spring.cloud.zookeeper.connect-string=127.0.0.1:2181
#spring.cloud.zookeeper.discovery.preferIpAddress = true

#eureka 地址
eureka.client.service-url.defaultZone=http://127.0.0.1:8000/eureka/
eureka.instance.prefer-ip-address=true

#######################################redis-start#################################################
#redis 配置文件,根據狀況選擇集羣或者單機模式

##redis 集羣環境配置
##redis cluster
#spring.redis.cluster.nodes=127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003
#spring.redis.cluster.commandTimeout=5000

##redis 單點環境配置
#redis
#redis主機地址
spring.redis.host=127.0.0.1
#redis主機端口
spring.redis.port=6379
#redis連接密碼
spring.redis.password=
spring.redis.pool.maxActive=10
spring.redis.pool.maxWait=-1
spring.redis.pool.maxIdle=5
spring.redis.pool.minIdle=0
spring.redis.timeout=0
#####################################redis-end###################################################




#######################################LCN-start#################################################
#業務模塊與TxManager之間通信的最大等待時間(單位:秒)
#通信時間是指:發起方與響應方之間完成一次的通信時間。
#該字段表明的是Tx-Client模塊與TxManager模塊之間的最大通信時間,超過該時間未響應本次請求失敗。
tm.transaction.netty.delaytime = 5

#業務模塊與TxManager之間通信的心跳時間(單位:秒)
tm.transaction.netty.hearttime = 15

#存儲到redis下的數據最大保存時間(單位:秒)
#該字段僅表明的事務模塊數據的最大保存時間,補償數據會永久保存。
tm.redis.savemaxtime=30

#socket server Socket對外服務端口
#TxManager的LCN協議的端口
tm.socket.port=9999

#最大socket鏈接數
#TxManager最大容許的創建鏈接數量
tm.socket.maxconnection=100

#事務自動補償 (true:開啓,false:關閉)
# 說明:
# 開啓自動補償之後,必需要配置 tm.compensate.notifyUrl 地址,僅當tm.compensate.notifyUrl 在請求補償確認時返回success或者SUCCESS時,纔會執行自動補償,不然不會自動補償。
# 關閉自動補償,當出現數據時也會 tm.compensate.notifyUrl 地址。
# 當tm.compensate.notifyUrl 無效時,不影響TxManager運行,僅會影響自動補償。
tm.compensate.auto=false

#事務補償記錄回調地址(rest api 地址,post json格式)
#請求補償是在開啓自動補償時纔會請求的地址。請求分爲兩種:1.補償決策,2.補償結果通知,可經過經過action參數區分compensate爲補償請求、notify爲補償通知。
#*注意當請求補償決策時,須要補償服務返回"SUCCESS"字符串之後才能夠執行自動補償。
#請求補償結果通知則只須要接受通知便可。
#請求補償的樣例數據格式:
#{"groupId":"TtQxTwJP","action":"compensate","json":"{\"address\":\"133.133.5.100:8081\",\"className\":\"com.example.demo.service.impl.DemoServiceImpl\",\"currentTime\":1511356150413,\"data\":\"C5IBLWNvbS5leGFtcGxlLmRlbW8uc2VydmljZS5pbXBsLkRlbW9TZXJ2aWNlSW1wbAwSBHNhdmUbehBqYXZhLmxhbmcuT2JqZWN0GAAQARwjeg9qYXZhLmxhbmcuQ2xhc3MYABABJCo/cHVibGljIGludCBjb20uZXhhbXBsZS5kZW1vLnNlcnZpY2UuaW1wbC5EZW1vU2VydmljZUltcGwuc2F2ZSgp\",\"groupId\":\"TtQxTwJP\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo1\",\"state\":0,\"time\":36,\"txGroup\":{\"groupId\":\"TtQxTwJP\",\"hasOver\":1,\"isCompensate\":0,\"list\":[{\"address\":\"133.133.5.100:8899\",\"isCompensate\":0,\"isGroup\":0,\"kid\":\"wnlEJoSl\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo2\",\"modelIpAddress\":\"133.133.5.100:8082\",\"channelAddress\":\"/133.133.5.100:64153\",\"notify\":1,\"uniqueKey\":\"bc13881a5d2ab2ace89ae5d34d608447\"}],\"nowTime\":0,\"startTime\":1511356150379,\"state\":1},\"uniqueKey\":\"be6eea31e382f1f0878d07cef319e4d7\"}"}
#請求補償的返回數據樣例數據格式:
#SUCCESS
#請求補償結果通知的樣例數據格式:
#{"resState":true,"groupId":"TtQxTwJP","action":"notify"}
tm.compensate.notifyUrl=http://ip:port/path

#補償失敗,再次嘗試間隔(秒),最大嘗試次數3次,當超過3次即爲補償失敗,失敗的數據依舊還會存在TxManager下。
tm.compensate.tryTime=30

#各事務模塊自動補償的時間上限(毫秒)
#指的是模塊執行自動超時的最大時間,該最大時間若過段會致使事務機制異常,該時間必需要模塊之間通信的最大超過期間。
#例如,若模塊A與模塊B,請求超時的最大時間是5秒,則建議改時間至少大於5秒。
tm.compensate.maxWaitTime=5000
#######################################LCN-end#################################################




logging.level.com.codingapi=debug
View Code

 

5.啓動啓動類,便可訪問tx-manager主頁面

地址:http://localhost:7000/

 

========

相關文章
相關標籤/搜索