This article takes a look at the transactionTimeoutScanPeriod setting in Artemis.
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
    public class ConfigurationImpl implements Configuration, Serializable {

       //......

       private long transactionTimeoutScanPeriod = ActiveMQDefaultConfiguration.getDefaultTransactionTimeoutScanPeriod();

       //......

       @Override
       public long getTransactionTimeoutScanPeriod() {
          return transactionTimeoutScanPeriod;
       }

       @Override
       public ConfigurationImpl setTransactionTimeoutScanPeriod(final long period) {
          transactionTimeoutScanPeriod = period;
          return this;
       }

       //......
    }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
    public class ActiveMQServerImpl implements ActiveMQServer {

       //......

       synchronized boolean initialisePart1(boolean scalingDown) throws Exception {

          //......

          resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);

          //......
       }

       //......
    }
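The constructor call above divides the configured transaction timeout by 1000 and narrows it to an int, and the receiving parameter is named defaultTimeoutSeconds, which implies the configured value is in milliseconds while the scan period is passed through unchanged. The sketch below only illustrates that arithmetic; TimeoutUnits and toDefaultTimeoutSeconds are hypothetical names, not part of Artemis, and the input values are illustrative.

```java
// Hypothetical helper, not part of Artemis: it mirrors the expression
// (int) (configuration.getTransactionTimeout() / 1000) from initialisePart1.
final class TimeoutUnits {

    static int toDefaultTimeoutSeconds(long transactionTimeoutMillis) {
        // long division truncates toward zero before the narrowing cast
        return (int) (transactionTimeoutMillis / 1000);
    }

    public static void main(String[] args) {
        System.out.println(toDefaultTimeoutSeconds(300_000L)); // prints 300
        System.out.println(toDefaultTimeoutSeconds(1_500L));   // prints 1
        System.out.println(toDefaultTimeoutSeconds(999L));     // prints 0
    }
}
```

Under this reading, a configured timeout that is not a whole number of seconds is rounded down, and any value below 1000 becomes 0.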
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
    public class ResourceManagerImpl implements ResourceManager {

       private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<>();

       private final List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<>();

       private final int defaultTimeoutSeconds;

       private boolean started = false;

       private TxTimeoutHandler task;

       private final long txTimeoutScanPeriod;

       private final ScheduledExecutorService scheduledThreadPool;

       public ResourceManagerImpl(final int defaultTimeoutSeconds,
                                  final long txTimeoutScanPeriod,
                                  final ScheduledExecutorService scheduledThreadPool) {
          this.defaultTimeoutSeconds = defaultTimeoutSeconds;
          this.txTimeoutScanPeriod = txTimeoutScanPeriod;
          this.scheduledThreadPool = scheduledThreadPool;
       }

       // ActiveMQComponent implementation

       @Override
       public int size() {
          return transactions.size();
       }

       @Override
       public void start() throws Exception {
          if (started) {
             return;
          }
          task = new TxTimeoutHandler();
          Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS);
          task.setFuture(future);

          started = true;
       }

       @Override
       public void stop() throws Exception {
          if (!started) {
             return;
          }
          if (task != null) {
             task.close();
          }

          started = false;
       }

       //......
    }
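The start/stop pair above follows a common lifecycle pattern: schedule a periodic task once, keep its Future, and cancel it without interrupting a run in progress. The following is a minimal standalone sketch of that pattern using only java.util.concurrent; PeriodicScanner and its members are hypothetical names, not Artemis classes, and the counter stands in for the real scan.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the start()/stop() pattern; not Artemis code.
final class PeriodicScanner {

    private final ScheduledExecutorService pool;
    private final long scanPeriodMillis;
    private final AtomicInteger scans = new AtomicInteger();
    private Future<?> future;
    private boolean started;

    PeriodicScanner(ScheduledExecutorService pool, long scanPeriodMillis) {
        this.pool = pool;
        this.scanPeriodMillis = scanPeriodMillis;
    }

    synchronized void start() {
        if (started) {
            return; // a second start() must not schedule a second task
        }
        // The same value is used as initial delay and as period,
        // so the first scan runs one full period after start().
        future = pool.scheduleAtFixedRate(scans::incrementAndGet,
                scanPeriodMillis, scanPeriodMillis, TimeUnit.MILLISECONDS);
        started = true;
    }

    synchronized void stop() {
        if (!started) {
            return;
        }
        future.cancel(false); // false: let a scan that is already running finish
        started = false;
    }

    synchronized boolean isScheduled() {
        return future != null && !future.isCancelled();
    }

    int scanCount() {
        return scans.get();
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        PeriodicScanner scanner = new PeriodicScanner(pool, 50);
        scanner.start();
        scanner.start(); // no-op
        Thread.sleep(300);
        scanner.stop();
        System.out.println("scans so far: " + scanner.scanCount());
        pool.shutdown();
    }
}
```

The printed count depends on thread scheduling, so no exact value is given here.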
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
       private class TxTimeoutHandler implements Runnable {

          private boolean closed = false;

          private Future<?> future;

          @Override
          public void run() {
             if (closed) {
                return;
             }

             Set<Transaction> timedoutTransactions = new HashSet<>();

             long now = System.currentTimeMillis();

             for (Transaction tx : transactions.values()) {

                if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
                   Transaction removedTX = removeTransaction(tx.getXid());
                   if (removedTX != null) {
                      ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid());
                      timedoutTransactions.add(removedTX);
                   }
                }
             }

             for (Transaction failedTransaction : timedoutTransactions) {
                try {
                   failedTransaction.rollback();
                } catch (Exception e) {
                   ActiveMQServerLogger.LOGGER.errorTimingOutTX(e, failedTransaction.getXid());
                }
             }
          }

          synchronized void setFuture(final Future<?> future) {
             this.future = future;
          }

          void close() {
             if (future != null) {
                future.cancel(false);
             }

             closed = true;
          }
       }
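ActiveMQServerImpl's initialisePart1 creates a ResourceManagerImpl from configuration.getTransactionTimeout() (divided by 1000 and cast to int), configuration.getTransactionTimeoutScanPeriod(), and scheduledPool. ResourceManagerImpl implements the ResourceManager interface; its start method creates a TxTimeoutHandler and schedules it at a fixed rate, using txTimeoutScanPeriod in milliseconds as both the initial delay and the period. TxTimeoutHandler implements Runnable; its run method iterates over transactions and calls tx.hasTimedOut(now, defaultTimeoutSeconds) on each one, removes every timed-out transaction with removeTransaction(tx.getXid()), and then rolls the removed transactions back one by one, logging any exception thrown by a rollback.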
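The scan described above can be sketched without any Artemis dependency. This is a simplified illustration, not the library's implementation: TimeoutScanSketch and Tx are hypothetical types, the current time is passed in so the example is deterministic, and the timeout rule inside hasTimedOut (creation time plus timeout in seconds) is an assumption, since the source quoted here does not show how Transaction.hasTimedOut is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification of the scan in TxTimeoutHandler.run(); not Artemis code.
final class TimeoutScanSketch {

    // Minimal stand-in for a transaction: an id, a creation time, and a rollback flag.
    static final class Tx {
        final String id;
        final long createTimeMillis;
        boolean rolledBack;

        Tx(String id, long createTimeMillis) {
            this.id = id;
            this.createTimeMillis = createTimeMillis;
        }

        // Assumed rule: timed out once more than timeoutSeconds have passed since creation.
        boolean hasTimedOut(long nowMillis, int timeoutSeconds) {
            return nowMillis > createTimeMillis + timeoutSeconds * 1000L;
        }

        void rollback() {
            rolledBack = true;
        }
    }

    final Map<String, Tx> transactions = new ConcurrentHashMap<>();

    // One scan pass: first remove every timed-out entry, then roll the removed ones back.
    List<Tx> scan(long nowMillis, int defaultTimeoutSeconds) {
        List<Tx> timedOut = new ArrayList<>();
        for (Tx tx : transactions.values()) {
            if (tx.hasTimedOut(nowMillis, defaultTimeoutSeconds)) {
                Tx removed = transactions.remove(tx.id);
                if (removed != null) { // null means something else removed it first
                    timedOut.add(removed);
                }
            }
        }
        for (Tx tx : timedOut) {
            try {
                tx.rollback();
            } catch (RuntimeException e) {
                // one failed rollback must not stop the remaining ones
                System.err.println("rollback failed for " + tx.id + ": " + e);
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        TimeoutScanSketch sketch = new TimeoutScanSketch();
        sketch.transactions.put("old", new Tx("old", 0L));
        sketch.transactions.put("new", new Tx("new", 9_000L));

        List<Tx> timedOut = sketch.scan(10_000L, 5);
        System.out.println(timedOut.size());            // prints 1
        System.out.println(timedOut.get(0).id);         // prints old
        System.out.println(sketch.transactions.size()); // prints 1
    }
}
```

Because the check only runs once per scan period, a timed-out transaction can stay in the map for up to roughly one additional period before it is rolled back; a shorter transactionTimeoutScanPeriod narrows that window at the cost of more frequent scans.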