接《Java連接數據庫 #01# JDBC單線程適用》,這次主要是改成多線程適用,順便對DAO層結構進行改良:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
① LocalConnectionFactory.java 負責創建connection,並用ThreadLocal變量保存connection
package org.sample.db;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ResourceBundle;

/**
 * Hands out JDBC connections, keeping at most one per thread in a
 * {@link ThreadLocal}.
 *
 * <p>Connection settings are read once, at class-initialisation time, from
 * the {@code org.sample.db.db-config} resource bundle (keys {@code jdbc.url},
 * {@code jdbc.username}, {@code jdbc.password}).
 */
public class LocalConnectionFactory {

    private static final ResourceBundle CONFIG = ResourceBundle.getBundle("org.sample.db.db-config");

    private static final String JDBC_URL = CONFIG.getString("jdbc.url");
    private static final String JDBC_USER = CONFIG.getString("jdbc.username");
    private static final String JDBC_PASSWORD = CONFIG.getString("jdbc.password");

    /** The connection bound to the current thread, if any. */
    private static final ThreadLocal<Connection> CONNECTION_HOLDER = new ThreadLocal<>();

    private LocalConnectionFactory() {
        // Static-only class: exists to defeat instantiation.
    }

    /**
     * Returns the calling thread's connection, opening and binding a new one
     * when the thread has none yet or its previous one has been closed.
     *
     * @return an open connection bound to the current thread
     * @throws SQLException if the state of the existing connection cannot be
     *         checked or a new connection cannot be opened
     */
    public static Connection getConnection() throws SQLException {
        Connection current = CONNECTION_HOLDER.get();
        if (current != null && !current.isClosed()) {
            return current;
        }
        Connection fresh = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
        CONNECTION_HOLDER.set(fresh);
        return fresh;
    }

    /**
     * Unbinds the calling thread's connection from the thread-local slot.
     * The connection itself is not closed here; that is the caller's job.
     */
    public static void removeLocalConnection() {
        CONNECTION_HOLDER.remove();
    }
}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
② LocalConnectionProxy.java 通過該類間接控制connection,在Service層控制事務(代碼分層有錯誤!)
package org.sample.manager;

import org.sample.db.LocalConnectionFactory;
import org.sample.exception.DaoException;

import java.sql.Connection;
import java.sql.SQLException;

/**
 * Static facade over the calling thread's JDBC connection, so that callers
 * can drive transactions (auto-commit, commit, rollback, close) without
 * touching {@link Connection} directly.
 *
 * <p>Every method acts on the connection returned by
 * {@link LocalConnectionFactory#getConnection()} for the current thread and
 * rethrows any {@link SQLException} wrapped in a {@link DaoException}.
 */
public class LocalConnectionProxy {

    /**
     * Sets the auto-commit mode of the current thread's connection.
     *
     * @param autoCommit {@code false} to start grouping statements into a transaction
     * @throws DaoException if the connection cannot be obtained or updated
     */
    public static void setAutoCommit(boolean autoCommit) throws DaoException {
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            conn.setAutoCommit(autoCommit);
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }

    // public static void setTransactionIsolation(int level) throws DaoException {
    //     try {
    //         Connection conn = LocalConnectionFactory.getConnection();
    //         conn.setTransactionIsolation(level);
    //     } catch (SQLException e) {
    //         throw new DaoException(e);
    //     }
    // }

    /**
     * Commits the current transaction on the current thread's connection.
     *
     * @throws DaoException if the connection cannot be obtained or the commit fails
     */
    public static void commit() throws DaoException {
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            conn.commit();
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }

    /**
     * Rolls back the current transaction on the current thread's connection.
     *
     * @throws DaoException if the connection cannot be obtained or the rollback fails
     */
    public static void rollback() throws DaoException {
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            conn.rollback();
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }

    /**
     * Closes the current thread's connection and unbinds it from the thread.
     *
     * <p>The unbinding happens in a {@code finally} block so that a failing
     * {@code close()} cannot leave a stale connection attached to the thread
     * (which matters when threads are pooled and reused).
     *
     * @throws DaoException if the connection cannot be obtained or closed
     */
    public static void close() throws DaoException {
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            conn.close();
        } catch (SQLException e) {
            throw new DaoException(e);
        } finally {
            LocalConnectionFactory.removeLocalConnection();
        }
    }
}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
③ ProfileDAO.java-2.0 標示出了可能拋出的異常
package org.sample.dao;

import org.sample.entity.Profile;
import org.sample.exception.DaoException;

