阿里雲函數計算是一個事件驅動的全託管計算服務。經過函數計算,您無需管理服務器等基礎設施,只需編寫代碼並上傳。函數計算會爲您準備好計算資源,以彈性、可靠的方式運行您的代碼,並提供日誌查詢,性能監控,報警等功能。藉助於函數計算,您能夠快速構建任何類型的應用和服務,無需管理和運維。更棒的是,您只須要爲代碼實際運行消耗的資源付費,而代碼未運行則不產生費用。html
觸發器是觸發函數執行的方式。有時候您不想手動調用函數執行,您但願當某件事情發生時自動觸發函數的執行,這個事情就是事件源。您能夠經過配置觸發器的方式設置事件源觸發函數執行。
例如,設置定時觸發器,能夠在某個時間點觸發函數執行或者每隔5分鐘觸發函數一次;函數計算timetriggerjson
某些帳號ak須要按期更換,以確保ak安全;
在下面的代碼示例中,受權service具備訪問kms權限的能力,使用kms,先對一個具備建立和刪除ak權限的ak加密密文
解密,獲取具備建立和刪除ak權限的AK
, 以後利用這個AK
進行ak的建立和刪除操做api
說明: 除了使用kms加密解密來獲取較大權限的
AK
, 經過函數計算環境變量的設置也是一種很好的方法安全
建立函數,函數建立可參考函數計算helloworld服務器
注:記得給函數的service的role設置訪問kms權限微信
給函數配置定時器,詳情可參考定時觸發函數app
# -*- coding: utf-8 -*- import logging, time, json from aliyunsdkcore import client from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest from aliyunsdkkms.request.v20160120.EncryptRequest import EncryptRequest from aliyunsdkkms.request.v20160120.DecryptRequest import DecryptRequest from aliyunsdkcore.auth.credentials import StsTokenCredential # ak Encrypt content AK_CiphertextBlob = "NmQyY2ZhODMtMTlhYS00MTNjLTlmZjAtZTQxYTFiYWVmMzZmM1B1NXhTZENCNXVWd1dhdTNMWVRvb3V6dU9QcVVlMXRBQUFBQUFBQUFBQ3gwZTkzeGhDdHVzMWhDUCtZeVVuMWlobzlCa3VxMlErOXFHWWdXXXHELLwL1NSZTFvUURYSW9lak5Hak1lMnF0R2I1TWUxMEJiYmkzVnBwZHlrWGYzc3kyK2tQbGlKb2lHQ3lrZUdieHN2eXZwSVYzN2Qyd1cydz09" USER_NAME = "ls-test" # sub-account name LOGGER = logging.getLogger() def handler(event, context): creds = context.credentials sts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) # this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_id clt = client.AcsClient(region_id=context.region, credential=sts_token_credential) request = DecryptRequest() request.set_CiphertextBlob(AK_CiphertextBlob) response = _send_request(clt, request) ak_info = json.loads(response.get("Plaintext","{}")) if not ak_info: return "KMS Decrypt ERROR" ak_id = ak_info["ak_id"] ak_secret = ak_info["ak_secret"] LOGGER.info("Decrypt sucessfully with key id: {}".format(response.get("KeyId","{}"))) clt2 = client.AcsClient(ak_id, ak_secret, context.region) request = CreateAccessKeyRequest() request.set_UserName(USER_NAME) # 給子帳號ls-test建立AK response = _send_request(clt2, request) create_ak_id = response.get("AccessKey",{}).get("AccessKeyId") if not create_ak_id: return LOGGER.info("create ak {} sucess!".format(create_ak_id)) time.sleep(10) request = DeleteAccessKeyRequest() request.set_UserName(USER_NAME) request.set_UserAccessKeyId(create_ak_id) response = _send_request(clt2, request) LOGGER.info("delete ak {} sucess!".format(create_ak_id)) return "OK" # send open api request def _send_request(clt, request): request.set_accept_format('json') try: response_str = clt.do_action_with_exception(request) LOGGER.debug(response_str) response_detail = json.loads(response_str) return response_detail except Exception as e: LOGGER.error(e)
AK 存在環境變量版本less
# -*- coding: utf-8 -*- import os, logging, time, json from aliyunsdkcore import client from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest USER_NAME = "ls-test" # sub-account name LOGGER = logging.getLogger() def handler(event, context): ak_id = os.environ['AK_ID'] ak_secret = os.environ['AK_SECRET'] clt = client.AcsClient(ak_id, ak_secret, context.region) request = CreateAccessKeyRequest() request.set_UserName(USER_NAME) # 給子帳號USER_NAME建立AK response = _send_request(clt, request) create_ak_id = response.get("AccessKey", "").get("AccessKeyId") if not create_ak_id: return LOGGER.info("create ak {} sucess!".format(create_ak_id)) time.sleep(5) request = DeleteAccessKeyRequest() request.set_UserName(USER_NAME) request.set_UserAccessKeyId(create_ak_id) response = _send_request(clt, request) LOGGER.info("delete ak {} sucess!".format(create_ak_id)) return "OK" # send open api request def _send_request(clt, request): request.set_accept_format('json') try: response_str = clt.do_action_with_exception(request) LOGGER.info(response_str) response_detail = json.loads(response_str) return response_detail except Exception as e: LOGGER.error(e)
按期檢查本身ecs對應暴露的端口,確保安全,好比你的ecs是一個網站服務器,可能只須要對外暴露80端口就行,若是出現0.0.0.0/0這種容許全部人訪問的,須要出現報警或者自動修復運維
建立函數,函數建立可參考函數計算helloworlddom
注:記得給函數的service的role設置管理ecs權限
# -*- coding: utf-8 -*- import logging import json, random, string, time from aliyunsdkcore import client from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest from aliyunsdkecs.request.v20140526.DescribeSecurityGroupAttributeRequest import DescribeSecurityGroupAttributeRequest from aliyunsdkcore.auth.credentials import StsTokenCredential LOGGER = logging.getLogger() clt = None # 須要檢查的ecs列表, 修改爲你的ecs id 列表 ECS_INST_IDS = ["i-uf6h07zdscdg9g55zkxx", "i-uf6bwkxfxh847a1e2xxx"] def handler(event, context): creds = context.credentials global clt sts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) # this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_id clt = client.AcsClient(region_id=context.region, credential=sts_token_credential) invalid_perssions = {} for ecs_id in ECS_INST_IDS: ret = check_and_modify_security_rule(ecs_id) if ret: invalid_perssions[ecs_id] = ret return invalid_perssions def check_and_modify_security_rule(instance_id): LOGGER.info("check_and_modify_security_rule, instance_id is %s ", instance_id) request = DescribeInstancesRequest() request.set_InstanceIds(json.dumps([instance_id])) response = _send_request(request) SecurityGroupIds = [] if response is not None: instance_list = response.get('Instances', {}).get('Instance') for item in instance_list: SecurityGroupIds = item.get('SecurityGroupIds', {}).get("SecurityGroupId", []) break if not SecurityGroupIds: LOGGER.error("ecs {} do not have SecurityGroupIds".format(instance_id)) return invalid_perssions = [] for sg_id in SecurityGroupIds: request = DescribeSecurityGroupAttributeRequest() request.set_SecurityGroupId(sg_id) response = _send_request(request) LOGGER.info("Find a securityGroup id {}".format(sg_id)) permissions = response.get("Permissions", {}).get("Permission",[]) if not permissions: continue for permission in permissions: if permission["Direction"] == "ingress" and permission["SourceCidrIp"] == "0.0.0.0/0": LOGGER.error("ecs {0} , SecurityGroup id {1}, have a risk, need fix; permission = {2}".format(instance_id, sg_id, permission)) invalid_perssions.append(permission) return invalid_perssions # send open api request def _send_request(request): request.set_accept_format('json') try: response_str = clt.do_action_with_exception(request) LOGGER.debug(response_str) response_detail = json.loads(response_str) return response_detail except Exception as e: LOGGER.error(e)
「 阿里巴巴雲原生微信公衆號(ID:Alicloudnative)關注微服務、Serverless、容器、Service Mesh等技術領域、聚焦雲原生流行技術趨勢、雲原生大規模的落地實踐,作最懂雲原生開發者的技術公衆號。」