基於時間輪片方式處理超時任務

背景

最近收到小夥伴的一個吐槽:「項目裏的某個函數是同步阻塞的,沒法肯定其運行時間,某些狀況下,可能出現長時間阻塞致使應用沒法響應」。爲了解決這個問題,他嘗試過用子線程+定時器的方式去異步處理,若是超時,則從新調用,但該函數會被頻繁調用,意味着每次調用都要建立一個定時器。聽到這個場景後,下意識想起以前看到的一篇文章:時間輪片(Timing Wheel)實現心跳機制。該文章主要描述了使用時間輪片的方式去處理TCP心跳鏈接,從而避免每一個鏈接都要開啓一個計時器。明確了時間輪片方式的優點後,便嘗試着手實現一個通用的基於時間輪片方式處理超時任務的框架。git

時間輪

簡單來講,時間輪就是一個循環列表,每一個列表中包含一個稱爲槽的結構,這個結構一般也能夠是一個列表,且每隔必定時間就會將指針向前移動。github

圖片來源

iOS 時間輪實現方案

可使用一個嵌套數組的形式來定義時間輪結構,並用定時器去定時遍歷列表中的元素。swift

class TimeWheel {
    private var capacity: Int
    private var interval: TimeInterval
    private var timeWheel: [[Any]]
    var index: Int
    private var timer: Timer?
    weak var delegate: TimeWheelDelegate?
}
複製代碼
  • 初始化時,咱們須要創建N個空槽,用於存取數據
init(_ capacity: Int, _ interval: TimeInterval) {
    self.capacity = capacity
    self.interval = interval
    self.index = 0
    timeWheel = []
    for _ in 0 ..< capacity { //先填充空數組,建立若干個「空槽」
        self.timeWheel.append([])
    }
}
複製代碼
  • 添加任務時,如未啓動定時器,則啓動定時器,並把元素添加到當前槽位中
