mysql驅動協議之loadbalance和replication

背景

偶然下和朋友聊到了mysql多節點集羣架場景,對應咱們各系代碼如何去使用它,牽扯到mysql的驅動包中已經支持的策略。發現這塊的概念有些模糊,索性就梳理了一番留着後用。重點是:replication\loadbalance,以mysql-connector-java:5.1.38爲例展開。html

官方java驅動文檔:Multi-Host Connectionsjava

java connector

Mysql的multi-connection策略,官方叫server failover。replication和loadbalance協議都是創建在failover基礎上演變而來對應mysql不一樣的部署架構:Cluster架構和Replication架構,這兩個再也不展開。mysql

jdbc:mysql://[primary host][:port],[secondary host 1][:port][,[secondary host 2][:port]]...[/[database]]»
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]

官方URL格式:primary host表明主庫首選鏈接,secondary host表明從庫,依次排列嘗試鏈接同時支持不少配置屬性(詳細去看文檔):linux

  • failOverReadOnly
  • secondsBeforeRetryMaster
  • queriesBeforeRetryMaster
  • retriesAllDown
  • autoReconnect
  • autoReconnectForPools

在驅動包中類NonRegisteringDriver.java定義了mysql支持的多種鏈接協議,能夠看出有四種方式。mxj是指mysql會根據不一樣的系統平臺切換不一樣的mysqld binary執行,支持macos、linux、window、Solaris等等,replicaiton和loadbalance後面會介紹。sql

private static final String REPLICATION_URL_PREFIX = "jdbc:mysql:replication://";
    private static final String URL_PREFIX = "jdbc:mysql://";
    private static final String MXJ_URL_PREFIX = "jdbc:mysql:mxj://";
    public static final String LOADBALANCE_URL_PREFIX = "jdbc:mysql:loadbalance://";

咱們根據Mysql支持的驅動類型也能夠看出一共三個實現類,常規使用的com.mysql.jdbc.Driver,主從庫模式下采用com.mysql.jdbc.ReplicationDriver,還有種爲兼容 MM.MySQL實現的org.djt.mm.mysql.Driver。數據庫

dirver-architecture

觸發的場景條件

當如下三個場景中會出發loadbalance策略執行:macos

  1. 一個事務提交或者回滾時;
  2. 以08開頭的sql state 通訊異常發生時;
  3. 自定義模式匹配條件時;一個SQLException匹配到mysql支持的loadBalanceSQLStateFailover,loadBalanceSQLExceptionSubclassFailover ,loadBalanceExceptionChecker 三個方案自定義的conditions時;

協議:replication

jdbc.diver= com.mysql.jdbc.ReplicationDriver
jdbc.url= jdbc:mysql:replication://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false服務器

具體格式相似failover,比較大的變化是第一個host爲master庫是write/read模式,後面都是slave庫是read模式,也是支持參數進行配置官網有詳細介紹。架構

jdbc\:mysql\:replication\://\[master host]\[:port],\[slave host 1]\[:port]\[,\[slave host 2][:port]]...[/[database]] »
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]

replication協議是創建在failover和loadbalance基礎上,適應Replication架構須要爲解決讀寫分離、負載均衡場景的。在事務read only模式下請求會被轉向到slave host,若果多個slave狀況下采用round-robin(輪詢)策略。對於非read only請求(write/read)都將轉向到master host。5.1.27後版本支持多個master,多個master下采用load balance策略,具體參考loadbalance協議介紹。5.1.28版本後又支持動態添加節點,也就是程序運行是動態添加新的host到URL中而不須要重啓服務器,咱們常常會聊的動態數據源場景。負載均衡

Read only這裏是與數據庫創建的connection屬性,如Connection.setReadOnly(false)、Connection.setReadOnly(true)。


協議:loadbalance

jdbc.dirver= com.mysql.jdbc.Driver
jdbc.url= jdbc:mysql:loadbalance://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false&loadBalanceStrategy=bestResponseTime

jdbc:mysql:loadbalance://[host1][:port],[host2][:port][,[host3][:port]]...[/[database]] »
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]

格式同failover\replication相似,全部host沒有主次之分都是平級,也支持參數控制:

  • loadBalanceConnectionGroup:自定義分組名,能夠經過com.mysql.jdbc.jmx.LoadBalanceConnectionGroupManagerMBean類的方法控制,如:查看激活host數量、移除host、添加host等等操做。
  • loadBalanceEnableJMX:啓動jmx監控,經過程序啓動添加jvm參數 -Dcom.sun.management.jmxremote,經過jvm工具jconsole等能夠查看。

BalanceStrategy.java

public interface BalanceStrategy extends Extension {

public abstract ConnectionImpl pickConnection(LoadBalancedConnectionProxy proxy, List<String> configuredHosts, Map<String, ConnectionImpl> liveConnections,long[] responseTimes, int numRetries) throws SQLException;
}

loadbalance負載均衡策略定義了BalanceStrategy接口,mysql支持已經實現該接口的策略有:BestResponseTimeBalanceStrategy、RandomBalanceStrategy(默認)、SequentialBalanceStrategy。

經過源碼 Driver -> NonRegisteringDriver.connect(xx) -> NonRegisteringDriver.connectLoadBalanced() -> LoadBalancedConnectionProxy.createProxyInstance() 能夠看到默認採用策略是 RandomBalanceStrategy。

  • RandomBalanceStrategy:隨機選中一個host。
  • BestResponseTimeBalanceStrategy:選中事務響應最快的host。
  • SequentialBalanceStrategy:第一次隨機以後順序選後一個至循環往復。

RandomBalanceStrategy.java

