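Demo: BulkAll (git)
Goal: bulk-import data with NEST, the .NET API client for Elasticsearch, in a way that is concurrent, asynchronous, and efficient.
NEST provides the BulkAll helper for this. It splits the source collection into batches and sends them as concurrent bulk requests.
Without further ado, here is the code (GitHub):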
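const int size = 1000;
var tokenSource = new CancellationTokenSource();

// BulkAll splits `list` into batches of `size` and sends them concurrently.
var observableBulk = elasticClient.BulkAll(list, f => f
    .MaxDegreeOfParallelism(8)               // up to 8 bulk requests in flight
    .BackOffTime(TimeSpan.FromSeconds(10))   // wait before retrying a failed batch
    .BackOffRetries(2)                       // retry a failed batch up to 2 times
    .Size(size)                              // documents per bulk request
    .RefreshOnCompleted()                    // refresh the index when finished
    .Index(indexName)
    .BufferToBulk((r, buffer) => r.IndexMany(buffer))
, tokenSource.Token);

var countdownEvent = new CountdownEvent(1);
Exception exception = null;

// The observer must signal countdownEvent on completion or error;
// otherwise the Wait call below never returns.
var bulkAllObserver = new BulkAllObserver(
    onNext: response => { },
    onError: ex => { exception = ex; countdownEvent.Signal(); },
    () => countdownEvent.Signal());

observableBulk.Subscribe(bulkAllObserver);
countdownEvent.Wait(tokenSource.Token);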
To monitor the import while it runs, replace the BulkAllObserver with one like this:
var bulkAllObserver = new BulkAllObserver(
    onNext: response =>
    {
        WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
    },
    onError: ex =>
    {
        WriteLine("BulkAll Error : {0}", ex);
        exception = ex;
        countdownEvent.Signal();
    },
    () =>
    {
        WriteLine("BulkAll Finished");
        countdownEvent.Signal();
    });
You can also use C#'s local function feature, as shown below:
void OnCompleted()
{
    WriteLine("BulkAll Finished");
    countdownEvent.Signal();
}

var bulkAllObserver = new BulkAllObserver(
    onNext: response =>
    {
        WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
    },
    onError: ex =>
    {
        WriteLine("BulkAll Error : {0}", ex);
        exception = ex;
        countdownEvent.Signal();
    },
    OnCompleted);
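The snippets above store any failure in `exception` but never inspect it once the wait returns, so a failed import could pass unnoticed. Below is a minimal sketch of the wait-then-rethrow pattern using only `System.Threading`. It does not call NEST: `WaitPattern`, `RunAndWait`, and the `work` delegate are hypothetical names standing in for the BulkAll subscription, and wrapping the error in `InvalidOperationException` is just one possible choice.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitPattern
{
    // Hypothetical helper: runs `work` on a background thread (a stand-in for
    // the BulkAll subscription) and blocks until it signals success or failure.
    public static void RunAndWait(Action work, CancellationToken token)
    {
        var countdownEvent = new CountdownEvent(1);
        Exception exception = null;

        Task.Run(() =>
        {
            try { work(); }                            // plays the role of onNext/onCompleted
            catch (Exception ex) { exception = ex; }   // plays the role of onError
            finally { countdownEvent.Signal(); }       // always release the waiting thread
        });

        // Throws OperationCanceledException if the token is cancelled first.
        countdownEvent.Wait(token);

        // Surface the captured failure instead of silently dropping it.
        if (exception != null)
            throw new InvalidOperationException("Bulk import failed", exception);
    }

    public static void Main()
    {
        RunAndWait(() => Console.WriteLine("work finished"), CancellationToken.None);

        try
        {
            RunAndWait(() => throw new TimeoutException("simulated failure"),
                       CancellationToken.None);
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine($"caught: {ex.InnerException.Message}");
        }
    }
}
```

Applied to the BulkAll code, the equivalent step would be a check of `exception` immediately after `countdownEvent.Wait(tokenSource.Token)`.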
For the complete demo, click BulkAll to view the code.