個人物聯網項目(八)簡單分佈式調度

定時調度基本在任何平臺或多或少的要用到,實現定時調度的功能很簡單,我作過的項目中用到更多的是spring quartz或者spring task,它們在單機上使用定時任務配置是很是簡單的,可是在集羣環境中就須要面臨一個必須解決的問題:如何限定只有一臺機器在執行定時任務?java

其實spring quartz也能夠實現此功能,它是由數據庫的數據來肯定調度任務是否正在執行, 正在執行則其餘服務器就不能去執行該行調度數據,因此須要數據庫的11張表來執行此種功能,總的來講成本較高,操做起來也比較複雜。另一些開源的分佈式調度平臺也有一些,如噹噹網的elastic-job,淘寶的TBSchedule,包括阿里雲也有SchedulerX,這些分佈式調度平臺在必定程度上也能夠知足集羣環境下的功能需求。我當初在技術選型上,只想簡單無門檻,不想有太多的學習成本在裏面,尤爲針對目前階段的項目,想用一種簡單的方式來實現目的就行,因此儘可能基於目前的代碼和技術。redis

一 實現思路

主要利用Redis的(Redis用的雲集羣,暫時不須要考慮單點故障或者不穩定的狀況)函數setNX()來實現分佈式鎖,大概流程是首先是某個集羣環境的單邊服務器將某一任務標識名(簡單來講就是key)做爲鍵存到redis裏,併爲其設個過時時間,若是這個時候另外的單邊服務器也請求過來,先是經過setNX()看看是否能將任務標識名(同一個標識名)插入到redis裏,能夠的話就返回true,不能夠就返回false,若是返回false,說明此次的任務調度別的服務器已經在作了,不須要執行此次任務。若是返回true,說明此次任何調度是由本身來執行。spring

這個裏面因爲集羣環境下的每臺服務器到了時間點都會去執行一遍,固然確定只有一臺才能執行成功,這個裏面須要注意兩個事情:數據庫

  1. 定時調度的策略應該上一個任務完成到下一個任務開始的時間間隔,這樣的話才能保證集羣環境下其它的服務器下次搶佔鎖的機率,如spring task的fixedDelay。
  2. 調度時間循環間隔設置固然以具體業務場景爲準,但最好算好大概的每次業務執行的時間長短,而後根據這個時間長短來設置定時調度的循環間隔時間。好比說若是小於1s的調用,因爲使用redis會有10幾毫秒的運算耗費,所以不能保證在1s如下的時間間隔比較均勻。因此儘可能保證每臺服務器的均勻分佈來執行計劃任務。

二 代碼實現

1.鎖對象服務器

public class Lock { private String name; private String value; public Lock(String name, String value) { this.name = name; this.value = value; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } }

2.分佈式鎖工具類負載均衡

public class DistributedLockService { private final Logger log = LoggerFactory.getLogger(getClass()); private final static long LOCK_EXPIRE = 10;//單個業務持有鎖的時間10s,防止死鎖 private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); @Autowired private RedisTemplate<String, String> redisTemplate; /** * 嘗試獲取全局鎖 */ public boolean tryLock(Lock lock) { return getLock(lock,LOCK_EXPIRE); } /** * 操做redis獲取全局鎖 */ public boolean getLock(Lock lock,long lockExpireTime){ if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) { return false; } // SETNX成功,則成功獲取一個鎖 if (setNX(lock.getName(), lock.getValue(),lockExpireTime)) { return true; }else {// SETNX失敗,說明鎖仍然被其餘對象保持 log.info(lock.getName()+" lock is exist!" + dateFormat.format(new Date()) + "###"); return false; } } /** * @Title: setNX * @Description: 設置鎖 */ private boolean setNX(final String key, final String value, final long expire) { return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() { @SuppressWarnings("unchecked") public Boolean doInRedis(RedisConnection connection) { byte[] keyBytes = ((RedisSerializer<String>) redisTemplate.getKeySerializer()).serialize(key); boolean locked = connection.setNX(keyBytes, ((RedisSerializer<String>)redisTemplate.getValueSerializer()).serialize(value)); if(locked){ connection.expire(keyBytes, expire); } return locked; } }); } /** * @Title: get * @Description: 根據key獲取value */ public Object get(final String key) { return redisTemplate.execute(new RedisCallback<Object>() { @SuppressWarnings("unchecked") public Object doInRedis(RedisConnection connection) throws DataAccessException { byte[] bs = connection.get(((RedisSerializer<String>)redisTemplate.getKeySerializer()).serialize(key)); return redisTemplate.getDefaultSerializer().deserialize(bs); } }); } /** * 釋放鎖 */ public void releaseLock(Lock lock) { if (!StringUtils.isEmpty(lock.getName())) { redisTemplate.delete(lock.getName()); } log.info(lock.getName()+" lock is unchecked!" + dateFormat.format(new Date()) + "###"); } }

 

3.定時調度實現運維

public class ScheduledTasks { @Autowired private DistributedLockService distributedLockService; private final static Logger log= Logger.getLogger(ScheduledTasks.class); private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); //5秒執行一次 @Scheduled(fixedDelay = 5000) public void doJob() { log.info("###sync start:"+ dateFormat.format(new Date()) + "###"); Lock lock = new Lock("xxlock" , "xxx"); if(distributedLockService.tryLock(lock)){ log.info("Gets the lock!" + dateFormat.format(new Date()) + "###"); //作具體業務...... distributedLockService.releaseLock(lock); } log.info("###sync end:"+ dateFormat.format(new Date()) + "###"); } }

 

三 繼續優化

上面將作具體業務的代碼耦合到了定時調度ScheduledTasks裏面,這塊須要優化下,後面咱們將具體的業務代碼單獨抽離出來作成一個rest服務,ScheduledTasks裏面經過接口請求去執行業務邏輯便可。分佈式

定時調度這塊後續咱們還在繼續優化,主要有以下:函數

1.將調度時間間隔,調度http請求接口,調度的動態開啓和關閉,查看目前的調度任務和執行的日誌作成後臺可視化界面,方便統一管理和運維。工具

2.集羣環境下的服務器作到分片,負載均衡。其實如今的並無嚴格作到負載均衡,其實集羣環境下每臺服務器都在執行,只是沒有執行具體業務而已,因此後續這塊本身將用代碼實現。

相關文章
相關標籤/搜索