偶然下和朋友聊到了mysql多節點集羣架場景,對應咱們各系代碼如何去使用它,牽扯到mysql的驅動包中已經支持的策略。發現這塊的概念有些模糊,索性就梳理了一番留着後用。重點是:replication\loadbalance,以mysql-connector-java:5.1.38爲例展開。html
官方java驅動文檔:Multi-Host Connectionsjava
Mysql的multi-connection策略,官方叫server failover。replication和loadbalance協議都是創建在failover基礎上演變而來對應mysql不一樣的部署架構:Cluster架構和Replication架構,這兩個再也不展開。mysql
jdbc:mysql://[primary host][:port],[secondary host 1][:port][,[secondary host 2][:port]]...[/[database]]» [?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
官方URL格式:primary host表明主庫首選鏈接,secondary host表明從庫,依次排列嘗試鏈接同時支持不少配置屬性(詳細去看文檔):linux
在驅動包中類NonRegisteringDriver.java定義了mysql支持的多種鏈接協議,能夠看出有四種方式。mxj是指mysql會根據不一樣的系統平臺切換不一樣的mysqld binary執行,支持macos、linux、window、Solaris等等,replicaiton和loadbalance後面會介紹。sql
private static final String REPLICATION_URL_PREFIX = "jdbc:mysql:replication://"; private static final String URL_PREFIX = "jdbc:mysql://"; private static final String MXJ_URL_PREFIX = "jdbc:mysql:mxj://"; public static final String LOADBALANCE_URL_PREFIX = "jdbc:mysql:loadbalance://";
咱們根據Mysql支持的驅動類型也能夠看出一共三個實現類,常規使用的com.mysql.jdbc.Driver,主從庫模式下采用com.mysql.jdbc.ReplicationDriver,還有種爲兼容 MM.MySQL實現的org.djt.mm.mysql.Driver。數據庫
當如下三個場景中會出發loadbalance策略執行:macos
jdbc.diver= com.mysql.jdbc.ReplicationDriver
jdbc.url= jdbc:mysql:replication://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false服務器
具體格式相似failover,比較大的變化是第一個host爲master庫是write/read模式,後面都是slave庫是read模式,也是支持參數進行配置官網有詳細介紹。架構
jdbc\:mysql\:replication\://\[master host]\[:port],\[slave host 1]\[:port]\[,\[slave host 2][:port]]...[/[database]] » [?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
replication協議是創建在failover和loadbalance基礎上,適應Replication架構須要爲解決讀寫分離、負載均衡場景的。在事務read only模式下請求會被轉向到slave host,若果多個slave狀況下采用round-robin(輪詢)策略。對於非read only請求(write/read)都將轉向到master host。5.1.27後版本支持多個master,多個master下采用load balance策略,具體參考loadbalance協議介紹。5.1.28版本後又支持動態添加節點,也就是程序運行是動態添加新的host到URL中而不須要重啓服務器,咱們常常會聊的動態數據源場景。負載均衡
Read only這裏是與數據庫創建的connection屬性,如Connection.setReadOnly(false)、Connection.setReadOnly(true)。
jdbc.dirver= com.mysql.jdbc.Driver
jdbc.url= jdbc:mysql:loadbalance://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false&loadBalanceStrategy=bestResponseTime
jdbc:mysql:loadbalance://[host1][:port],[host2][:port][,[host3][:port]]...[/[database]] » [?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
格式同failover\replication相似,全部host沒有主次之分都是平級,也支持參數控制:
BalanceStrategy.java
public interface BalanceStrategy extends Extension { public abstract ConnectionImpl pickConnection(LoadBalancedConnectionProxy proxy, List<String> configuredHosts, Map<String, ConnectionImpl> liveConnections,long[] responseTimes, int numRetries) throws SQLException; }
loadbalance負載均衡策略定義了BalanceStrategy接口,mysql支持已經實現該接口的策略有:BestResponseTimeBalanceStrategy、RandomBalanceStrategy(默認)、SequentialBalanceStrategy。
經過源碼 Driver -> NonRegisteringDriver.connect(xx) -> NonRegisteringDriver.connectLoadBalanced() -> LoadBalancedConnectionProxy.createProxyInstance() 能夠看到默認採用策略是 RandomBalanceStrategy。
RandomBalanceStrategy.java
int random = (int) Math.floor((Math.random() * whiteList.size())); String hostPortSpec = whiteList.get(random); ConnectionImpl conn = liveConnections.get(hostPortSpec);
BestResponseTimeBalanceStrategy.pickConnection(..)
for (int i = 0; i < responseTimes.length; i++) { long candidateResponseTime = responseTimes[i]; if (candidateResponseTime < minResponseTime && !blackList.containsKey(configuredHosts.get(i))) { if (candidateResponseTime == 0) { bestHostIndex = i; break; } bestHostIndex = i; minResponseTime = candidateResponseTime; } }
SequentialBalanceStrategy.pickConnection(..)
if (numHosts == 1) { this.currentHostIndex = 0; // pathological case } else if (this.currentHostIndex == -1) { int random = (int) Math.floor((Math.random() * numHosts)); for (int i = random; i < numHosts; i++) { if (!blackList.containsKey(configuredHosts.get(i))) { this.currentHostIndex = i; break; } } if (this.currentHostIndex == -1) { for (int i = 0; i < random; i++) { if (!blackList.containsKey(configuredHosts.get(i))) { this.currentHostIndex = i; break; } } } //... } else { int i = this.currentHostIndex + 1; boolean foundGoodHost = false; for (; i < numHosts; i++) { if (!blackList.containsKey(configuredHosts.get(i))) { this.currentHostIndex = i; foundGoodHost = true; break; } } if (!foundGoodHost) { for (i = 0; i < this.currentHostIndex; i++) { if (!blackList.containsKey(configuredHosts.get(i))) { this.currentHostIndex = i; foundGoodHost = true; break; } } } //... }
NonRegisteringDriver.java
public class NonRegisteringDriver implements java.sql.Driver { if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) { return connectLoadBalanced(url, info); } else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) { return connectReplicationConnection(url, info); } }
LoadBalancedConnectionProxy.java
public class LoadBalancedConnectionProxy extends MultiHostConnectionProxy implements PingTarget { /** * Creates a proxy for java.sql.Connection that routes requests between the given list of host:port and uses the given properties when creating connections. * * @param hosts * The list of the hosts to load balance. * @param props * Connection properties from where to get initial settings and to be used in new connections. * @throws SQLException */ private LoadBalancedConnectionProxy(List<String> hosts, Properties props) throws SQLException { super(); String group = props.getProperty("loadBalanceConnectionGroup", null); boolean enableJMX = false; String enableJMXAsString = props.getProperty("loadBalanceEnableJMX", "false"); try { enableJMX = Boolean.parseBoolean(enableJMXAsString); } catch (Exception e) { throw SQLError.createSQLException( Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceEnableJMX", new Object[] { enableJMXAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null); } if (group != null) { this.connectionGroup = ConnectionGroupManager.getConnectionGroupInstance(group); if (enableJMX) { ConnectionGroupManager.registerJmx(); } this.connectionGroupProxyID = this.connectionGroup.registerConnectionProxy(this, hosts); hosts = new ArrayList<String>(this.connectionGroup.getInitialHosts()); } // hosts specifications may have been reset with settings from a previous connection group int numHosts = initializeHostsSpecs(hosts, props); this.liveConnections = new HashMap<String, ConnectionImpl>(numHosts); this.hostsToListIndexMap = new HashMap<String, Integer>(numHosts); for (int i = 0; i < numHosts; i++) { this.hostsToListIndexMap.put(this.hostList.get(i), i); } this.connectionsToHostsMap = new HashMap<ConnectionImpl, String>(numHosts); this.responseTimes = new long[numHosts]; String retriesAllDownAsString = this.localProps.getProperty("retriesAllDown", "120"); try { this.retriesAllDown = Integer.parseInt(retriesAllDownAsString); } catch (NumberFormatException nfe) { throw SQLError.createSQLException( Messages.getString("LoadBalancedConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null); } String blacklistTimeoutAsString = this.localProps.getProperty(BLACKLIST_TIMEOUT_PROPERTY_KEY, "0"); try { this.globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString); } catch (NumberFormatException nfe) { throw SQLError.createSQLException( Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null); } String strategy = this.localProps.getProperty("loadBalanceStrategy", "random"); if ("random".equals(strategy)) { this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null) .get(0); } else if ("bestResponseTime".equals(strategy)) { this.balancer = (BalanceStrategy) Util .loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0); } else { this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0); } String autoCommitSwapThresholdAsString = props.getProperty("loadBalanceAutoCommitStatementThreshold", "0"); try { this.autoCommitSwapThreshold = Integer.parseInt(autoCommitSwapThresholdAsString); } catch (NumberFormatException nfe) { throw SQLError.createSQLException(Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementThreshold", new Object[] { autoCommitSwapThresholdAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null); } String autoCommitSwapRegex = props.getProperty("loadBalanceAutoCommitStatementRegex", ""); if (!("".equals(autoCommitSwapRegex))) { try { "".matches(autoCommitSwapRegex); } catch (Exception e) { throw SQLError.createSQLException( Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementRegex", new Object[] { autoCommitSwapRegex }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null); } } if (this.autoCommitSwapThreshold > 0) { String statementInterceptors = this.localProps.getProperty("statementInterceptors"); if (statementInterceptors == null) { this.localProps.setProperty("statementInterceptors", "com.mysql.jdbc.LoadBalancedAutoCommitInterceptor"); } else if (statementInterceptors.length() > 0) { this.localProps.setProperty("statementInterceptors", statementInterceptors + ",com.mysql.jdbc.LoadBalancedAutoCommitInterceptor"); } props.setProperty("statementInterceptors", this.localProps.getProperty("statementInterceptors")); } this.balancer.init(null, props); String lbExceptionChecker = this.localProps.getProperty("loadBalanceExceptionChecker", "com.mysql.jdbc.StandardLoadBalanceExceptionChecker"); this.exceptionChecker = (LoadBalanceExceptionChecker) Util.loadExtensions(null, props, lbExceptionChecker, "InvalidLoadBalanceExceptionChecker", null) .get(0); pickNewConnection(); } }
做者:Owen Jia,推薦關注他的博客:Owen Blog。