func addObject(_ task: Any) {
    if timer == nil {
        timer = Timer.scheduledTimer(timeInterval: 1.0, target: self, selector: #selector(detectTimeoutItem(_:)), userInfo: nil, repeats: true)
        RunLoop.current.add(timer!, forMode: .common)
    }
    
    if index < timeWheel.count {
        var arr = timeWheel[index]
        arr.append(task)
        timeWheel[index] = arr
    }
}
複製代碼
  • 定時檢查,先將位置移動到下一位,而後將對應槽位的元素傳遞給外部,最後清除該槽位的元素
@objc
private func detectTimeoutItem(_ timer: Timer) {
    moveToNextTimeSlot()
    delegate?.timeoutItems(self.currentObjects(), self)
    removeExpiredObjects()
}
複製代碼

完整代碼api

protocol TimeWheelDelegate : class {
    func timeoutItems(_ items: [Any]?, _ timeWheel: TimeWheel)
}

class TimeWheel {
    private var capacity: Int
    private var interval: TimeInterval
    private var timeWheel: [[Any]]
    var index: Int
    private var timer: Timer?
    weak var delegate: TimeWheelDelegate?
    
    init(_ capacity: Int, _ interval: TimeInterval) {
        self.capacity = capacity
        self.interval = interval
        self.index = 0
        timeWheel = []
        for _ in 0 ..< capacity { //先填充空數組,建立若干個「空槽」
            self.timeWheel.append([])
        }
    }
    
    func addObject(_ task: Any) {
        if timer == nil {
            timer = Timer.scheduledTimer(timeInterval: 1.0, target: self, selector: #selector(detectTimeoutItem(_:)), userInfo: nil, repeats: true)
            RunLoop.current.add(timer!, forMode: .common)
        }
        
        if index < timeWheel.count {
            var arr = timeWheel[index]
            arr.append(task)
            timeWheel[index] = arr
        }
    }
    
    func currentObjects() -> [Any]? {
        if index < timeWheel.count {
            return timeWheel[index]
        }
        return nil
    }
    
    func cleanup() {
        self.timeWheel.removeAll()
        if timer != nil {
            timer?.invalidate()
            timer = nil
        }
    }
    
    private func removeExpiredObjects() {
        if index < timeWheel.count {
            var arr = timeWheel[index]
            arr.removeAll()
        }
    }
    
    private func moveToNextTimeSlot() {
        index = (index + 1) % timeWheel.count
    }
    
    @objc
    private func detectTimeoutItem(_ timer: Timer) {
        moveToNextTimeSlot()
        delegate?.timeoutItems(self.currentObjects(), self)
        removeExpiredObjects()
    }
}
複製代碼

任務管理

  • 定義一個任務協議,用於定義其通用行爲
protocol Task {
    associatedtype T
    func taskKey() -> String //任務對應的惟一key,用於區分任務
    func doTask() -> T // 實現任務行爲
    var completion: ((_ result: T?, _ timeout: Bool) -> Void)? {get set} //返回的異步結果
}
複製代碼
  • 定義一個具體的Task
class NetworkTask: Task {
    typealias T = String
    var completion: ((String?, Bool) -> Void)?
    
    var hostName: String
    
    init(_ name: String) {
        hostName = name
    }
    
    func taskKey() -> String {
        return hostName
    }
    
    func doTask() -> String {
        Thread.sleep(forTimeInterval: Double.random(in: 1...20)) //模擬耗時任務
        return "\(hostName)'s result"
    }
    
}
複製代碼
  • 任務管理

爲了保證任務的獨立容許,須要建立一個併發隊列,且使用字典存儲已添加的任務,以便確認任務是按時完成回調的,仍是超時致使回調的。數組

class TaskManager<T: Task> : TimeWheelDelegate {
    
    private var timeWheel: TimeWheel?
    private var timeInterval: TimeInterval
    private var timeoutSeconds: Int
    private var queue: DispatchQueue
    private var callbackDict: Dictionary<String, T>
    
    init(_ timeout: Int, _ timeInterval: TimeInterval) {
        timeoutSeconds = timeout
        self.timeInterval = timeInterval
        queue = DispatchQueue(label: "com.task.queue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
        callbackDict = [:]
    }
}
複製代碼
  • 添加任務:開啓時間輪,且將任務提交到隊列中
func appendTask(_ task: T, _ completion:@escaping (_ result: T.T?, _ timeout: Bool) -> (Void)) {
    
    if timeWheel == nil {
        timeWheel = TimeWheel(timeoutSeconds, timeInterval)
        timeWheel?.delegate = self
    }
    
    var task = task
    task.completion = completion
    self.callbackDict[task.taskKey()] = task
    self.timeWheel?.addObject(task) //將任務添加到對應的時間輪槽位中
    
    self.queue.async {
        let result = task.doTask()
        DispatchQueue.main.async { //保證數據的一致性
            let key = task.taskKey()
            if let item = self.callbackDict[key] {
                item.completion?(result, false) //返回按時完成任務的結果
                self.callbackDict.removeValue(forKey: key)
            }
        }
    }
}
複製代碼
  • 處理超時任務:經過定時輪返回的過時數據,將任務超時回調返回。
func timeoutItems(_ items: [Any]?, _ timeWheel: TimeWheel) {
    if let callbacks = items {
        for callback in callbacks {
            if let item = callback as? T, let task = self.callbackDict[item.taskKey()] {
                task.completion?(nil, true)
                self.callbackDict.removeValue(forKey: task.taskKey())
            }
        }
    }
}
複製代碼

完整代碼併發

class TaskManager<T: Task> : TimeWheelDelegate {
    
    private var timeWheel: TimeWheel?
    private var timeInterval: TimeInterval
    private var timeoutSeconds: Int
    private var queue: DispatchQueue
    private var callbackDict: Dictionary<String, T>
    
    init(_ timeout: Int, _ timeInterval: TimeInterval) {
        timeoutSeconds = timeout
        self.timeInterval = timeInterval
        queue = DispatchQueue(label: "com.task.queue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
        callbackDict = [:]
    }
    
    func appendTask(_ task: T, _ completion:@escaping (_ result: T.T?, _ timeout: Bool) -> (Void)) {
        
        if timeWheel == nil {
            timeWheel = TimeWheel(timeoutSeconds, timeInterval)
            timeWheel?.delegate = self
        }
        
        var task = task
        task.completion = completion
        self.callbackDict[task.taskKey()] = task
        self.timeWheel?.addObject(task) //將任務添加到對應的時間輪槽位中
        
        self.queue.async {
            let result = task.doTask()
            DispatchQueue.main.async { //保證數據的一致性
                let key = task.taskKey()
                if let item = self.callbackDict[key] {
                    item.completion?(result, false) //返回按時完成任務的結果
                    self.callbackDict.removeValue(forKey: key)
                }
            }
        }
    }
    
    func timeoutItems(_ items: [Any]?, _ timeWheel: TimeWheel) {
        if let callbacks = items {
            for callback in callbacks {
                if let item = callback as? T, let task = self.callbackDict[item.taskKey()] {
                    task.completion?(nil, true)
                    self.callbackDict.removeValue(forKey: task.taskKey())
                }
            }
        }
    }
}
複製代碼

使用示例

定義任務超時時間爲10s,並每1s進行檢查一次。這裏加了一個隨機時間添加任務,以便測試到時間輪不一樣輪的狀況。app

let manager = TaskManager<NetworkTask>(10, 1)
for i in 0 ..< 5 {
    let task = NetworkTask("host-\(i)")
    DispatchQueue.main.asyncAfter(deadline: .now()+Double.random(in: 0...20.0)) {
        print("task:\(task.hostName) do task in \(Date.init())")
        manager.appendTask(task) { (result, timeout) -> (Void) in
            print("task:\(task.hostName), result:\(result ?? "null"), timeout:\(timeout), time:\(Date.init())")
        }
    }
}

結果數據:
task:host-4 do task in 2020-03-19 11:56:46 +0000
task:host-1 do task in 2020-03-19 11:56:47 +0000
task:host-2 do task in 2020-03-19 11:56:56 +0000
task:host-4, result:null, timeout:true, time:2020-03-19 11:56:56 +0000
task:host-1, result:null, timeout:true, time:2020-03-19 11:56:56 +0000
task:host-2, result:host-2's result, timeout:false, time:2020-03-19 11:57:01 +0000
task:host-3 do task in 2020-03-19 11:57:03 +0000
task:host-0 do task in 2020-03-19 11:57:03 +0000
task:host-0, result:host-0's result, timeout:false, time:2020-03-19 11:57:09 +0000
task:host-3, result:null, timeout:true, time:2020-03-19 11:57:12 +0000
複製代碼

根據結果,能夠看到,若任務10s內能按時完成,則返回對應的任務結果,不然返回timeouttrue,並返回一個空結果。框架

總結

經過此次的事例,實現一個基於時間輪方式來處理超時任務的簡單框架,從必定程度上避免了性能的消耗。dom

demo異步

相關文章
相關標籤/搜索