import java.util.List;

/**
 * Data-access operations on {@link Profile} records.
 *
 * <p>No method declares a checked exception. The implementation shown with
 * this interface ({@code ProfileDAOImpl}) reports every database failure by
 * throwing {@link DaoException}.
 */
public interface ProfileDAO {

    /**
     * Stores a new profile.
     *
     * @param profile the profile to insert
     * @return the number of rows inserted; {@code ProfileDAOImpl} uses
     *         {@code INSERT IGNORE}, so a duplicate yields {@code 0} rather
     *         than an exception
     * @throws DaoException if the database operation fails
     */
    int saveProfile(Profile profile);

    /**
     * Finds every profile carrying the given nickname.
     *
     * @param nickname the nickname to match
     * @return the matching profiles; {@code ProfileDAOImpl} returns an empty
     *         list when nothing matches
     * @throws DaoException if the database operation fails
     */
    List<Profile> listProfileByNickname(String nickname);

    /**
     * Looks up a single profile by username.
     *
     * @param username the username to match
     * @return the profile; {@code ProfileDAOImpl} returns {@code null} when
     *         no row matches
     * @throws DaoException if the database operation fails
     */
    Profile getProfileByUsername(String username);

    /**
     * Updates the profile identified by {@code profile.getProfileId()}.
     * {@code ProfileDAOImpl} writes the nickname, gender, birthday and
     * location columns.
     *
     * @param profile carries the id of the row to update and the new values
     * @return the number of rows updated
     * @throws DaoException if the database operation fails
     */
    int updateProfileById(Profile profile);

    /**
     * Replaces the stored password of the given user.
     *
     * @param username identifies the row to update
     * @param password the value to store
     * @return the number of rows updated
     * @throws DaoException if the database operation fails
     */
    int updatePassword(String username, String password);

    /**
     * Refreshes the last-online timestamp of the given user;
     * {@code ProfileDAOImpl} sets it to the database's {@code CURRENT_TIMESTAMP}.
     *
     * @param username identifies the row to update
     * @return the number of rows updated
     * @throws DaoException if the database operation fails
     */
    int updateLastOnline(String username);
}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
④ ProfileDAOImpl.java-2.0 改進了savePOJO方法,該類是線程安全的,而且適合採用單例模式。至於為什麼不直接用靜態方法,在網上看到的說法是:靜態方法一般都是業務無關的,而DAO方法都是業務相關的,而且創建對象可以方便框架(Spring)進行依賴注入、採用多態等。
package org.sample.dao.impl;

