[譯] 強化學習入門篇:Simmer 仿真平臺高級使用技巧

如何與環境交互

在仿真過程當中,許多 activity 是以函數的形式做爲參數傳入的。這些函數可能與環境交互,好比now函數用來提取環境當前的時間,get_capacity 函數用於提取環境中resource對應的容量,get_n_generated函數用於獲取生成器的狀態,或者用 get_mon 函數直接收集的歷史監測值。惟一須要注意的是,仿真環境必需要包含在軌跡之中,下面是一個錯誤示例:html

library(simmer)
library(simmer.plot)

t <- trajectory() %>%
  log_(function() as.character(now(env)))

env <- simmer() %>%
  add_generator("dummy", t, function() 1) %>%
  run(4)
#> 1: dummy0:
#> Error in now(env): object 'env' not found

由於,env 是全局變量,它沒法在運行時執行。仿真執行過程於仿真結果的賦值須要分開。在這個仿真用例中,環境 env 由軌跡 t 生成,能夠經過 run()方法將整個過程分離開來:git

t <- trajectory() %>%
  log_(function() as.character(now(env)))

env <- simmer() %>%
  add_generator("dummy", t, function() 1)

env %>% run(4) %>% invisible
#> 1: dummy0: 1
#> 2: dummy1: 2
#> 3: dummy2: 3

咱們獲取了預期結果。可是,做爲最佳實踐的通用規則,仍是建議環境在最初單獨初始化,這樣能夠避免沒必要要的錯誤,也使得代碼更具備可讀性:github

# 首先,初始化環境
env <- simmer()

# 生成軌跡
t <- trajectory() %>%
  log_(function() as.character(now(env)))

# 執行環境模擬過程
env %>%
  add_generator("dummy", t, function() 1) %>%
  run(4) %>% invisible
#> 1: dummy0: 1
#> 2: dummy1: 2
#> 3: dummy2: 3

行動集合

當生成器建立一個到達流的時候,它會給軌跡分配一個到達對象。軌跡在這裏的定義是由一個到達對象在系統中全生命週期的一系列行爲。一旦一個到達對象被分配到軌跡中,它一般會以必定的順序開始執行軌跡中的預期行爲,最後離開系統。好比:編程

patient_traj <- trajectory(name = "patient_trajectory") %>%
  seize(resource = "doctor", amount = 1) %>%
  timeout(task = 3) %>%
  release(resource = "doctor", amount = 1)

這裏咱們建立一個病人就醫3分鐘而後離開的例子。這是一個直截了當的例子,可是大部分軌跡相關的函數都在此基礎上演化高級用法,下面會一一介紹。segmentfault

