spring事務與消息隊列

在開發過程當中,遇到一個bug,產生bug的緣由是spring事務提交晚於消息隊列的生產消息,致使消息隊列消費消息時獲取到的數據不正確。這篇文章介紹問題的產生和一步步的解決過程。java

一.問題的產生:spring

場景還原:接口中的一個方法,首先修改訂單狀態,而後向消息隊列中生產消息,消息隊列的消費者獲取到消息檢測訂單狀態,發現訂單狀態未更改。數據庫

代碼:代理

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
	@Resource MqService mqService;
	@OrderDao orderDao;
	
	public void push(String orderId) {
		// 更新訂單狀態,以前的狀態是1
		updateStatus(orderId, 3);
		// 產生消息
		mqService.produce(orderId);
	}
	public viod updateStatus(String orderId, Integer status) {
		orderDao.updateStatus(orderId, status);
	}
}

問題產生緣由:orderApi中的全部方法都有事務,事務類型PROPAGATION_REQUIRED,因此push方法對數據的操做會在push代碼所有執行以後提交,而在事務提交以前消息隊列的消息已經產生因此消息隊列中消費到的訂單從數據庫查詢出的狀態可能還爲1。爲了讓bug現象更明顯,能夠在push方法最後添加:對象

try {
	Thread.sleep(10000);
} catch (InterruptedException e) {
	// TODO Auto-generated catch block
	e.printStackTrace();
}

這樣就會發現消費消息時,訂單狀態必定是未修改的。blog

 

二.問題的解決:接口

解決方案:在更新數據時,新建一個事物,保證更新代碼執行完成後,更新數據庫的事務已被提交。(確保消息產生前數據庫操做已提交)隊列

按照上述方案,我首先想到的是直接修改updateStatus方法的事務類型;我將此方法的事務類型改成PROPAGATION_REQUIRES_NEW(新建事務,若是當前存在事務,把當前事務掛起)。事務

可是這麼作有兩點不合適:開發

  1.強制修改了updateStaus的事務類型,可能影響其餘流程。

  2.未起到做用,updateStaus方法中沒有新建事務。

關於第二點的解釋:spring添加事務是經過BeanNameAutoProxyCreator實現的動態代理,只是給bean對象添加了事務,如今在類內部調用方法,是不會觸發新事物的建立的。

因此在通過以上嘗試後,我建立了一個新的類:

@Service("orderExtApi")
public class OrderExtApiImpl {
	@Resource OrderApi orderApi;
	
	public void updateStatusNewPropagation(String orderId) {
		orderApi.updateStatus(orderId);
	}
}

併爲updateStatusNewPropagation方法添加事務PROPAGATION_REQUIRES_NEW

這個類就只是爲了給orderApi中的updateStaus方法新起一個事務。

ok,到此爲止bug已經解決了。

可是代碼中仍是存在問題:對數據庫的操做已經提交,若是生產消息出現異常對業務邏輯來講仍是錯誤的。因此須要檢測消息的產生是否完成。

最終orderApi中的代碼以下:

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
	@Resource MqService mqService;
	@Resource OrderDao orderDao;
	@Resource OrderExtApiImpl orderExtApi;
	
	public void push(String orderId) {
		// 更新訂單狀態,以前的狀態是1
		orderExtApi.updateStatusNewPropagation(orderId, 3);
		// 產生消息--produce會檢測是否出現異常 當返回1時表示生產消息成功
		Response response = mqService.produce(orderId);
		if (response.getCode() != 1) {
			log.info("消息隊列生產消息異常:" + response.getErrorMsg())
			// 生產消息異常,重置狀態  等待下次從新執行
			orderExtApi.updateStatusNewPropagation(orderId, 1);
		}
		
	}
	public viod updateStatus(String orderId, Integer status) {
		orderDao.updateStatus(orderId, status);
	}
}
相關文章
相關標籤/搜索