import org.sample.dao.ProfileDAO;
import org.sample.db.LocalConnectionFactory;
import org.sample.entity.Profile;
import org.sample.exception.DaoException;
import org.sample.util.DbUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * JDBC implementation of {@link ProfileDAO}, exposed as a singleton through
 * {@link #INSTANCE}.
 *
 * <p>The class holds no mutable state. Every call works on the connection
 * that {@link LocalConnectionFactory#getConnection()} returns for the calling
 * thread and never closes it; closing and transaction control are left to the
 * caller. Any {@link SQLException} is rethrown wrapped in a
 * {@link DaoException}.
 */
public class ProfileDAOImpl implements ProfileDAO {

    public static final ProfileDAO INSTANCE = new ProfileDAOImpl();

    /**
     * Shared column list and FROM clause of the two lookup queries. The
     * trailing space keeps the WHERE clause appended by each query from
     * running into the table name.
     */
    private static final String SELECT_PROFILE =
            "SELECT `profile_id`, `username`, `password`, `nickname`, `last_online`, "
            + "`gender`, `birthday`, `location`, `joined` "
            + "FROM `profiles`.`profile` ";

    private ProfileDAOImpl() {}

    /**
     * Inserts a profile (username, password, nickname).
     *
     * @param profile the profile to insert
     * @return the number of rows inserted; {@code 0} when the row already
     *         exists, because {@code INSERT IGNORE} suppresses the
     *         duplicate-key error
     * @throws DaoException if the statement fails
     */
    @Override
    public int saveProfile(Profile profile) {
        // IGNORE: a duplicate does not raise an error, the statement just reports 0 rows.
        String sql = "INSERT ignore INTO `profiles`.`profile` (`username`, `password`, `nickname`) "
                + "VALUES (?, ?, ?)";
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, profile.getUsername());
                ps.setString(2, profile.getPassword());
                ps.setString(3, profile.getNickname());
                return ps.executeUpdate();
            }
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }

    /**
     * Returns every profile whose nickname equals {@code nickname}.
     *
     * @param nickname the nickname to match
     * @return the matching profiles, empty when there are none
     * @throws DaoException if the query fails
     */
    @Override
    public List<Profile> listProfileByNickname(String nickname) {
        String sql = SELECT_PROFILE + "WHERE `nickname`=?";
        List<Profile> result = new ArrayList<>();
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, nickname);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        result.add(DbUtil.extractProfileFromResultSet(rs));
                    }
                }
            }
        } catch (SQLException e) {
            throw new DaoException(e);
        }
        return result;
    }

    /**
     * Returns the profile whose username equals {@code username}.
     *
     * @param username the username to match
     * @return the first matching profile, or {@code null} when none exists
     * @throws DaoException if the query fails
     */
    @Override
    public Profile getProfileByUsername(String username) {
        String sql = SELECT_PROFILE + "WHERE `username`=?";
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, username);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next() ? DbUtil.extractProfileFromResultSet(rs) : null;
                }
            }
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }

    /**
     * Updates nickname, gender, birthday and location of the row whose id is
     * {@code profile.getProfileId()}.
     *
     * @param profile carries the row id and the new column values
     * @return the number of rows updated
     * @throws DaoException if the statement fails
     */
    @Override
    public int updateProfileById(Profile profile) {
        String sql = "UPDATE `profiles`.`profile` "
                + "SET `nickname`=?, `gender`=?, `birthday`=?, `location`=? "
                + "WHERE `profile_id`=?";
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, profile.getNickname());
                // A missing gender is stored as SQL NULL rather than the text "null".
                ps.setString(2, profile.getGender() != null ? String.valueOf(profile.getGender()) : null);
                ps.setTimestamp(3, profile.getBirthday());
                ps.setString(4, profile.getLocation());
                ps.setLong(5, profile.getProfileId());
                return ps.executeUpdate();
            }
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }

    /**
     * Replaces the password of the row whose username equals {@code username}.
     *
     * @param username identifies the row
     * @param password the value to store
     * @return the number of rows updated
     * @throws DaoException if the statement fails
     */
    @Override
    public int updatePassword(String username, String password) {
        String sql = "UPDATE `profiles`.`profile` "
                + "SET `password`=? "
                + "WHERE `username`=?";
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, password);
                ps.setString(2, username);
                return ps.executeUpdate();
            }
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }

    /**
     * Sets {@code last_online} of the given user to the database's
     * {@code CURRENT_TIMESTAMP}.
     *
     * @param username identifies the row
     * @return the number of rows updated
     * @throws DaoException if the statement fails
     */
    @Override
    public int updateLastOnline(String username) {
        String sql = "UPDATE `profiles`.`profile` "
                + "SET `last_online`=CURRENT_TIMESTAMP "
                + "WHERE `username`=?";
        try {
            Connection conn = LocalConnectionFactory.getConnection();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, username);
                return ps.executeUpdate();
            }
        } catch (SQLException e) {
            throw new DaoException(e);
        }
    }
}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
⑤ DaoException.java(RuntimeException)
package org.sample.exception;

/**
 * Signals a failure in the data-access layer; the DAO and connection helper
 * classes use it to wrap {@code java.sql.SQLException}.
 *
 * <p>NOTE(review): the DAO methods throw this without declaring it, so
 * {@code OnesProfileException} is presumably a {@code RuntimeException}
 * subclass; confirm against that class.
 *
 * <p>All constructors delegate to the superclass constructor with the same
 * parameter list.
 */
public class DaoException extends OnesProfileException {

    /** Creates an exception with no message and no cause. */
    public DaoException() {
        super();
    }

    /**
     * @param message description of the failure
     */
    public DaoException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause   the underlying exception being wrapped
     */
    public DaoException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause the underlying exception being wrapped
     */
    public DaoException(Throwable cause) {
        super(cause);
    }

    /**
     * Full-control constructor for subclasses; all four arguments are passed
     * to the superclass unchanged.
     *
     * @param message            description of the failure
     * @param cause              the underlying exception being wrapped
     * @param enableSuppression  forwarded to the superclass
     * @param writableStackTrace forwarded to the superclass
     */
    protected DaoException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
⑥ DbUtil.java-2.0 改成一個單純的數據庫操作輔助類。
package org.sample.util;

import org.sample.entity.Profile;

import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Helper for turning JDBC result rows into entity objects.
 */
public class DbUtil {

