任務調度

>>返回《C# 併發編程》html

1. 調度到線程池

Task task = Task.Run(() =>
{
    Thread.Sleep(TimeSpan.FromSeconds(2));
});

Task.Run 也能正常地返回結果,能使用異步 Lambda 表達式。下面代碼中 Task.Run 返回的 task 會在 2 秒後完成,並返回結果 13:編程

Task<int> task = Task.Run(async () =>
{ 
    await Task.Delay(TimeSpan.FromSeconds(2));
    return 13;
});

Task.Run 返回一個 Task (或 Task<T>)對象,該對象能夠被異步或響應式代碼正常使用。併發

注意: 但不要在 ASP.NET 中使用 Task.Run,除非你有絕對的把握。在 ASP.NET 中, 處理請求的代碼原本就是在 ASP.NET 線程池線程中運行的,強行把它放到另外一個線程池線程一般會拔苗助長。
但UI程序,使用Task.Run能夠執行耗時操做,有效的防止頁面卡住問題。框架

在進行動態並行開發時, 必定要用 Task.Factory.StartNew 來代替 Task.Run異步

  • 由於根據默認配置, Task.Run 返回的 Task 對象適合被異步調用(即被異步代碼或響應式代碼使用)。
  • Task.Run 也不支持動態並行代碼中廣泛使用的高級概念,例如 父/子任務。

2. 任務調度器

須要讓多個代碼段按照指定的方式運行。例如async

  • 讓全部代碼段在 UI 線程中運行
  • 只容許特定數量的代碼段同時運行。

2.1. Default 調度器

TaskScheduler.Default,它的做用是讓任務在線程池中排隊, Task.Run、並行、數據流的代碼用的都是 TaskScheduler.Defaultspa

2.2. 捕獲當前同步上下文 調度器

能夠捕獲一個特定的上下文,用 TaskScheduler.FromCurrentSynchronizationContext 調度任務,讓它回到該上下文:pwa

TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
這條語句建立了一個捕獲當前 同步上下文TaskScheduler 對象,並將代碼調度到這個上下文中線程

  • SynchronizationContext 類表示一個通用的調度上下文。
  • 大多數 UI 框架有一個表示 UI 線程的 同步上下文
  • ASP.NET 有一個表示 HTTP 請求的 同步上下文

建議:
在 UI 線程上執行代碼時,永遠不要使用針對特定平臺的類型。 + WPF、IOS、Android 都有 Dispatchercode

  • Windows 應用商店平臺使用 CoreDispatcher
  • WinForms 有 ISynchronizeInvoke 接口(即 Control.Invoke

不要在新寫的代碼中使用這些類型,就當它們不存在吧。使用這些類型會使代碼無謂地綁定在某個特定平臺上。

同步上下文 是通用的、基於上述類型的抽象類。

2.3. ConcurrentExclusiveSchedulerPair 調度器

它其實是兩個互相關聯的調度器。 只要 ExclusiveScheduler 上沒有運行任務, ConcurrentScheduler 就可讓多個任務同時執行。只有當 ConcurrentScheduler 沒有執行任務時, ExclusiveScheduler 才能夠執行任務,而且每次只容許運行一個任務:

public static void ConcurrentExclusiveSchedulerPairRun()
{
    var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
    //因爲並行被限流,因此ConcurrentScheduler 會兩個兩個輸出,而後執行完這兩個開啓的8個串行任務
    TaskScheduler concurrent = schedulerPair.ConcurrentScheduler;
    TaskScheduler exclusive = schedulerPair.ExclusiveScheduler;

    //Default 因爲沒有限制,因此第一層會先輸出,所有隨機
    // TaskScheduler concurrent = TaskScheduler.Default;
    // TaskScheduler exclusive =TaskScheduler.Default;

    var list = new List<List<int>>();
    for (int i = 0; i < 4; i++)
    {
        var actionList = new List<int>();
        list.Add(actionList);
        for (int j = 0; j < 4; j++)
        {
            actionList.Add(i * 10 + j);
        }
    }

    var tasks = list.Select(u => Task.Factory.StartNew(state =>
    {
        System.Console.WriteLine($"ConcurrentScheduler");
        ((List<int>)state).Select(i => Task.Factory.StartNew(state2 => System.Console.WriteLine($"ExclusiveScheduler:{state2}"), i, CancellationToken.None, TaskCreationOptions.None, exclusive)).ToArray();
    }, u, CancellationToken.None, TaskCreationOptions.None, concurrent));


    Task.WaitAll(tasks.ToArray());
}

輸出:

ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:0
ExclusiveScheduler:1
ExclusiveScheduler:2
ExclusiveScheduler:3
ExclusiveScheduler:10
ExclusiveScheduler:11
ExclusiveScheduler:12
ExclusiveScheduler:13
ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:20
ExclusiveScheduler:21
ExclusiveScheduler:22
ExclusiveScheduler:23
ExclusiveScheduler:30
ExclusiveScheduler:31
ExclusiveScheduler:32
ExclusiveScheduler:33

ConcurrentExclusiveSchedulerPair 的常見用法是

  • ExclusiveScheduler 來確保每次只運行一個任務。
  • ExclusiveScheduler 執行的代碼會在線程池中運行,可是使用了同一個 ExclusiveScheduler 對象的其餘代碼不能同時運行。

ConcurrentExclusiveSchedulerPair 的另外一個用法是做爲限流調度器。

  • 建立的 ConcurrentExclusiveSchedulerPair 對象能夠限制自身的併發數量。
  • 這時一般不使用 ExclusiveScheduler
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,maxConcurrencyLevel: 8);
TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;