int random = (int) Math.floor((Math.random() * whiteList.size()));
String hostPortSpec = whiteList.get(random);
ConnectionImpl conn = liveConnections.get(hostPortSpec);

BestResponseTimeBalanceStrategy.pickConnection(..)

for (int i = 0; i < responseTimes.length; i++) {
    long candidateResponseTime = responseTimes[i];
    if (candidateResponseTime < minResponseTime && !blackList.containsKey(configuredHosts.get(i))) {
        if (candidateResponseTime == 0) {
            bestHostIndex = i;
            break;
        }
        bestHostIndex = i;
        minResponseTime = candidateResponseTime;
    }
}

SequentialBalanceStrategy.pickConnection(..)

if (numHosts == 1) {
    this.currentHostIndex = 0; // pathological case
} else if (this.currentHostIndex == -1) {
    int random = (int) Math.floor((Math.random() * numHosts));

    for (int i = random; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            break;
        }
    }

    if (this.currentHostIndex == -1) {
        for (int i = 0; i < random; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                break;
            }
        }
    }

    //...
} else {

    int i = this.currentHostIndex + 1;
    boolean foundGoodHost = false;

    for (; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            foundGoodHost = true;
            break;
        }
    }

    if (!foundGoodHost) {
        for (i = 0; i < this.currentHostIndex; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                foundGoodHost = true;
                break;
            }
        }
    }
    //...
}

NonRegisteringDriver.java

public class NonRegisteringDriver implements java.sql.Driver {
	if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) {
		return connectLoadBalanced(url, info);
	} else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) {
		return connectReplicationConnection(url, info);
	}
}

LoadBalancedConnectionProxy.java

public class LoadBalancedConnectionProxy extends MultiHostConnectionProxy implements PingTarget {
/**
     * Creates a proxy for java.sql.Connection that routes requests between the given list of host:port and uses the given properties when creating connections.
     * 
     * @param hosts
     *            The list of the hosts to load balance.
     * @param props
     *            Connection properties from where to get initial settings and to be used in new connections.
     * @throws SQLException
     */
    private LoadBalancedConnectionProxy(List<String> hosts, Properties props) throws SQLException {
        super();

        String group = props.getProperty("loadBalanceConnectionGroup", null);
        boolean enableJMX = false;
        String enableJMXAsString = props.getProperty("loadBalanceEnableJMX", "false");
        try {
            enableJMX = Boolean.parseBoolean(enableJMXAsString);
        } catch (Exception e) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceEnableJMX", new Object[] { enableJMXAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        if (group != null) {
            this.connectionGroup = ConnectionGroupManager.getConnectionGroupInstance(group);
            if (enableJMX) {
                ConnectionGroupManager.registerJmx();
            }
            this.connectionGroupProxyID = this.connectionGroup.registerConnectionProxy(this, hosts);
            hosts = new ArrayList<String>(this.connectionGroup.getInitialHosts());
        }

        // hosts specifications may have been reset with settings from a previous connection group
        int numHosts = initializeHostsSpecs(hosts, props);

        this.liveConnections = new HashMap<String, ConnectionImpl>(numHosts);
        this.hostsToListIndexMap = new HashMap<String, Integer>(numHosts);
        for (int i = 0; i < numHosts; i++) {
            this.hostsToListIndexMap.put(this.hostList.get(i), i);
        }
        this.connectionsToHostsMap = new HashMap<ConnectionImpl, String>(numHosts);
        this.responseTimes = new long[numHosts];

        String retriesAllDownAsString = this.localProps.getProperty("retriesAllDown", "120");
        try {
            this.retriesAllDown = Integer.parseInt(retriesAllDownAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String blacklistTimeoutAsString = this.localProps.getProperty(BLACKLIST_TIMEOUT_PROPERTY_KEY, "0");
        try {
            this.globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String strategy = this.localProps.getProperty("loadBalanceStrategy", "random");
        if ("random".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null)
                    .get(0);
        } else if ("bestResponseTime".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util
                    .loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        } else {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0);
        }

        String autoCommitSwapThresholdAsString = props.getProperty("loadBalanceAutoCommitStatementThreshold", "0");
        try {
            this.autoCommitSwapThreshold = Integer.parseInt(autoCommitSwapThresholdAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementThreshold",
                    new Object[] { autoCommitSwapThresholdAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String autoCommitSwapRegex = props.getProperty("loadBalanceAutoCommitStatementRegex", "");
        if (!("".equals(autoCommitSwapRegex))) {
            try {
                "".matches(autoCommitSwapRegex);
            } catch (Exception e) {
                throw SQLError.createSQLException(
                        Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementRegex", new Object[] { autoCommitSwapRegex }),
                        SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
            }
        }

        if (this.autoCommitSwapThreshold > 0) {
            String statementInterceptors = this.localProps.getProperty("statementInterceptors");
            if (statementInterceptors == null) {
                this.localProps.setProperty("statementInterceptors", "com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            } else if (statementInterceptors.length() > 0) {
                this.localProps.setProperty("statementInterceptors", statementInterceptors + ",com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            }
            props.setProperty("statementInterceptors", this.localProps.getProperty("statementInterceptors"));

        }

        this.balancer.init(null, props);

        String lbExceptionChecker = this.localProps.getProperty("loadBalanceExceptionChecker", "com.mysql.jdbc.StandardLoadBalanceExceptionChecker");
        this.exceptionChecker = (LoadBalanceExceptionChecker) Util.loadExtensions(null, props, lbExceptionChecker, "InvalidLoadBalanceExceptionChecker", null)
                .get(0);

        pickNewConnection();
    }
}

做者:Owen Jia,推薦關注他的博客:Owen Blog

相關文章
相關標籤/搜索