    /**
     * Builds a {@link Profile} from the row the result set is currently
     * positioned on. The cursor is not moved.
     *
     * <p>The row must expose the columns {@code birthday}, {@code joined},
     * {@code last_online}, {@code location}, {@code nickname},
     * {@code password}, {@code profile_id}, {@code username} and
     * {@code gender}.
     *
     * @param rs a result set positioned on a valid row
     * @return a new profile populated from that row
     * @throws SQLException if a column is missing or cannot be read
     */
    public static Profile extractProfileFromResultSet(ResultSet rs) throws SQLException {
        Profile profile = new Profile();
        profile.setBirthday(rs.getTimestamp("birthday"));
        profile.setJoined(rs.getTimestamp("joined"));
        profile.setLast_online(rs.getTimestamp("last_online"));
        profile.setLocation(rs.getString("location"));
        profile.setNickname(rs.getString("nickname"));
        profile.setPassword(rs.getString("password"));
        profile.setProfileId(rs.getLong("profile_id"));
        profile.setUsername(rs.getString("username"));

        // Read the column once. An SQL NULL or an empty string leaves the
        // gender unset instead of failing in charAt(0).
        String gender = rs.getString("gender");
        if (gender != null && !gender.isEmpty()) {
            profile.setGender(gender.charAt(0));
        }
        return profile;
    }
}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
⑦ ProfileDAOTest.java-2.0 各個方法的測試之間不再相互依賴
package org.sample.dao;

import org.junit.After;
import org.junit.Test;
import org.sample.dao.impl.ProfileDAOImpl;
import org.sample.entity.Profile;
import org.sample.manager.LocalConnectionProxy;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
 * Integration tests for {@link ProfileDAO} against a live database.
 *
 * <p>Each test creates its own randomly named profile(s), so the tests do
 * not depend on each other or on their execution order.
 */
public class ProfileDAOTest {

    private static final ProfileDAO PROFILE_DAO = ProfileDAOImpl.INSTANCE;

    private static final String ORIGIN_STRING = "hello";
    private static final String PASSWORD = ORIGIN_STRING;

    /** Returns a string that is, for practical purposes, unique per call. */
    private static String randomString() {
        return Math.random() + ORIGIN_STRING + Math.random();
    }

    /** Returns a profile with random username and nickname and the fixed test password. */
    private static Profile randomProfile() {
        return new Profile(randomString(), PASSWORD, randomString());
    }

    /**
     * Closes the thread-bound connection after every test, whether it passed
     * or failed, so a failing test cannot leak its connection into the next.
     */
    @After
    public void closeConnection() {
        LocalConnectionProxy.close();
    }

    @Test
    public void saveProfile() throws Exception {
        Profile profile = randomProfile();
        int first = PROFILE_DAO.saveProfile(profile);
        int duplicate = PROFILE_DAO.saveProfile(profile);
        assertEquals(1, first);
        // INSERT IGNORE: the duplicate is skipped silently and reports 0 rows.
        assertEquals(0, duplicate);
    }

    @Test
    public void listProfileByNickname() throws Exception {
        final String nickName = randomString();
        PROFILE_DAO.saveProfile(new Profile(randomString(), PASSWORD, nickName));
        PROFILE_DAO.saveProfile(new Profile(randomString(), PASSWORD, nickName));
        PROFILE_DAO.saveProfile(new Profile(randomString(), PASSWORD, nickName));
        List<Profile> result = PROFILE_DAO.listProfileByNickname(nickName);
        assertEquals(3, result.size());
    }

    @Test
    public void getProfileByUsername() throws Exception {
        Profile profile = randomProfile();
        PROFILE_DAO.saveProfile(profile);
        Profile result = PROFILE_DAO.getProfileByUsername(profile.getUsername());
        assertNotNull(result);
    }

    @Test
    public void updateProfileById() throws Exception {
        Profile profile = randomProfile();
        PROFILE_DAO.saveProfile(profile);
        // Re-read the row to learn the generated profile id.
        Profile stored = PROFILE_DAO.getProfileByUsername(profile.getUsername());
        int updated = PROFILE_DAO.updateProfileById(stored);
        assertEquals(1, updated);
    }

    @Test
    public void updatePassword() throws Exception {
        Profile profile = randomProfile();
        PROFILE_DAO.saveProfile(profile);
        int updated = PROFILE_DAO.updatePassword(profile.getUsername(), randomString());
        assertEquals(1, updated);
    }

