AWS Step Functions 將多個 AWS 服務協調爲無服務器工做流,以即可以快速構建和更新應用程序。使用 Step Functions,能夠設計和運行將AWS Lambda 和 Amazon ECS 等服務整合到功能豐富的應用程序中的工做流。工做流由一系列步驟組成,一個步驟的輸出充當下一個步驟的輸入。 使用 Step Functions,應用程序開發更簡單、更直觀,由於它將工做流轉換爲易於理解、易於向其餘人說明且易於更改的狀態機示意圖。能夠監控執行的每一個步驟,這意味着能夠快速識別並解決問題。Step Functions 能夠自動觸發和跟蹤各個步驟,並在出現錯誤時重試,所以的應用程序可以按照預期順序執行。數據庫
demo架構圖json
AWS step function 狀態機的工做流程服務器
工做流程如圖:session
1.調用 Input Lottery Winners 函數,傳入 num_of_winners,進入第二步。架構
2.Random Select Winners 根據 Input Lottery Winners 的輸出(body)調用 Random Select Winners,生成兩個獲獎號碼,進入第三步。運維
3.Validate Winners 根據第二步輸出查詢 Winners 表判斷是否重複,重複則傳出 status:1,不然傳出 status:0。dom
4.Is Winner In Past Draw 接受第三步 status 並判斷,當 status 爲1,則從新調用 Random Select Winners 進入第二步,當 status 爲0,則調用 Notify Winners 和 Record Winner Queue,給 SNS topic 發送通知,把獲獎者寫入 Winner 表。ide
在整個工做流程中,當 Catch 接收到錯誤,都直接進入 Failed 步驟,輸出異常並中斷 step function。函數
IAM角色建立設計
執行 lambda 的角色須要如下策略:
AmazonDynamoDBFullAccess
AWSLambdaBasicExecutionRole
AmazonSNSFullAccess
AWSStepFunctionsFullAccess
在 AWS IAM 控制檯中建立角色:
大概以下圖
lambda 建立
Input Lottery Winners
爲了實現Step Functions狀態機流轉下的任務,咱們此次實現會用到AWS Lambda做爲咱們業務的實現環境
1.進入AWS控制檯,選擇服務而後輸入Lambda進入AWS Lambda控制檯
2.選擇建立函數,而後選擇從頭開始創做來自定義咱們的實驗程序
3.首先咱們須要建立狀態機中的第一個狀態任務Input Lottery Winners,輸入函數名稱Lottery-InputWinners來定義幸運兒的數量。運行語言選擇Python 3.7。同時須要選擇函數執行的權限, 這裏咱們選擇使用現用角色,選擇咱們以前建立的IAM角色
4.點擊建立函數
5.在函數代碼欄目下輸入以下代碼塊,修改代碼中的 region_name 爲當前使用的 region
6.建立函數時複製頁面右上角的ARN,後面建立狀態機須要
import json
class CustomError(Exception):
pass
def lambda_handler(event, context):
num_of_winners = event['input']
if 'exception' in event: raise CustomError("An error occurred!!") return { "body": { "num_of_winners": num_of_winners } }
接下來咱們還須要建立另外三個須要定義的狀態機業務邏輯,建立過程和上面的Lottery-InputWinners一致,下面是具體的狀態機AWS
Lambda代碼塊
Lottery-RandomSelectWinners
import json
import boto3
from random import randint
from boto3.dynamodb.conditions import Key, Attr
TOTAL_NUM = 10
def lambda_handler(event, context):
num_of_winners = event['num_of_winners'] # query in dynamodb dynamodb = boto3.resource('dynamodb', region_name='') table = dynamodb.Table('Lottery-Employee') # random select the winners, if has duplicate value, re-run the process while True: lottery_serials = [randint(1,TOTAL_NUM) for i in range(num_of_winners)] if len(lottery_serials) == len(set(lottery_serials)): break # retrieve the employee details from dynamodb results = [table.query(KeyConditionExpression=Key('lottery_serial').eq(serial), IndexName='lottery_serial-index') for serial in lottery_serials] # format results winner_details = [result['Items'][0] for result in results] return { "body": { "num_of_winners": num_of_winners, "winner_details": winner_details } }
Lottery-ValidateWinners
import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
def lambda_handler(event, context):
num_of_winners = event['num_of_winners'] winner_details = event['winner_details'] # query in dynamodb dynamodb = boto3.resource('dynamodb', region_name='') table = dynamodb.Table('Lottery-Winners') # valiate whether the winner has already been selected in the past draw winners_employee_id = [winner['employee_id'] for winner in winner_details] results = [table.query(KeyConditionExpression=Key('employee_id').eq(employee_id)) for employee_id in winners_employee_id] output = [result['Items'] for result in results if result['Count'] > 0] # if winner is in the past draw, return 0 else return 1 has_winner_in_queue = 1 if len(output) > 0 else 0 # format the winner details in sns winner_names = [winner['employee_name'] for winner in winner_details] name_s = "" for name in winner_names: name_s += name name_s += " " return { "body": { "num_of_winners": num_of_winners, "winner_details": winner_details }, "status": has_winner_in_queue, "sns": "Congrats! [{}] You have selected as the Lucky Champ!".format(name_s.strip()) }
Lottery-RecordWinners
import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
def lambda_handler(event, context):
winner_details = event['winner_details'] # retrieve the winners' employee id employee_ids = [winner['employee_id'] for winner in winner_details] # save the records in dynamodb dynamodb = boto3.resource('dynamodb', region_name='') table = dynamodb.Table('Lottery-Winners') for employee_id in employee_ids: table.put_item(Item={ 'employee_id': employee_id }) return { "body": { "winners": winner_details }, "status_code": "SUCCESS" }
建立AWS SNS 通知服務
1.進入AWS控制檯,在服務中搜索SNS
2.在SNS控制面板中,選擇主題, 而後選擇建立主題
3.在建立新主題彈框中,輸入
主題名稱: Lottery-Notification
顯示名稱: Lottery
1.建立主題後,會進入主題詳細信息頁面,這時候咱們須要建立訂閱來對接咱們的消息服務,例如郵件服務(本次實驗使用郵件服務來做爲消息服務)
2.點擊建立訂閱, 在彈框中選擇
協議: Email
終端節點: <填入本身的郵箱地址>
1.點擊請求確認, 而後到上面填寫的郵箱地址中確認收到信息,表示確認該郵箱能夠接收來自AWS SNS該主題的通知消息
2.複製主題詳細頁面的主題ARN,以後替換Step Functions狀態機下的<Notification:ARN>
建立Amazon Dynamodb 服務
本次實驗須要建立兩張Dynamodb表來記錄員工信息和幸運兒信息。使用Dynamodb能更快地經過託管的方式記錄數據同時免去數據庫運維的壓力。
1.進入AWS控制檯,在服務中搜索Dynamodb
2.在左側控制欄中選在表, 而後在主頁面中選擇建立表
3.在建立Dynamodb表中,填入以下信息
·表名稱:Lottery-Winners ·主鍵:employee_id
4.表設置中確認勾選使用默認設置,點擊建立
5.一樣的設置步驟,點擊建立表,在建立Dynamodb表中,填入以下信息
·表名稱:Lottery-Employee主鍵:employee_id
1.表設置中確認勾選使用默認設置,點擊建立
2.等待表建立完成後, 經過本附件中的request-items.json文件導入數據到Lottery-Employee
$ aws dynamodb batch-write-item --request-items file://request-items.json
1.選擇表Lottery-Employee Tab頁面中的索引, 點擊建立索引
主鍵:lottery_serial, 字段類型選擇數字
索引名稱:lottery_serial-index
建立AWS Step Functions 狀態機
1.進入AWS控制檯,在服務中搜索Step Functions
2.進入Step Functions服務後,點擊左側的活動欄,並點擊狀態機
3.進入狀態機主頁面後,選擇建立狀態機
4.在定義狀態機欄目下,選擇默認使用代碼段創做。同時在詳細信息欄目輸入狀態機名稱Lottery
5.在狀態機定義欄目下,複製以下狀態機定義文件,經過Amazon States Language來定義狀態機的狀態流轉
{
"Comment": "A simple AWS Step Functions state machine that simulates the lottery session",
"StartAt": "Input Lottery Winners",
"States": {
"Input Lottery Winners": { "Type": "Task", "Resource": "<InputWinners:ARN>", "ResultPath": "$", "Catch": [ { "ErrorEquals": [ "CustomError" ], "Next": "Failed" }, { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Next": "Random Select Winners" }, "Random Select Winners": { "Type": "Task", "InputPath": "$.body", "Resource": "<RandomSelectWinners:ARN>", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Retry": [ { "ErrorEquals": [ "States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 2 } ], "Next": "Validate Winners" }, "Validate Winners": { "Type": "Task", "InputPath": "$.body", "Resource": "<ValidateWinners:ARN>", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Retry": [ { "ErrorEquals": [ "States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 2 } ], "Next": "Is Winner In Past Draw" }, "Is Winner In Past Draw": { "Type" : "Choice", "Choices": [ { "Variable": "$.status", "NumericEquals": 0, "Next": "Send SNS and Record In Dynamodb" }, { "Variable": "$.status", "NumericEquals": 1, "Next": "Random Select Winners" } ] }, "Send SNS and Record In Dynamodb": { "Type": "Parallel", "End": true, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Retry": [ { "ErrorEquals": [ "States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 2 } ], "Branches": [ { "StartAt": "Notify Winners", "States": { "Notify Winners": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "<Notification:ARN>", "Message.$": "$.sns" }, "End": true } } }, { "StartAt": "Record Winner Queue", "States": { "Record Winner Queue": { "Type": "Task", "InputPath": "$.body", "Resource": "<RecordWinners:ARN>", "TimeoutSeconds": 300, "End": true } } } ] }, "Failed": { "Type": "Fail" }
}
}
1.在狀態機定義欄目的右側,點擊刷新按鈕,能夠看到狀態機流轉的流程圖。使用以前的 lambda ARN(4個),SNS topic ARN(1個),對應替換狀態機 json 文件中的<InputWinners:ARN>,<RandomSelectWinners:ARN>,<ValidateWinners:ARN>,<RecordWinners:ARN>,<Notification:ARN>,點擊下一步。
2.在配置設置下,選擇爲我建立IAM角色, 輸入自定義的IAM角色名稱MyStepFunctionsExecutionRole,而且附加AmazonSNSFullAccess權限
3.點擊建立狀態機完成建立過程
2.進入以前建立的狀態機Lottery
3.點擊啓動執行
4.在彈框中填入輸入的json文本,這裏的input表明在本次實驗中須要抽取的獲獎人數
{
"input": 2
}
1.點擊啓動執行
2.郵件會收取到幸運兒的信息
當函數出錯時,也能直觀得在可視化界面看到,而且在輸出中查看錯誤日誌