此外, 建議你能夠嘗試下simmer的插件 simmer.bricks 包,它封裝了經常使用的一些軌跡。(見 simmer.bricks入門微信

log_()

log_(., message, level) 方法用來打印仿真過程當中的信息以輔助debug,經過不一樣的 level 能夠調整打印的層次:app

t <- trajectory() %>%
  log_("this is always printed") %>% # level = 0 by default
  log_("this is printed if `log_level>=1`", level = 1) %>%
  log_("this is printed if `log_level>=2`", level = 2)

simmer() %>%
  add_generator("dummy", t, at(0)) %>%
  run() %>% invisible
#> 0: dummy0: this is always printed

simmer(log_level = 1) %>%
  add_generator("dummy", t, at(0)) %>%
  run() %>% invisible
#> 0: dummy0: this is always printed
#> 0: dummy0: this is printed if `log_level>=1`

simmer(log_level = Inf) %>%
  add_generator("dummy", t, at(0)) %>%
  run() %>% invisible
#> 0: dummy0: this is always printed
#> 0: dummy0: this is printed if `log_level>=1`
#> 0: dummy0: this is printed if `log_level>=2`

set_attribute(), set_global()

set_attribute(., keys, values) 方法提供了設置到達流屬性的方法。keysvalues能夠以向量或者函數的形式返回。可是, values只可以以數值型表示。異步

patient_traj <- trajectory(name = "patient_trajectory") %>%
  set_attribute(keys = "my_key", values = 123) %>%
  timeout(5) %>%
  set_attribute(keys = "my_key", values = 456)

env <- simmer() %>%
  add_generator("patient", patient_traj, at(0), mon = 2) %>%
  run()

get_mon_attributes(env)
#>   time     name    key value replication
#> 1    0 patient0 my_key   123           1
#> 2    5 patient0 my_key   456           1

如上,軌跡的到達流在 0 時刻(經過 at 函數實現),僅包含 {my_key:123} 的屬性。add_generator的 參數 mon = 2表示對到達流的屬性進行持續觀察。咱們能夠用 get_mon_attributes 方法查看 my_key 對應的值在仿真過程當中的變化。ide

若是你想要設置一個存在依賴鏈路的屬性也是容許的。屬性能夠經過get_attribute(., keys) 的方式獲取。下面是一個實際用例:異步編程

patient_traj <- trajectory(name = "patient_trajectory") %>%
  set_attribute("my_key", 123) %>%
  timeout(5) %>%
  set_attribute("my_key", 1, mod="+") %>%
  timeout(5) %>%
  set_attribute("dependent_key", function() ifelse(get_attribute(env, "my_key")<=123, 1, 0)) %>%
  timeout(5) %>%
  set_attribute("independent_key", function() runif(1))

env<- simmer() %>%
  add_generator("patient", patient_traj, at(0), mon = 2)
env %>% run()
#> simmer environment: anonymous | now: 15 | next: 
#> { Monitor: in memory }
#> { Source: patient | monitored: 2 | n_generated: 1 }

get_mon_attributes(env)
#>   time     name             key       value replication
#> 1    0 patient0          my_key 123.0000000           1
#> 2    5 patient0          my_key 124.0000000           1
#> 3   10 patient0   dependent_key   0.0000000           1
#> 4   15 patient0 independent_key   0.5500812           1

對於每一次到達,屬性只對於到達者可見,其他人不可見。

writer <- trajectory() %>%
  set_attribute(keys = "my_key", values = 123)

reader <- trajectory() %>%
  log_(function() paste0(get_attribute(env, "my_key")))

env <- simmer() %>%
  add_generator("writer", writer, at(0), mon = 2) %>%
  add_generator("reader", reader, at(1), mon = 2)
env %>% run()
#> 1: reader0: NA
#> simmer environment: anonymous | now: 1 | next: 
#> { Monitor: in memory }
#> { Source: writer | monitored: 2 | n_generated: 1 }
#> { Source: reader | monitored: 2 | n_generated: 1 }

get_mon_attributes(env)
#>   time    name    key value replication
#> 1    0 writer0 my_key   123           1

所以,在前例中 reader 獲取的返回值是缺失值。不過,屬性也能夠經過 set_global(., keys, values) 全局變量聲明:

writer <- trajectory() %>%
  set_global(keys = "my_key", values = 123) 

reader <- trajectory() %>%
  log_(function() paste0(get_attribute(env, "my_key"), ", ", 
                         get_global(env, "my_key")))

env <- simmer() %>%
  add_generator("writer", writer, at(0), mon = 2) %>%
  add_generator("reader", reader, at(1), mon = 2)
env %>% run()
#> 1: reader0: NA, 123
#> simmer environment: anonymous | now: 1 | next: 
#> { Monitor: in memory }
#> { Source: writer | monitored: 2 | n_generated: 1 }
#> { Source: reader | monitored: 2 | n_generated: 1 }

get_mon_attributes(env)
#>   time name    key value replication
#> 1    0      my_key   123           1

如上顯示,全局變量經過 get_mon_attributes() 賦值未命名的鍵值對。

timeout(), timeout_from_attribute()

timeout(., task) 經過給軌跡分配必定的時間來延遲用戶的到達行爲,回顧以前最簡單的病人看病模型,經過賦予 task參數一個固定值實現超時機制。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  timeout(task = 3)

env <- simmer() %>%
  add_generator("patient", patient_traj, at(0)) %>%
  run()

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient0          0        3             3     TRUE           1

一般,超時是依賴於一個分佈假設或者經過 屬性進行設置的,它經過給 task 參數傳入一個函數實現。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  set_attribute("health", function() sample(20:80, 1)) %>%
  # distribution-based timeout
  timeout(function() rexp(1, 10)) %>%
  # attribute-dependent timeout
  timeout(function() (100 - get_attribute(env, "health")) * 2)

env <- simmer() %>%
  add_generator("patient", patient_traj, at(0), mon = 2)
env %>% run()
#> simmer environment: anonymous | now: 52.123429586641 | next: 
#> { Monitor: in memory }
#> { Source: patient | monitored: 2 | n_generated: 1 }

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient0          0 52.12343      52.12343     TRUE           1
get_mon_attributes(env)
#>   time     name    key value replication
#> 1    0 patient0 health    74           1

若是想經過 timeout() 方法動態地設置 task參數,須要經過回調函數的方式操做。好比 timeout(function() rexp(1, 10)),rexp(1, 10) 將被每次活動超時都執行。可是,若是你不經過回調函數方式操做,它只會以靜態值的方式在初始化的時候執行一次,好比 timeout(rexp(1, 10))

固然,經過回調函數的方式會使得代碼實現複雜功能,好比同時要檢查資源的狀態,和環境中其餘實體交互等等。一樣地,對於其餘活動類型,也都是能夠以泛函的方式操做。

若是你只須要延遲設置屬性值那麼能夠考慮 timeout_from_attribute(., key) 或者 timeout_from_global(., key), 所以,下面兩個個超時寫法是等價的,可是後者的顯然簡單不少。

traj <- trajectory() %>%
  set_attribute("delay", 2) %>%
  timeout(function() get_attribute(env, "delay")) %>%
  log_("first timeout") %>%
  timeout_from_attribute("delay") %>%
  log_("second timeout")

env <- simmer() %>%
  add_generator("dummy", traj, at(0))
env %>% run() %>% invisible
#> 2: dummy0: first timeout
#> 4: dummy0: second timeout

seize(), release()

seize(., resource, amount) 用於獲取指定數量的資源。相反地,release(., resource, amount) 用於釋放指定數量的資源。須要注意的是,爲了使用這些函數來指定資源,你須要在模擬環境中經過 add_resource 函數來初始化。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  seize(resource = "doctor", amount = 1) %>%
  timeout(3) %>%
  release(resource = "doctor", amount = 1)

env <- simmer() %>%
  add_resource("doctor", capacity=1, mon = 1) %>%
  add_generator("patient", patient_traj, at(0)) %>%
  run()

get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1   doctor    0      1     0        1        Inf      1   Inf           1
#> 2   doctor    3      0     0        1        Inf      0   Inf           1

這裏 add_resource() 中的參數 mon=1 表示模擬環境監控資源使用狀況。使用 get_mon_resources(env) 能夠獲取資源在仿真系統中的日誌流水。

有時候,資源的獲取和釋放但願經過依賴的到達流屬性進行動態調整。爲了實現這個工恩呢該,你能夠在 amount參數中傳入get_attribute(.)來代替以前的固定值。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  set_attribute("health", function() sample(20:80, 1)) %>%
  set_attribute("docs_to_seize", function() ifelse(get_attribute(env, "health")<50, 1, 2)) %>%
  seize("doctor", function() get_attribute(env, "docs_to_seize")) %>%
  timeout(3) %>%
  release("doctor", function() get_attribute(env, "docs_to_seize"))
#> Warning in is.na(env[[name]]): is.na() applied to non-(list or vector) of
#> type 'closure'
#> Warning in is.na(amount): is.na() applied to non-(list or vector) of type
#> 'closure'

env <- simmer() %>%
  add_resource("doctor", capacity = 2, mon = 1) %>%
  add_generator("patient", patient_traj, at(0), mon = 2)
env %>% run()
#> simmer environment: anonymous | now: 3 | next: 
#> { Monitor: in memory }
#> { Resource: doctor | monitored: 1 | server status: 0(2) | queue status: 0(Inf) }
#> { Source: patient | monitored: 2 | n_generated: 1 }

get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1   doctor    0      2     0        2        Inf      2   Inf           1
#> 2   doctor    3      0     0        2        Inf      0   Inf           1
get_mon_attributes(env)
#>   time     name           key value replication
#> 1    0 patient0        health    80           1
#> 2    0 patient0 docs_to_seize     2           1

默認狀況下,seize() 失敗會致使拒絕到達。下面的例子中,第二位病人嘗試找僅有的一名正在給另一位病人看病的醫生看病,在沒有等候區的狀況下就會發生拒絕。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  log_("arriving...") %>%
  seize("doctor", 1) %>%
  # the second patient won't reach this point
  log_("doctor seized") %>%
  timeout(5) %>%
  release("doctor", 1)

env <- simmer() %>%
  add_resource("doctor", capacity = 1, queue_size = 0) %>%
  add_generator("patient", patient_traj, at(0, 1)) %>%
  run()
#> 0: patient0: arriving...
#> 0: patient0: doctor seized
#> 1: patient1: arriving...

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient1          1        1             0    FALSE           1
#> 2 patient0          0        5             5     TRUE           1
get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1   doctor    0      1     0        1          0      1     1           1
#> 2   doctor    5      0     0        1          0      0     1           1

有時,你不想拒毫不成功的seize(),能夠提供另一條路徑。好比在例子中,咱們改成第二名病人也能夠先去看看護士:

patient_traj <- trajectory(name = "patient_trajectory") %>%
  log_("arriving...") %>%
  seize("doctor", 1, continue = FALSE,
        reject = trajectory("rejected patient") %>%
          log_("rejected!") %>%
          seize("nurse", 1) %>%
          log_("nurse seized") %>%
          timeout(2) %>%
          release("nurse", 1)) %>%
  # the second patient won't reach this point
  log_("doctor seized") %>%
  timeout(5) %>%
  release("doctor", 1)

env <- simmer() %>%
  add_resource("doctor", capacity = 1, queue_size = 0) %>%
  add_resource("nurse", capacity = 10, queue_size = 0) %>%
  add_generator("patient", patient_traj, at(0, 1)) %>%
  run()
#> 0: patient0: arriving...
#> 0: patient0: doctor seized
#> 1: patient1: arriving...
#> 1: patient1: rejected!
#> 1: patient1: nurse seized

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient1          1        3             2     TRUE           1
#> 2 patient0          0        5             5     TRUE           1
get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1   doctor    0      1     0        1          0      1     1           1
#> 2    nurse    1      1     0       10          0      1    10           1
#> 3    nurse    3      0     0       10          0      0    10           1
#> 4   doctor    5      0     0        1          0      0     1           1

continue 標記意味着不管是否 reject發生,子軌跡都會緊跟着主軌跡執行。在這個例子中,continue=FALSE 意味着被拒絕的到達流獲取護士和釋放護士後就完全結束了到達流的生命週期。不然,它將繼續在主軌跡中執行行動。

注意第二位病人可能也會持續嘗試,若是他執意想看這位醫生。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  log_("arriving...") %>%
  seize("doctor", 1, continue = FALSE,
        reject = trajectory("rejected patient") %>%
          log_("rejected!") %>%
          # go for a walk and try again
          timeout(2) %>%
          log_("retrying...") %>%
          rollback(amount = 4, times = Inf)) %>%
  # the second patient will reach this point after a couple of walks
  log_("doctor seized") %>%
  timeout(5) %>%
  release("doctor", 1) %>%
  log_("leaving")

env <- simmer() %>%
  add_resource("doctor", capacity = 1, queue_size = 0) %>%
  add_generator("patient", patient_traj, at(0, 1)) %>%
  run()
#> 0: patient0: arriving...
#> 0: patient0: doctor seized
#> 1: patient1: arriving...
#> 1: patient1: rejected!
#> 3: patient1: retrying...
#> 3: patient1: rejected!
#> 5: patient1: retrying...
#> 5: patient0: leaving
#> 5: patient1: doctor seized
#> 10: patient1: leaving

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient0          0        5             5     TRUE           1
#> 2 patient1          1       10             9     TRUE           1
get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1   doctor    0      1     0        1          0      1     1           1
#> 2   doctor    5      0     0        1          0      0     1           1
#> 3   doctor    5      1     0        1          0      1     1           1
#> 4   doctor   10      0     0        1          0      0     1           1

post.seize 是另外一個可能的子軌跡選項,它在成功執行 seize() 後被執行。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  log_("arriving...") %>%
  seize("doctor", 1, continue = c(TRUE, TRUE),
        post.seize = trajectory("admitted patient") %>%
          log_("admitted") %>%
          timeout(5) %>%
          release("doctor", 1),
        reject = trajectory("rejected patient") %>%
          log_("rejected!") %>%
          seize("nurse", 1) %>%
          timeout(2) %>%
          release("nurse", 1)) %>%
  # both patients will reach this point, as continue = c(TRUE, TRUE)
  timeout(10) %>%
  log_("leaving...")

env <- simmer() %>%
  add_resource("doctor", capacity = 1, queue_size = 0) %>%
  add_resource("nurse", capacity = 10, queue_size = 0) %>%
  add_generator("patient", patient_traj, at(0, 1)) %>%
  run()
#> 0: patient0: arriving...
#> 0: patient0: admitted
#> 1: patient1: arriving...
#> 1: patient1: rejected!
#> 13: patient1: leaving...
#> 15: patient0: leaving...

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient1          1       13            12     TRUE           1
#> 2 patient0          0       15            15     TRUE           1
get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1   doctor    0      1     0        1          0      1     1           1
#> 2    nurse    1      1     0       10          0      1    10           1
#> 3    nurse    3      0     0       10          0      0    10           1
#> 4   doctor    5      0     0        1          0      0     1           1

set_capacity(), set_queue_size()

set_capacity(., resource, value) 能夠設置資源容量,set_queue_size(., resource, value) 則能夠設置隊列長度。注意,在使用這些函數以前,要記得在環境初始化時經過 add_resource 初始化資源,一樣這裏也支持靜態和動態兩種類型的賦值模式。

這些行爲頗有意思,它引入了動態變化的資源。例如,兩個軌跡爭取資源的能力:

set.seed(12345)

t1 <- trajectory() %>%
  seize("res1", 1) %>%
  set_capacity(resource = "res1", value = 1, mod="+") %>%
  set_capacity(resource = "res2", value = -1, mod="+") %>%
  timeout(function() rexp(1, 1)) %>%
  release("res1", 1)

t2 <- trajectory() %>%
  seize("res2", 1) %>%
  set_capacity(resource = "res2", value = 1, mod="+") %>%
  set_capacity(resource = "res1", value = -1, mod="+") %>%
  timeout(function() rexp(1, 1)) %>%
  release("res2", 1)

env <- simmer() %>%
  add_resource("res1", capacity = 20, queue_size = Inf) %>%
  add_resource("res2", capacity = 20, queue_size = Inf) %>%
  add_generator("t1_", t1, function() rexp(1, 1)) %>%
  add_generator("t2_", t2, function() rexp(1, 1)) %>%
  run(100)

plot(get_mon_resources(env), "usage", c("res1", "res2"), steps = TRUE)

clipboard.png

select()

當資源在環境中事先分配時,seize(), release(), set_capacity()set_queue_size() 能夠順利使用,但有時候資源也須要經過一些策略動態選擇。好比下面的狀況,select(., resources, policy, id)方法提供了選擇資源的一種方法,根據特定策略來選擇:
seize_selected(), release_selected(),set_capacity_selected(),set_queue_size_selected()

patient_traj <- trajectory(name = "patient_trajectory") %>%
  select(resources = c("doctor1", "doctor2", "doctor3"), policy = "round-robin") %>%
  set_capacity_selected(1) %>%
  seize_selected(amount = 1) %>%
  timeout(5) %>%
  release_selected(amount = 1)

env <- simmer() %>%
  add_resource("doctor1", capacity = 0) %>%
  add_resource("doctor2", capacity = 0) %>%
  add_resource("doctor3", capacity = 0) %>%
  add_generator("patient", patient_traj, at(0, 1, 2)) %>%
  run()

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient0          0        5             5     TRUE           1
#> 2 patient1          1        6             5     TRUE           1
#> 3 patient2          2        7             5     TRUE           1
get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1  doctor1    0      0     0        1        Inf      0   Inf           1
#> 2  doctor1    0      1     0        1        Inf      1   Inf           1
#> 3  doctor2    1      0     0        1        Inf      0   Inf           1
#> 4  doctor2    1      1     0        1        Inf      1   Inf           1
#> 5  doctor3    2      0     0        1        Inf      0   Inf           1
#> 6  doctor3    2      1     0        1        Inf      1   Inf           1
#> 7  doctor1    5      0     0        1        Inf      0   Inf           1
#> 8  doctor2    6      0     0        1        Inf      0   Inf           1
#> 9  doctor3    7      0     0        1        Inf      0   Inf           1

若是你提供給 select()提供一組動態的資源,那麼後續能夠經過 seize_selected()調整獲取資源的策略。

patient_traj <- trajectory(name = "patient_trajectory") %>%
  set_attribute("resource", function() sample(1:3, 1)) %>%
  select(resources = function() paste0("doctor", get_attribute(env, "resource"))) %>%
  seize_selected(amount = 1) %>%
  timeout(5) %>%
  release_selected(amount = 1)

env <- simmer() %>%
  add_resource("doctor1", capacity = 1) %>%
  add_resource("doctor2", capacity = 1) %>%
  add_resource("doctor3", capacity = 1) %>%
  add_generator("patient", patient_traj, at(0, 1, 2), mon = 2)
env %>% run()
#> simmer environment: anonymous | now: 10 | next: 
#> { Monitor: in memory }
#> { Resource: doctor1 | monitored: TRUE | server status: 0(1) | queue status: 0(Inf) }
#> { Resource: doctor2 | monitored: TRUE | server status: 0(1) | queue status: 0(Inf) }
#> { Resource: doctor3 | monitored: TRUE | server status: 0(1) | queue status: 0(Inf) }
#> { Source: patient | monitored: 2 | n_generated: 3 }

get_mon_attributes(env)
#>   time     name      key value replication
#> 1    0 patient0 resource     3           1
#> 2    1 patient1 resource     3           1
#> 3    2 patient2 resource     2           1
get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient0          0        5             5     TRUE           1
#> 2 patient2          2        7             5     TRUE           1
#> 3 patient1          1       10             5     TRUE           1
get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1  doctor3    0      1     0        1        Inf      1   Inf           1
#> 2  doctor3    1      1     1        1        Inf      2   Inf           1
#> 3  doctor2    2      1     0        1        Inf      1   Inf           1
#> 4  doctor3    5      1     0        1        Inf      1   Inf           1
#> 5  doctor2    7      0     0        1        Inf      0   Inf           1
#> 6  doctor3   10      0     0        1        Inf      0   Inf           1

activate(), deactivate()

activate(., source)deactivate(., source) 方法可以分別按照ID來開始和暫停活動。這個名字能夠提供一個字符串或一個函數返回一個字符串。在如下簡單的例子中,使用這些方法經過設置固定的時間間隔 1 來體現:

t <- trajectory() %>%
  deactivate(source = "dummy") %>%
  timeout(1) %>%
  activate(source = "dummy")

simmer() %>%
  add_generator("dummy", t, function() 1) %>%
  run(10) %>%
  get_mon_arrivals()
#>     name start_time end_time activity_time finished replication
#> 1 dummy0          1        2             1     TRUE           1
#> 2 dummy1          3        4             1     TRUE           1
#> 3 dummy2          5        6             1     TRUE           1
#> 4 dummy3          7        8             1     TRUE           1

set_trajectory(), set_source()

set_trajectory(., source, trajectory)set_source(., source, object) 方法提供了單獨地軌跡切換的方法。 source 能夠是一個固定的字符串ID也能夠經過函數動態生成字符串ID。

在下面的分佈中,t2 切換分佈到 t1,t2 只有首次到達時被執行。

t1 <- trajectory() %>%
  timeout(1)

t2 <- trajectory() %>%
  set_source("dummy", function() 1) %>%
  set_trajectory("dummy", t1) %>%
  timeout(2)

simmer() %>%
  add_generator("dummy", trajectory = t2, distribution = function() 2) %>%
  run(10) %>%
  get_mon_arrivals()
#>     name start_time end_time activity_time finished replication
#> 1 dummy0          2        4             2     TRUE           1
#> 2 dummy1          3        4             1     TRUE           1
#> 3 dummy2          4        5             1     TRUE           1
#> 4 dummy3          5        6             1     TRUE           1
#> 5 dummy4          6        7             1     TRUE           1
#> 6 dummy5          7        8             1     TRUE           1
#> 7 dummy6          8        9             1     TRUE           1

set_prioritization()

add_generator() 經過給到達流賦予優先級的方式控制。set_prioritization(., values)get_prioritization(.) 方法能夠在軌跡中的任意一個節點中改變/獲取優先級。

set_attribute("priority", 3) %>%
  # static values
  set_prioritization(values = c(3, 7, TRUE)) %>%
  # dynamically with a function
  set_prioritization(values = function() {
    prio <- get_prioritization(env)
    attr <- get_attribute(env, "priority")
    c(attr, prio[[2]]+1, FALSE)
  })

branch()

The branch(., option, continue, ...) 提供在軌跡中️以必定機率添加替代路徑的方法。下面的例子顯示一個到達在軌跡中被隨機分叉:

t1 <- trajectory("trajectory with a branch") %>%
  seize("server", 1) %>%
  branch(option = function() sample(1:2, 1), continue = c(T, F), 
         trajectory("branch1") %>%
           timeout(function() 1),
         trajectory("branch2") %>%
           timeout(function() rexp(1, 3)) %>%
           release("server", 1)
  ) %>%
  release("server", 1)

當到達流被分叉,第一個參數 option 是用來傳後續的具體路徑的機率值,所以它必須是可執行的,返回值須要是在1到n條路徑之間。第二個參數 continue 表示在選擇路徑後是否到達必須繼續執行活動。上述例子中,只有第一個路徑會走到最後的 release() 流程。

有時,你可能須要統計一條肯定軌跡在一個肯定的分支上進入多少次,或者到達流進入那條軌跡花費了多少時間。對於這種場景,處於計數需求,能夠資源容量設置爲無限,以下舉例:

t0 <- trajectory() %>%
  branch(function() sample(c(1, 2), 1), continue = c(T, T),
         trajectory() %>%
           seize("branch1", 1) %>%
           # do stuff here
           timeout(function() rexp(1, 1)) %>%
           release("branch1", 1),
         trajectory() %>%
           seize("branch2", 1) %>%
           # do stuff here
           timeout(function() rexp(1, 1/2)) %>%
           release("branch2", 1))

env <- simmer() %>%
  add_generator("dummy", t0, at(rep(0, 1000))) %>%
  # Resources with infinite capacity, just for accounting purposes
  add_resource("branch1", Inf) %>%
  add_resource("branch2", Inf) %>%
  run()

arrivals <- get_mon_arrivals(env, per_resource = T)

# Times that each branch was entered
table(arrivals$resource)
#> 
#> branch1 branch2 
#>     496     504

# The `activity_time` is the total time inside each branch for each arrival
# Let's see the distributions
ggplot(arrivals) + geom_histogram(aes(x=activity_time)) + facet_wrap(~resource)

rollback()

rollback(., amount, times, check) 回滾方法容許到達流軌跡回滾若干步,好比一個字符串在超時函數中被打印出來,在第一次執行後,軌跡再回滾3次(所以總共打印 "Hello" 4次)。

t0 <- trajectory() %>%
  log_("Hello!") %>%
  timeout(1) %>%
  rollback(amount = 2, times = 3)

simmer() %>%
  add_generator("hello_sayer", t0, at(0)) %>% 
  run() %>% invisible
#> 0: hello_sayer0: Hello!
#> 1: hello_sayer0: Hello!
#> 2: hello_sayer0: Hello!
#> 3: hello_sayer0: Hello!

rollback() 方法也接受一個選項 check 來覆蓋默認的基於數值的行爲。該方法能夠傳入一個返回邏輯值的函數。每次一個到達接收到活動,check 都會判斷一下是否以指定步長調用 rollback()回滾:

t0 <- trajectory() %>%
  set_attribute("happiness", 0) %>%
  log_(function() {
    level <- get_attribute(env, "happiness")
    paste0(">> Happiness level is at: ", level, " -- ", 
           ifelse(level<25,"PETE: I'm feeling crappy...",
                  ifelse(level<50,"PETE: Feelin' a bit moody",
                         ifelse(level<75,"PETE: Just had a good espresso",
                                "PETE: Let's do this! (and stop this loop...)"))))
  }) %>%
  set_attribute("happiness", 25, mod="+") %>%
  rollback(amount = 2, check = function() get_attribute(env, "happiness") < 100)

env <- simmer() %>%
  add_generator("mood_swinger", t0, at(0))
env %>% run() %>% invisible()
#> 0: mood_swinger0: >> Happiness level is at: 0 -- PETE: I'm feeling crappy...
#> 0: mood_swinger0: >> Happiness level is at: 25 -- PETE: Feelin' a bit moody
#> 0: mood_swinger0: >> Happiness level is at: 50 -- PETE: Just had a good espresso
#> 0: mood_swinger0: >> Happiness level is at: 75 -- PETE: Let's do this! (and stop this loop...)

leave()

leave(., prob) 容許一個到達以必定機率離開整個軌跡:

patient_traj <- trajectory(name = "patient_trajectory") %>%
  seize("nurse", 1) %>%
  timeout(3) %>%
  release("nurse", 1) %>%
  log_("before leave") %>%
  leave(prob = 1) %>%
  log_("after leave") %>%
  # patients will never seize the doctor
  seize("doctor", 1) %>%
  timeout(3) %>%
  release("doctor", 1)

env <- simmer() %>%
  add_resource("nurse", capacity=1) %>%
  add_resource("doctor", capacity=1) %>%
  add_generator("patient", patient_traj, at(0)) %>%
  run()
#> 3: patient0: before leave

get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1    nurse    0      1     0        1        Inf      1   Inf           1
#> 2    nurse    3      0     0        1        Inf      0   Inf           1

固然, 機率也能夠動態調整:

set.seed(1234)

patient_traj <- trajectory(name = "patient_trajectory") %>%
  seize("nurse", 1) %>%
  timeout(3) %>%
  release("nurse", 1) %>%
  log_("before leave") %>%
  leave(prob = function() runif(1) < 0.5) %>%
  log_("after leave") %>%
  # some patients will seize the doctor
  seize("doctor", 1) %>%
  timeout(3) %>%
  release("doctor", 1)

env <- simmer() %>%
  add_resource("nurse", capacity=1) %>%
  add_resource("doctor", capacity=1) %>%
  add_generator("patient", patient_traj, at(0, 1)) %>%
  run()
#> 3: patient0: before leave
#> 6: patient1: before leave
#> 6: patient1: after leave

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 patient0          0        3             3    FALSE           1
#> 2 patient1          1        9             6     TRUE           1
get_mon_resources(env)
#>   resource time server queue capacity queue_size system limit replication
#> 1    nurse    0      1     0        1        Inf      1   Inf           1
#> 2    nurse    1      1     1        1        Inf      2   Inf           1
#> 3    nurse    3      1     0        1        Inf      1   Inf           1
#> 4    nurse    6      0     0        1        Inf      0   Inf           1
#> 5   doctor    6      1     0        1        Inf      1   Inf           1
#> 6   doctor    9      0     0        1        Inf      0   Inf           1

clone(), synchronize()

clone(., n, ...) 提供複製 n-1 次到達機率的方法來並行處理子軌跡。
synchronize(., wait, mon_all) 提供同步的方法來去除副本。默認,synchronize() 等待全部副本到達而且容許最後一個繼續執行:

t <- trajectory() %>%
  clone(n = 3,
        trajectory("original") %>%
          timeout(1),
        trajectory("clone 1") %>%
          timeout(2),
        trajectory("clone 2") %>%
          timeout(3)) %>%
  synchronize(wait = TRUE) %>%
  timeout(0.5)

env <- simmer(verbose = TRUE) %>%
  add_generator("arrival", t, at(0)) %>%
  run()
#>          0 |    source: arrival          |       new: arrival0         | 0
#>          0 |   arrival: arrival0         |  activity: Clone            | 3, 3 paths
#>          0 |   arrival: arrival0         |  activity: Timeout          | 1
#>          0 |   arrival: arrival0         |  activity: Timeout          | 2
#>          0 |   arrival: arrival0         |  activity: Timeout          | 3
#>          1 |   arrival: arrival0         |  activity: Synchronize      | 1
#>          2 |   arrival: arrival0         |  activity: Synchronize      | 1
#>          3 |   arrival: arrival0         |  activity: Synchronize      | 1
#>          3 |   arrival: arrival0         |  activity: Timeout          | 0.5

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 arrival0          0      3.5           3.5     TRUE           1

注意,參數 n 也能夠是一個函數,若是有子軌跡須要clone,那麼重複的子軌跡不須要反覆聲明。若是子軌跡數量小於 clone 數量,部分clone將直接繼續下一個行動:

t <- trajectory() %>%
  clone(n = 3,
        trajectory("original") %>%
          timeout(1),
        trajectory("clone 1") %>%
          timeout(2)) %>%
  synchronize(wait = TRUE) %>%
  timeout(0.5)

env <- simmer(verbose = TRUE) %>%
  add_generator("arrival", t, at(0)) %>%
  run()
#>          0 |    source: arrival          |       new: arrival0         | 0
#>          0 |   arrival: arrival0         |  activity: Clone            | 3, 2 paths
#>          0 |   arrival: arrival0         |  activity: Timeout          | 1
#>          0 |   arrival: arrival0         |  activity: Timeout          | 2
#>          0 |   arrival: arrival0         |  activity: Synchronize      | 1
#>          1 |   arrival: arrival0         |  activity: Synchronize      | 1
#>          2 |   arrival: arrival0         |  activity: Synchronize      | 1
#>          2 |   arrival: arrival0         |  activity: Timeout          | 0.5

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 arrival0          0      2.5           2.5     TRUE           1

若是預期爲弱依賴,但願最快完成副本任務,那麼 synchronize() 能夠設置 wait = FALSE:

t <- trajectory() %>%
  clone(n = 3,
        trajectory("original") %>%
          timeout(1),
        trajectory("clone 1") %>%
          timeout(2),
        trajectory("clone 2") %>%
          timeout(3)) %>%
  synchronize(wait = FALSE) %>%
  timeout(0.5)

env <- simmer(verbose = TRUE) %>%
  add_generator("arrival", t, at(0)) %>%
  run()
#>          0 |    source: arrival          |       new: arrival0         | 0
#>          0 |   arrival: arrival0         |  activity: Clone            | 3, 3 paths
#>          0 |   arrival: arrival0         |  activity: Timeout          | 1
#>          0 |   arrival: arrival0         |  activity: Timeout          | 2
#>          0 |   arrival: arrival0         |  activity: Timeout          | 3
#>          1 |   arrival: arrival0         |  activity: Synchronize      | 0
#>          1 |   arrival: arrival0         |  activity: Timeout          | 0.5
#>          2 |   arrival: arrival0         |  activity: Synchronize      | 0
#>          3 |   arrival: arrival0         |  activity: Synchronize      | 0

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 arrival0          0      1.5           1.5     TRUE           1

synchronize() 默認不記錄被移除的 clone信息 (mon_all=FALSE),可是若是須要能夠經過修改 mon_all=TRUE 來實現:

t <- trajectory() %>%
  clone(n = 3,
        trajectory("original") %>%
          timeout(1),
        trajectory("clone 1") %>%
          timeout(2),
        trajectory("clone 2") %>%
          timeout(3)) %>%
  synchronize(wait = FALSE, mon_all = TRUE) %>%
  timeout(0.5)

env <- simmer(verbose = TRUE) %>%
  add_generator("arrival", t, at(0)) %>%
  run()
#>          0 |    source: arrival          |       new: arrival0         | 0
#>          0 |   arrival: arrival0         |  activity: Clone            | 3, 3 paths
#>          0 |   arrival: arrival0         |  activity: Timeout          | 1
#>          0 |   arrival: arrival0         |  activity: Timeout          | 2
#>          0 |   arrival: arrival0         |  activity: Timeout          | 3
#>          1 |   arrival: arrival0         |  activity: Synchronize      | 0
#>          1 |   arrival: arrival0         |  activity: Timeout          | 0.5
#>          2 |   arrival: arrival0         |  activity: Synchronize      | 0
#>          3 |   arrival: arrival0         |  activity: Synchronize      | 0

get_mon_arrivals(env)
#>       name start_time end_time activity_time finished replication
#> 1 arrival0          0      1.5           1.5     TRUE           1
#> 2 arrival0          0      2.0           2.0     TRUE           1
#> 3 arrival0          0      3.0           3.0     TRUE           1

batch(), separate()

batch(., n, timeout, permanent, name, rule) 提供以必定機率收集必定數量的到達流後批量處理的方法。而後,經過 separate(.) 方法來分離以前創建的臨時批次。這容許咱們實現一個過山車過程,舉例:

有一個10人座的過山車,隊列是20人排隊參與,每次玩過山車持續5分鐘,咱們能夠將問題按以下方式建模:

set.seed(1234)

t <- trajectory() %>%
  batch(10, timeout = 5, permanent = FALSE) %>%
  seize("rollercoaster", 1) %>%
  timeout(5) %>%
  release("rollercoaster", 1) %>%
  separate()

env <- simmer() %>%
  # capacity and queue_size are defined in batches of 10
  add_resource("rollercoaster", capacity = 1, queue_size = 2) %>%
  add_generator("person", t, function() rexp(1, 2)) %>%
  run(15)

get_mon_arrivals(env, per_resource = TRUE)
#>        name start_time  end_time activity_time      resource replication
#> 1   person0   3.800074  8.800074             5 rollercoaster           1
#> 2   person1   3.800074  8.800074             5 rollercoaster           1
#> 3   person2   3.800074  8.800074             5 rollercoaster           1
#> 4   person3   3.800074  8.800074             5 rollercoaster           1
#> 5   person4   3.800074  8.800074             5 rollercoaster           1
#> 6   person5   3.800074  8.800074             5 rollercoaster           1
#> 7   person6   3.800074  8.800074             5 rollercoaster           1
#> 8   person7   3.800074  8.800074             5 rollercoaster           1
#> 9   person8   3.800074  8.800074             5 rollercoaster           1
#> 10  person9   3.800074  8.800074             5 rollercoaster           1
#> 11 person10   8.800074 13.800074             5 rollercoaster           1
#> 12 person11   8.800074 13.800074             5 rollercoaster           1
#> 13 person12   8.800074 13.800074             5 rollercoaster           1
#> 14 person13   8.800074 13.800074             5 rollercoaster           1
#> 15 person14   8.800074 13.800074             5 rollercoaster           1
#> 16 person15   8.800074 13.800074             5 rollercoaster           1

這裏建立了 3 個批次,前10我的都是在3.8分鐘同時上車的。而後在第一波遊玩結束時, 只有6我的在等待,可是 batch() 設置的計時器 timeout=5 已經到時了,另一波遊客就能夠發動了。由於這個 batch設置了 (permanent=FALSE),因此能夠用 separate() 方法將隊列切開。

固然具體的rule參數也能夠用更精細粒度的選擇哪些遊客須要被組成一個批次。對於每一個特定的到達,默認都是一 rule = TRUE 返回。上面的例子,也能夠經過 rule = FALSE,避免和其餘乘客同時玩一個過山車。

t_batch <- trajectory() %>%
  batch(10, timeout = 5, permanent = FALSE, rule = function() FALSE) %>%
  seize("rollercoaster", 1) %>%
  timeout(5) %>%
  release("rollercoaster", 1) %>%
  separate()

t_nobatch <- trajectory() %>%
  seize("rollercoaster", 1) %>%
  timeout(5) %>%
  release("rollercoaster", 1)

set.seed(1234)

env_batch <- simmer() %>%
  # capacity and queue_size are defined in batches of 10
  add_resource("rollercoaster", capacity = 1, queue_size = 2) %>%
  add_generator("person", t_batch, function() rexp(1, 2)) %>%
  run(15)

set.seed(1234)

env_nobatch <- simmer() %>%
  # capacity and queue_size are defined in batches of 10
  add_resource("rollercoaster", capacity = 1, queue_size = 2) %>%
  add_generator("person", t_nobatch, function() rexp(1, 2)) %>%
  run(15)

get_mon_arrivals(env_batch, per_resource = TRUE)
#>      name start_time  end_time activity_time      resource replication
#> 1 person0   1.250879  6.250879             5 rollercoaster           1
#> 2 person1   1.374259 11.250879             5 rollercoaster           1
get_mon_arrivals(env_nobatch, per_resource = TRUE)
#>      name start_time  end_time activity_time      resource replication
#> 1 person0   1.250879  6.250879             5 rollercoaster           1
#> 2 person1   1.374259 11.250879             5 rollercoaster           1

默認,批次的 name 參數爲空,它表示每一個乘客是獨立的,可是,有趣的是怎麼給不一樣軌跡賦予相同批次。好比,咱們能夠嘗試:

t0 <- trajectory() %>%
  batch(2) %>%
  timeout(2) %>%
  separate()

t1 <- trajectory() %>%
  timeout(1) %>%
  join(t0)

env <- simmer(verbose = TRUE) %>%
  add_generator("t0_", t0, at(0)) %>%
  add_generator("t1_", t1, at(0)) %>%
  run()
#>          0 |    source: t0_              |       new: t0_0             | 0
#>          0 |   arrival: t0_0             |  activity: Batch            | 2, 0, 0, 
#>          0 |    source: t1_              |       new: t1_0             | 0
#>          0 |   arrival: t1_0             |  activity: Timeout          | 1
#>          1 |   arrival: t1_0             |  activity: Batch            | 2, 0, 0,

get_mon_arrivals(env)
#> [1] name          start_time    end_time      activity_time finished     
#> <0 rows> (or 0-length row.names)

咱們沒有得到預期的兩個不一樣批次結果。t1 緊跟着 t0 到達,則意味着實際狀況是下面這樣:

t0 <- trajectory() %>%
  batch(2) %>%
  timeout(2) %>%
  separate()

t1 <- trajectory() %>%
  timeout(1) %>%
  batch(2) %>%
  timeout(2) %>%
  separate()

所以到達流緊隨着一個不一樣軌跡將終止在一個不一樣批次上。除非,有一個方法共享 batch()的行動,如今能夠經過 name 參數實現。

t0 <- trajectory() %>%
  batch(2, name = "mybatch") %>%
  timeout(2) %>%
  separate()

t1 <- trajectory() %>%
  timeout(1) %>%
  batch(2, name = "mybatch") %>%
  timeout(2) %>%
  separate()

env <- simmer(verbose = TRUE) %>%
  add_generator("t0_", t0, at(0)) %>%
  add_generator("t1_", t1, at(0)) %>%
  run()
#>          0 |    source: t0_              |       new: t0_0             | 0
#>          0 |   arrival: t0_0             |  activity: Batch            | 2, 0, 0, mybatch
#>          0 |    source: t1_              |       new: t1_0             | 0
#>          0 |   arrival: t1_0             |  activity: Timeout          | 1
#>          1 |   arrival: t1_0             |  activity: Batch            | 2, 0, 0, mybatch
#>          1 |   arrival: batch_mybatch    |  activity: Timeout          | 2
#>          3 |   arrival: batch_mybatch    |  activity: Separate         |

get_mon_arrivals(env)
#>   name start_time end_time activity_time finished replication
#> 1 t0_0          0        3             2     TRUE           1
#> 2 t1_0          0        3             3     TRUE           1
Or, equivalently,

t0 <- trajectory() %>%
  batch(2, name = "mybatch") %>%
  timeout(2) %>%
  separate()

t1 <- trajectory() %>%
  timeout(1) %>%
  join(t0)

env <- simmer(verbose = TRUE) %>%
  add_generator("t0_", t0, at(0)) %>%
  add_generator("t1_", t1, at(0)) %>%
  run()
#>          0 |    source: t0_              |       new: t0_0             | 0
#>          0 |   arrival: t0_0             |  activity: Batch            | 2, 0, 0, mybatch
#>          0 |    source: t1_              |       new: t1_0             | 0
#>          0 |   arrival: t1_0             |  activity: Timeout          | 1
#>          1 |   arrival: t1_0             |  activity: Batch            | 2, 0, 0, mybatch
#>          1 |   arrival: batch_mybatch    |  activity: Timeout          | 2
#>          3 |   arrival: batch_mybatch    |  activity: Separate         |

get_mon_arrivals(env)
#>   name start_time end_time activity_time finished replication
#> 1 t0_0          0        3             2     TRUE           1
#> 2 t1_0          0        3             3     TRUE           1

send(), trap(), untrap(), wait()

這組行動容許異步編程。經過 send(., signals, delay) 廣播一個或者一組信號給到每一個訂閱信息的到達流。信號能夠當即被觸發:

t <- trajectory() %>%
  send(signals = c("signal1", "signal2"))

simmer(verbose = TRUE) %>%
  add_generator("signaler", t, at(0)) %>%
  run() %>% invisible
#>          0 |    source: signaler         |       new: signaler0        | 0
#>          0 |   arrival: signaler0        |  activity: Send             | [signal1, signal2], 0
#>          0 |      task: Broadcast        |          :                  |

或者安排在一些延遲以後:

t <- trajectory() %>%
  send(signals = c("signal1", "signal2"), delay = 3)

simmer(verbose = TRUE) %>%
  add_generator("signaler", t, at(0)) %>%
  run() %>% invisible
#>          0 |    source: signaler         |       new: signaler0        | 0
#>          0 |   arrival: signaler0        |  activity: Send             | [signal1, signal2], 3
#>          3 |      task: Broadcast        |          :                  |

注意,這兩個參數,signalsdelay,能夠是函數,所以他們能夠從到達流中獲取的屬性值。

若是無人監聽,廣播其實沒意義。到達流訂閱廣播而後能夠用 trap(., signals, handler, interruptible) 來賦予一個處理器。在下面的例子中,一個到達流訂閱一個信號而且阻塞知道收到 wait(.) 方法。

t_blocked <- trajectory() %>%
  trap("you shall pass") %>%
  log_("waiting...") %>%
  wait() %>%
  log_("continuing!")

t_signaler <- trajectory() %>%
  log_("you shall pass") %>%
  send("you shall pass")

simmer() %>%
  add_generator("blocked", t_blocked, at(0)) %>%
  add_generator("signaler", t_signaler, at(5)) %>%
  run() %>% invisible
#> 0: blocked0: waiting...
#> 5: signaler0: you shall pass
#> 5: blocked0: continuing!

注意信號能夠被忽略,當到達流是在資源隊列中等待。相同的操做也能夠在批處理中執行:全部在進入批次以前的被訂閱信息都將被忽略。所以,下面的批次將被無限阻塞:

t_blocked <- trajectory() %>%
  trap("you shall pass") %>%
  log_("waiting inside a batch...") %>%
  batch(1) %>%
  wait() %>%
  log_("continuing!")

t_signaler <- trajectory() %>%
  log_("you shall pass") %>%
  send("you shall pass")

simmer() %>%
  add_generator("blocked", t_blocked, at(0)) %>%
  add_generator("signaler", t_signaler, at(5)) %>%
  run() %>% invisible
#> 0: blocked0: waiting inside a batch...
#> 5: signaler0: you shall pass
#> inf: batch0: continuing!

在接收信號,中止當前活動並執行處理程序提供。而後,執行後返回到活動中斷的點:

t_worker <- trajectory() %>%
  trap("you are free to go", 
       handler = trajectory() %>%
         log_("ok, I'm packing...") %>%
         timeout(1)
  ) %>%
  log_("performing a looong task...") %>%
  timeout(100) %>%
  log_("and I'm leaving!")

t_signaler <- trajectory() %>%
  log_("you are free to go") %>%
  send("you are free to go")

simmer() %>%
  add_generator("worker", t_worker, at(0)) %>%
  add_generator("signaler", t_signaler, at(5)) %>%
  run() %>% invisible
#> 0: worker0: performing a looong task...
#> 5: signaler0: you are free to go
#> 5: worker0: ok, I'm packing...
#> 6: worker0: and I'm leaving!

最後,untrap(., signals) 來根據 signals 執行退訂:

t_worker <- trajectory() %>%
  trap("you are free to go", 
       handler = trajectory() %>%
         log_("ok, I'm packing...") %>%
         timeout(1)
  ) %>%
  log_("performing a looong task...") %>%
  untrap("you are free to go") %>%
  timeout(100) %>%
  log_("and I'm leaving!")

t_signaler <- trajectory() %>%
  log_("you are free to go") %>%
  send("you are free to go")

simmer() %>%
  add_generator("worker", t_worker, at(0)) %>%
  add_generator("signaler", t_signaler, at(5)) %>%
  run() %>% invisible
#> 0: worker0: performing a looong task...
#> 5: signaler0: you are free to go
#> 100: worker0: and I'm leaving!

Signal 處理器默認是能夠被打斷,這意味着若是有大量頻繁的請求信號,處理器會反覆重啓:

t_worker <- trajectory() %>%
  trap("you are free to go", 
       handler = trajectory() %>%
         log_("ok, I'm packing...") %>%
         timeout(1)
  ) %>%
  log_("performing a looong task...") %>%
  timeout(100) %>%
  log_("and I'm leaving!")

t_signaler <- trajectory() %>%
  log_("you are free to go") %>%
  send("you are free to go")

simmer() %>%
  add_generator("worker", t_worker, at(0)) %>%
  add_generator("signaler", t_signaler, from(5, function() 0.5)) %>%
  run(10) %>% invisible
#> 0: worker0: performing a looong task...
#> 5: signaler0: you are free to go
#> 5: worker0: ok, I'm packing...
#> 5.5: signaler1: you are free to go
#> 5.5: worker0: ok, I'm packing...
#> 6: signaler2: you are free to go
#> 6: worker0: ok, I'm packing...
#> 6.5: signaler3: you are free to go
#> 6.5: worker0: ok, I'm packing...
#> 7: signaler4: you are free to go
#> 7: worker0: ok, I'm packing...
#> 7.5: signaler5: you are free to go
#> 7.5: worker0: ok, I'm packing...
#> 8: signaler6: you are free to go
#> 8: worker0: ok, I'm packing...
#> 8.5: signaler7: you are free to go
#> 8.5: worker0: ok, I'm packing...
#> 9: signaler8: you are free to go
#> 9: worker0: ok, I'm packing...
#> 9.5: signaler9: you are free to go
#> 9.5: worker0: ok, I'm packing...

若是須要實現一個不能打斷的處理器,能夠經過設置合適的 flag 實現:

t_worker <- trajectory() %>%
  trap("you are free to go", 
       handler = trajectory() %>%
         log_("ok, I'm packing...") %>%
         timeout(1),
       interruptible = FALSE            # make it uninterruptible
  ) %>%
  log_("performing a looong task...") %>%
  timeout(100) %>%
  log_("and I'm leaving!")

t_signaler <- trajectory() %>%
  log_("you are free to go") %>%
  send("you are free to go")

simmer() %>%
  add_generator("worker", t_worker, at(0)) %>%
  add_generator("signaler", t_signaler, from(5, function() 0.5)) %>%
  run(10) %>% invisible
#> 0: worker0: performing a looong task...
#> 5: signaler0: you are free to go
#> 5: worker0: ok, I'm packing...
#> 5.5: signaler1: you are free to go
#> 6: worker0: and I'm leaving!
#> 6: signaler2: you are free to go
#> 6.5: signaler3: you are free to go
#> 7: signaler4: you are free to go
#> 7.5: signaler5: you are free to go
#> 8: signaler6: you are free to go
#> 8.5: signaler7: you are free to go
#> 9: signaler8: you are free to go
#> 9.5: signaler9: you are free to go

renege_in(), renege_if(), renege_abort()

renege_in(., t, out) 方法提供設置超時時間來出發到達流放棄軌跡的退出機制。中途退出後,到達流能夠選擇從一個子軌跡中出去。renege_abort(.) 方法提供了一個反悔機制。這些方法容許咱們作一些事情,好比,創建有限病人的模型。下面的例子中,用戶 到達銀行,只有一個職員處於服務態。 客服在等待5分鐘後若是還不能服務能夠選擇離開。

t <- trajectory(name = "bank") %>%
  log_("Here I am") %>%
  # renege in 5 minutes
  renege_in(5, 
            out = trajectory() %>%
              log_("Lost my patience. Reneging...")
  ) %>%
  seize("clerk", 1) %>%
  # stay if I'm being attended within 5 minutes
  renege_abort() %>%
  log_("I'm being attended") %>%
  timeout(10) %>%
  release("clerk", 1) %>%
  log_("Finished")

simmer() %>%
  add_resource("clerk", 1) %>%
  add_generator("customer", t, at(0, 1)) %>%
  run() %>% invisible
#> 0: customer0: Here I am
#> 0: customer0: I'm being attended
#> 1: customer1: Here I am
#> 6: customer1: Lost my patience. Reneging...
#> 10: customer0: Finished

一樣也能夠經過 renege_if(., signal, out) 實現,假設 在 t=5時刻, customer0 發送一個消息給 customer1:

t <- trajectory(name = "bank") %>%
  log_("Here I am") %>%
  # renege when "renege now" is received
  renege_if("renege now", 
            out = trajectory() %>%
              log_("Ok. Reneging...")
  ) %>%
  seize("clerk", 1) %>%
  # stay if I'm being attended within 5 minutes
  renege_abort() %>%
  log_("I'm being attended") %>%
  timeout(5) %>%
  log_("I say: renege now") %>%
  send("renege now") %>%
  timeout(5) %>%
  release("clerk", 1) %>%
  log_("Finished")

simmer() %>%
  add_resource("clerk", 1) %>%
  add_generator("customer", t, at(0, 1)) %>%
  run() %>% invisible
#> 0: customer0: Here I am
#> 0: customer0: I'm being attended
#> 1: customer1: Here I am
#> 5: customer0: I say: renege now
#> 5: customer1: Ok. Reneging...
#> 10: customer0: Finished

注意,和 trap() 不一樣的是, reneg* 是直接被觸發的,即便到達流還在隊列或者臨時批次中。

軌跡工具箱: joining 和 subsetting

join()

join(...) 將任意多個軌跡聚合,好比:

t1 <- trajectory() %>% seize("dummy", 1)
t2 <- trajectory() %>% timeout(1)
t3 <- trajectory() %>% release("dummy", 1)

t0 <- join(t1, t2, t3)
t0
#> trajectory: anonymous, 3 activities
#> { Activity: Seize        | resource: dummy, amount: 1 }
#> { Activity: Timeout      | delay: 1 }
#> { Activity: Release      | resource: dummy, amount: 1 }

或者,它可能嵌套使用,相似另外一個行爲:

t0 <- trajectory() %>%
  join(t1) %>%
  timeout(1) %>%
  join(t3)
t0
#> trajectory: anonymous, 3 activities
#> { Activity: Seize        | resource: dummy, amount: 1 }
#> { Activity: Timeout      | delay: 1 }
#> { Activity: Release      | resource: dummy, amount: 1 }

參考資料

原文做者: Iñaki Ucar, Bart Smeets 譯者: Harry Zhu 英文原文地址:
https://r-simmer.org/articles...

做爲分享主義者(sharism),本人全部互聯網發佈的圖文均聽從CC版權,轉載請保留做者信息並註明做者 Harry Zhu 的 FinanceR專欄:https://segmentfault.com/blog...,若是涉及源代碼請註明GitHub地址:https://github.com/harryprince。微信號: harryzhustudio商業使用請聯繫做者。

相關文章
相關標籤/搜索