    @Test
    public void updateLastOnline() throws Exception {
        Profile profile = randomProfile();
        PROFILE_DAO.saveProfile(profile);
        int updated = PROFILE_DAO.updateLastOnline(profile.getUsername());
        assertEquals(1, updated);
    }
}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
⑧ DaoTest.java 簡單測試了一下代碼在併發情況下的運行狀況
package org.sample.manager;

import org.junit.Test;
import org.sample.dao.ProfileDAO;
import org.sample.dao.impl.ProfileDAOImpl;
import org.sample.entity.Profile;
import org.sample.exception.DaoException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import static org.junit.Assert.assertTrue;

/**
 * Smoke test that runs many DAO transactions at the same time, one per
 * thread, against a live database.
 */
public class DaoTest {

    private static final Logger LOGGER = Logger.getLogger(DaoTest.class.getName());

    private static final String ORIGIN_STRING = "hello";

    private static final ProfileDAO PROFILE_DAO = ProfileDAOImpl.INSTANCE;

    /** Number of workers, and therefore of simultaneous threads and connections. */
    private static final int NUM_TASKS = 100;

    /** Upper bound for the whole concurrent run, so a hung worker cannot block the build forever. */
    private static final int TIMEOUT_SECONDS = 60;

    private static String randomString() {
        return Math.random() + ORIGIN_STRING + Math.random();
    }

    private static Profile randomProfile() {
        return new Profile(randomString(), ORIGIN_STRING, randomString());
    }

    /**
     * Saves one random profile inside its own transaction on the executing
     * thread's connection.
     *
     * <p>A {@link DaoException} is rolled back and then rethrown, not just
     * printed, so that {@link #assertConcurrent} records it and the test
     * fails. The connection is closed in every case.
     */
    private static class Worker implements Runnable {

        private final Profile profile = randomProfile();

        @Override
        public void run() {
            LOGGER.info(Thread.currentThread().getName() + " has started his work");
            try {
                LocalConnectionProxy.setAutoCommit(false);
                PROFILE_DAO.saveProfile(profile);
                LocalConnectionProxy.commit();
            } catch (DaoException e) {
                try {
                    LocalConnectionProxy.rollback();
                } catch (DaoException rollbackFailure) {
                    // Keep the original failure as the primary error.
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            } finally {
                LocalConnectionProxy.close();
            }
            LOGGER.info(Thread.currentThread().getName() + " has finished his work");
        }
    }

    @Test
    public void test() throws Exception {
        List<Runnable> workers = new LinkedList<>();
        for (int i = 0; i != NUM_TASKS; ++i) {
            workers.add(new Worker());
        }
        assertConcurrent("Dao test ", workers, TIMEOUT_SECONDS);
    }