3. 調度並行代碼

public static void RotateMatricesRun()
{
    List<List<Action<float>>> actionLists = new List<List<Action<float>>>();
    for (int i = 0; i < 15; i++)
    {
        var actionList = new List<Action<float>>();
        actionLists.Add(actionList);
        for (int j = 0; j < 15; j++)
        {
            actionList.Add(new Action<float>(degrees =>
            {
                Thread.Sleep(200);
                System.Console.WriteLine("degrees:" + degrees + " " + DateTime.Now.ToString("HHmmss.fff"));
            }));
        }
    }
    RotateMatrices(actionLists, 10);
    //雖然兩個並行嵌套可是因爲調度器的設置,致使任務是8個8個執行的,結果是8個後200ms再8個
}

static void RotateMatrices(IEnumerable<IEnumerable<Action<float>>> collections, float degrees)
{
    var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 8);
    TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;
    ParallelOptions options = new ParallelOptions
    {
        TaskScheduler = scheduler
    };
    Parallel.ForEach(collections, options,
        matrices =>
        {
            Parallel.ForEach(matrices,
                options,
                matrix => matrix.Invoke(degrees)
            );
            System.Console.WriteLine($"============");
        });
}

輸出:

degrees:10 190424.120
...  118個 ...
degrees:10 190426.963
============
============
============
============
============
============
============
============
degrees:10 190427.167
...  6個 ...
degrees:10 190427.167
... 5個 ...
degrees:10 190428.589
...  6個 ...
degrees:10 190428.589
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
============
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.994
...  6個 ...
degrees:10 190428.994
============
degrees:10 190429.194
...  6個 ...
degrees:10 190429.194
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.800
============

4. 用調度器實現數據流的同步

Stopwatch sw = Stopwatch.StartNew();
// 模擬 UI同步上下文
AsyncContext.Run(() =>
{
    var options = new ExecutionDataflowBlockOptions
    {
        //使用次調度器,則代碼會放到建立線程的同步上下文上執行(如果當前同步上下文是UI Context 或 此例的AsyncContext)
        //運行和註釋下行運行觀察Creator和Executor線程Id的變化
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
    };
    var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
    System.Console.WriteLine($"Creator ThreadId: {Thread.CurrentThread.ManagedThreadId}.");
    var displayBlock = new ActionBlock<int>(result =>
    {
        // ListBox.Items.Add(result)
        System.Console.WriteLine($"Executor ThreadId: {Thread.CurrentThread.ManagedThreadId} res:{result}.");
    }, options);
    multiplyBlock.LinkTo(displayBlock);

    for (int i = 0; i < 5; i++)
    {
        multiplyBlock.Post(i);
        System.Console.WriteLine($"Post {i}");
    }
    multiplyBlock.Completion.Wait(2000);
});
System.Console.WriteLine($"Cost {sw.ElapsedMilliseconds}ms.");

輸出:

Creator ThreadId: 1.
Post 0
Post 1
Post 2
Post 3
Post 4
Executor ThreadId: 1 res:0.
Executor ThreadId: 1 res:2.
Executor ThreadId: 1 res:4.
Executor ThreadId: 1 res:6.
Executor ThreadId: 1 res:8.
Cost 2062ms.
相關文章
相關標籤/搜索