    /**
     * Runs all runnables simultaneously, one thread each, and fails if any of
     * them throws or if they do not all finish in time.
     *
     * <p>All threads first park on a latch and are released together, to
     * maximise contention.
     *
     * @param message           prefix for the assertion messages
     * @param runnables         the tasks to run; one pool thread is created per task
     * @param maxTimeoutSeconds how long to wait for all tasks to finish
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void assertConcurrent(final String message, final List<? extends Runnable> runnables,
                                        final int maxTimeoutSeconds) throws InterruptedException {
        final int numThreads = runnables.size();
        final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
        final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
        try {
            final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
            final CountDownLatch afterInitBlocker = new CountDownLatch(1);
            final CountDownLatch allDone = new CountDownLatch(numThreads);
            for (final Runnable submittedTestRunnable : runnables) {
                threadPool.submit(new Runnable() {
                    @Override
                    public void run() {
                        allExecutorThreadsReady.countDown();
                        try {
                            afterInitBlocker.await();
                            submittedTestRunnable.run();
                        } catch (final Throwable e) {
                            exceptions.add(e);
                        } finally {
                            allDone.countDown();
                        }
                    }
                });
            }
            // wait until all threads are ready
            assertTrue("Timeout initializing threads! Perform long lasting initializations before passing runnables to assertConcurrent",
                    allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS));
            // start all test runners
            afterInitBlocker.countDown();
            assertTrue(message + " timeout! More than " + maxTimeoutSeconds + " seconds",
                    allDone.await(maxTimeoutSeconds, TimeUnit.SECONDS));
        } finally {
            threadPool.shutdownNow();
        }
        assertTrue(message + "failed with exception(s) " + exceptions, exceptions.isEmpty());
    }
}
在連接數量超出100時會拋出 com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Data source rejected establishment of connection, message from server: "Too many connections",這是因為超出了mysql默認的最大連接數。這個值是可以自定義的,應根據具體的併發量、服務器性能、業務場景等各種因素綜合決定。參考《連接池最大/最小連接數,自動釋放時間,設置多少合適?》
★查看數據庫連接相關信息:
mysql> show status like 'Threads%';
+-------------------+-------+
| Variable_name     | Value |
+-------------------+-------+
| Threads_cached    | 8     |
| Threads_connected | 1     |
| Threads_created   | 3419  |
| Threads_running   | 1     |
+-------------------+-------+
4 rows in set (0.00 sec)
Threads_cached是數據庫緩存的連接數。
★查看最大連接數:
mysql> show variables like '%max_connections%';
+-----------------+-------+
| Variable_name   | Value |
+-----------------+-------+
| max_connections | 100   |
+-----------------+-------+
1 row in set (0.01 sec)
mysql> show global status like 'Max_used_connections';
ps. 官方解釋 - The maximum number of connections that have been in use simultaneously since the server started.
+----------------------+-------+
| Variable_name        | Value |
+----------------------+-------+
| Max_used_connections | 101   |
+----------------------+-------+
1 row in set (0.00 sec)
但是我很奇怪為什麼max_used_connections會大於max_connections,查文檔結果如下 ↓
mysqld actually permits max_connections+1 clients to connect. The extra connection is reserved for use by accounts that have the CONNECTION_ADMIN or SUPER privilege. By granting the SUPER privilege to administrators and not to normal users (who should not need it), an administrator can connect to the server and use SHOW PROCESSLIST to diagnose problems even if the maximum number of unprivileged clients are connected. See Section 13.7.6.29, “SHOW PROCESSLIST Syntax”.
順便mark 10 MySQL variables that you should monitor - TechRepublic
一、簡化dao層方法命名
二、更改LocalConnectionFactory.java,依然採用Class.forName
package org.sample.db;

import org.sample.exception.DaoException;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ResourceBundle;

/**
 * Hands out JDBC connections, keeping at most one per thread in a
 * {@link ThreadLocal}.
 *
 * <p>Connection settings are read once, at class-initialisation time, from
 * the {@code org.sample.db.db-config} resource bundle (keys {@code jdbc.url},
 * {@code jdbc.username}, {@code jdbc.password}). The MySQL driver class is
 * loaded explicitly in a static initializer.
 */
public class LocalConnectionFactory {

    private static final ResourceBundle rb = ResourceBundle.getBundle("org.sample.db.db-config");

    private static final String JDBC_URL = rb.getString("jdbc.url");
    private static final String JDBC_USER = rb.getString("jdbc.username");
    private static final String JDBC_PASSWORD = rb.getString("jdbc.password");

    /** The connection bound to the current thread, if any. */
    private static final ThreadLocal<Connection> LocalConnectionHolder = new ThreadLocal<>();

    static {
        try {
            // JDBC 4 drivers can register themselves, but whether that works
            // depends on the environment and driver version, so the class is
            // still loaded explicitly to be safe.
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // TODO: log
            throw new DaoException("could not register JDBC driver", e);
        }
    }

    private LocalConnectionFactory() {
        // Exists to defeat instantiation
    }

    /**
     * Returns the calling thread's connection, opening and binding a new one
     * when the thread has none yet or its previous one has been closed.
     *
     * @return an open connection bound to the current thread
     * @throws SQLException if the state of the existing connection cannot be
     *         checked or a new connection cannot be opened
     */
    public static Connection getConnection() throws SQLException {
        Connection conn = LocalConnectionHolder.get();
        if (conn == null || conn.isClosed()) {
            conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            LocalConnectionHolder.set(conn);
        }
        return conn;
    }

    /**
     * Unbinds the calling thread's connection, closing it first if it is
     * still open, so that calling this method on its own cannot leak a live
     * connection. Calling it after the connection has already been closed is
     * harmless.
     *
     * <p>The thread-local slot is cleared even when closing fails.
     *
     * @throws DaoException if the connection cannot be closed
     */
    public static void removeLocalConnection() {
        Connection conn = LocalConnectionHolder.get();
        try {
            if (conn != null && !conn.isClosed()) {
                conn.close();
            }
        } catch (SQLException e) {
            throw new DaoException("could not close thread-local connection", e);
        } finally {
            LocalConnectionHolder.remove();
        }
    }
}
三、補一張結構草圖(WRONG!!!):