Using MaxCompute Information Schema and the Alibaba Cloud Transactions and Bills Management API to Reconcile and Allocate MaxCompute Costs
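1. Requirements and scenario analysis
Many enterprise users build their data platforms on MaxCompute's pay-as-you-go billing mode. MaxCompute bills per job, so they get high performance without paying for idle resources. They pay only for what they actually use.
In a large company, one MaxCompute purchase typically serves many internal departments and individuals who use it for data processing and analysis. To see what each platform user spends per period and to optimize resource usage, you need statistics on job costs. These statistics give, per person or per department, the number of jobs, job cost, job duration, and job resource usage. Those metrics then drive management work such as cost allocation and job optimization.
The Alibaba Cloud transactions and billing system holds the cost information and cost details of MaxCompute. Join its billing details with the job details of a MaxCompute project, or with the bill for a given period. You can then relate users and job details to billing details or billed amounts and analyze them. Job details include the submitter's account, the SQL text of the job, and the resources the job used.
This article describes how to automate job cost statistics for pay-as-you-go MaxCompute projects. You can also use the Transactions and Bills Management API to retrieve other cost information you need and extend the analysis to other scenarios.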
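2. Solution design
(1) Obtain the historical job details of a MaxCompute project
Information_Schema is the open metadata service of MaxCompute. Its views let you query, on your own, near-real-time and complete metadata for the tables, columns, functions, and other objects in a project. It also exposes the project's recent job history for self-service queries.
The job history view tasks_history in Information_Schema returns near-real-time job details for the project. Fields include the project name, task name, instance ID, start time, end time, task complexity, and task CPU usage.
Note: Information_Schema is currently in gray release and will soon be fully available.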
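(2) Obtain the job billing details
You can check your detailed spending in the Billing Management console under Account Overview > Spending Records.
The Alibaba Cloud Transactions and Bills Management OpenAPI also exposes the sales and financial capabilities of Alibaba Cloud products to programs. Through this API you can retrieve MaxCompute job billing details programmatically.
Call the QueryUserOmsData operation of OMS, the Alibaba Cloud billing system. It returns fields such as the metering ID, data type, storage, SQL input volume, and public network inbound and outbound traffic.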
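QueryUserOmsData is queried over a time window given as StartTime and EndTime strings. The ReceiveData class later in this article builds a one-day window for "yesterday". The minimal sketch below computes the same strings with java.time. It assumes the window should cover the previous calendar day in UTC, because the strings carry a Z (UTC) suffix. The original code computes the date in the server's local time zone instead. The class name OmsQueryWindow is hypothetical.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds StartTime/EndTime strings for a one-day QueryUserOmsData window.
final class OmsQueryWindow {
    private OmsQueryWindow() {}

    // Returns {startTime, endTime} covering the whole given day, in the
    // "yyyy-MM-ddTHH:mm:ssZ" form used by the article's code.
    static String[] forDay(LocalDate day) {
        String date = day.format(DateTimeFormatter.ISO_LOCAL_DATE); // e.g. 2019-07-29
        return new String[] {date + "T00:00:00Z", date + "T23:59:59Z"};
    }

    // Assumption: "yesterday" is evaluated in UTC so that the date matches the Z suffix.
    static String[] forYesterdayUtc() {
        return forDay(LocalDate.now(ZoneOffset.UTC).minusDays(1));
    }
}
```

For example, `OmsQueryWindow.forDay(LocalDate.of(2019, 7, 29))` yields `2019-07-29T00:00:00Z` and `2019-07-29T23:59:59Z`. Passing an explicit date is also a simple way to backfill a missed day.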
(3) Join the billing details with the job details
Join the two tables to get the data needed for the calculation:
select distinct t.task_schema, o.MeteringId, t.owner_id, t.operation_text, o.type, o.endtime,
       o.computationsqlinput, o.computationsqlcomplexity, t.cost_cpu, o.starttime, t.cost_mem
from information_schema.tasks_history t
right join OdpsFeeDemo o
  on t.inst_id = o.meteringid and t.task_schema = o.projectid
where o.type = 'ComputationSql';
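To make the join semantics concrete, the following is a simplified sketch of the RIGHT JOIN above in plain Java. Every billing row of type ComputationSql is kept. Job details are attached only when a tasks_history row has the same project (task_schema = projectid) and the same instance ID (inst_id = meteringid). BillingRow, TaskRow, JoinedRow and FeeJoin are hypothetical stand-ins with only a few columns. Unlike SQL, the sketch assumes at most one task row per key; a real join emits one output row per match.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for a row of OdpsFeeDemo.
final class BillingRow {
    final String projectId, meteringId, type;
    BillingRow(String projectId, String meteringId, String type) {
        this.projectId = projectId; this.meteringId = meteringId; this.type = type;
    }
}

// Hypothetical, trimmed-down stand-in for a row of information_schema.tasks_history.
final class TaskRow {
    final String taskSchema, instId, ownerId;
    TaskRow(String taskSchema, String instId, String ownerId) {
        this.taskSchema = taskSchema; this.instId = instId; this.ownerId = ownerId;
    }
}

final class JoinedRow {
    final BillingRow billing;
    final TaskRow task; // null when no job detail matched (right-join semantics)
    JoinedRow(BillingRow billing, TaskRow task) { this.billing = billing; this.task = task; }
}

final class FeeJoin {
    private FeeJoin() {}

    // Simplified equivalent of: tasks_history t RIGHT JOIN OdpsFeeDemo o
    //   ON t.inst_id = o.meteringid AND t.task_schema = o.projectid
    //   WHERE o.type = 'ComputationSql'
    // Assumes at most one TaskRow per (project, instance id); later duplicates overwrite earlier ones.
    static List<JoinedRow> rightJoin(List<TaskRow> tasks, List<BillingRow> bills) {
        Map<String, TaskRow> byKey = new HashMap<>();
        for (TaskRow t : tasks) {
            byKey.put(t.taskSchema + "\u0000" + t.instId, t);
        }
        List<JoinedRow> out = new ArrayList<>();
        for (BillingRow o : bills) {
            if (!"ComputationSql".equals(o.type)) {
                continue; // the WHERE clause keeps SQL billing rows only
            }
            out.add(new JoinedRow(o, byKey.get(o.projectId + "\u0000" + o.meteringId)));
        }
        return out;
    }
}
```

A billing row whose job is missing from tasks_history still appears in the result with `task == null`. This corresponds to the NULL job columns that the RIGHT JOIN produces. Such rows are worth checking when reconciling.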
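After joining the job details with the billing details by job ID (instance ID), you get the cost of each job. For example, SQL cost = input data volume × SQL complexity × unit price. You can then analyze costs from different perspectives.
Note that MaxCompute charges are always based on the bills and cost details issued by Alibaba Cloud Billing Management.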
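As a worked illustration of the SQL cost formula, the sketch below estimates the cost of one SQL job from the ComputationSqlInput and ComputationSqlComplexity columns. Two assumptions apply. First, ComputationSqlInput is taken to be in bytes and is converted to GB by dividing by 1024^3. Second, the unit price is passed in as a parameter rather than taken from any price list. The class name SqlFeeEstimator is hypothetical. The result is only an estimate; the billed amount in Billing Management is authoritative.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper for estimating the cost of a pay-as-you-go SQL job.
final class SqlFeeEstimator {
    private static final BigDecimal BYTES_PER_GB = BigDecimal.valueOf(1024L * 1024 * 1024);

    private SqlFeeEstimator() {}

    // cost = input (GB) * complexity * unit price.
    // inputBytes and complexity are the STRING columns ComputationSqlInput and
    // ComputationSqlComplexity of OdpsFeeDemo; pricePerGb is supplied by the caller.
    static BigDecimal estimate(String inputBytes, String complexity, BigDecimal pricePerGb) {
        BigDecimal gb = new BigDecimal(inputBytes.trim()).divide(BYTES_PER_GB, 10, RoundingMode.HALF_UP);
        return gb.multiply(new BigDecimal(complexity.trim()))
                 .multiply(pricePerGb)
                 .setScale(4, RoundingMode.HALF_UP);
    }
}
```

BigDecimal is used instead of double so that summing many small job costs does not accumulate binary rounding errors.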
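3. Implementation steps (with reference code)
1. Query the job history view tasks_history in the metadata service
For example, suppose you are logged in to the project myproject1. Query INFORMATION_SCHEMA.tables in myproject1 to get the metadata of all tables in the project: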
odps@ myproject1 > select * from information_schema.tables;
INFORMATION_SCHEMA also contains a job history view for querying the job history of the current project. Always filter on the date partition when you query it, for example:
odps@ myproject1 > select * from information_schema.tasks_history where ds='yyyymmdd' limit 100;
odps@ myproject1 > desc package information_schema.systables;
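A scheduled job will usually generate the ds partition filter rather than take it as typed input. The minimal sketch below does this. It assumes ds uses the yyyymmdd format shown above. The helper name TasksHistoryQuery is hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a tasks_history query restricted to one ds partition.
final class TasksHistoryQuery {
    // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, e.g. 20190729
    private static final DateTimeFormatter DS_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    private TasksHistoryQuery() {}

    static String forDay(LocalDate day, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        // The partition value comes from a LocalDate, so no free-form text reaches the SQL.
        return "select * from information_schema.tasks_history where ds = '"
                + day.format(DS_FORMAT) + "' limit " + limit + ";";
    }
}
```

For instance, `TasksHistoryQuery.forDay(LocalDate.of(2019, 7, 29), 100)` produces the query for partition `20190729`.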
View the column definitions of the history view:
odps@ myproject1 > desc information_schema.tasks_history;
The output is shown below:
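2. Use the Transactions and Bills Management API to obtain cost details and compute the allocation
Method 1: manual download and upload
(1) First, create the output table for the OMS data in MaxCompute: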
CREATE TABLE IF NOT EXISTS OdpsFeeDemo(
    ProjectId STRING COMMENT 'project ID',
    MeteringId STRING COMMENT 'metering ID',
    Type STRING COMMENT 'data type',
    Storage STRING COMMENT 'storage (bytes)',
    EndTime STRING COMMENT 'end time',
    ComputationSqlInput STRING COMMENT 'SQL input volume',
    ComputationSqlComplexity STRING COMMENT 'SQL complexity',
    StartTime STRING COMMENT 'start time',
    OdpsSpecCode STRING COMMENT 'specification type'
);
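In this method, you manually download the detailed OMS billing data from the console. You then upload it with tunnel upload to the corresponding output table in MaxCompute (ODPS).
Manual download steps: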
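Reference: https://help.aliyun.com/product/87964.html
Go to the Alibaba Cloud User Center: https://usercenter2.aliyun.com/home
Switch back to the old console version.
Go to Billing Management > Spending Records > Usage Records.
Select the product type, enter the usage period and the measurement granularity, and export the data in CSV format.
Download the OMS data regularly and upload it to the output table (OdpsFeeDemo) created in MaxCompute: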
tunnel upload C:\Users\Desktop\aa.txt project.tablename;
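(2) Join the tables to get the final result. To persist the result, create a result table (for example, MaxComputeFee) and insert the query output into it.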
select distinct t.task_schema, o.MeteringId, t.owner_id, o.type, o.endtime,
       o.computationsqlinput, o.computationsqlcomplexity, t.cost_cpu, o.starttime, t.cost_mem
from information_schema.tasks_history t
right join OdpsFeeDemo o
  on t.inst_id = o.meteringid and t.task_schema = o.projectid
where o.type = 'ComputationSql';
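Method 2: download the cost details programmatically through the API, upload them to MaxCompute, and analyze them there
(1) Create the OMS table OdpsFeeDemo in MaxCompute (ODPS), for example: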
CREATE TABLE IF NOT EXISTS OdpsFeeDemo(
    ProjectId STRING COMMENT 'project ID',
    MeteringId STRING COMMENT 'metering ID',
    Type STRING COMMENT 'data type',
    Storage STRING COMMENT 'storage (bytes)',
    EndTime STRING COMMENT 'end time',
    ComputationSqlInput STRING COMMENT 'SQL input volume',
    ComputationSqlComplexity STRING COMMENT 'SQL complexity',
    StartTime STRING COMMENT 'start time',
    OdpsSpecCode STRING COMMENT 'specification type'
);
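(2) Download the OMS data through the API and upload it to the corresponding table in MaxCompute.
Reference code:
1) Application startup class: Application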
package com.alibaba.odps;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * @ClassName: Application
 * @Description: Service startup class; @EnableScheduling activates the @Scheduled job in ReceiveData
 * @Author: ***
 * @Data: 2019/7/30 17:15
 **/
@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
2) Fetching the billing data from the API: ReceiveData
package com.alibaba.odps.controller;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.bssopenapi.model.v20171214.QueryUserOmsDataRequest;
import com.aliyuncs.bssopenapi.model.v20171214.QueryUserOmsDataResponse;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @ClassName: ReceiveData
 * @Description: Pulls OMS billing data through the Transactions and Bills Management API
 * @Author: LiuJianwei
 * @Data: 2019/7/30 17:18
 **/
@Component
public class ReceiveData {
    @Value("${table}")
    private String table;
    @Value("${odps.accessKeyId}")
    private String accessKeyId;
    @Value("${odps.accessKeySecret}")
    private String accessKeySecret;
    @Value("${file.save.path}")
    private String fileSavePath;
    @Autowired
    private OdpsServer odpsServer;

    // Column order of the exported text file; matches the columns of OdpsFeeDemo
    private static final String[] FIELDS = {"ProjectId", "MeteringId", "Type", "Storage", "EndTime",
        "ComputationSqlInput", "ComputationSqlComplexity", "StartTime", "OdpsSpecCode"};

    @Scheduled(cron = "${cron}")
    public void queryUserOmsData() {
        // Query window: the whole of yesterday
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String yesterday = format.format(DateUtils.addDays(new Date(), -1));
        // To backfill a specific day, hard-code it instead, e.g. String yesterday = "2019-07-29";
        String startTime = yesterday + "T00:00:00Z";
        String endTime = yesterday + "T23:59:59Z";
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
        IAcsClient client = new DefaultAcsClient(profile);
        for (String tab : table.split(",")) {
            QueryUserOmsDataRequest request = new QueryUserOmsDataRequest();
            request.setTable(tab.trim());
            request.setDataType("HOUR");
            request.setStartTime(startTime);
            request.setEndTime(endTime);
            try {
                QueryUserOmsDataResponse response = client.getAcsResponse(request);
                String data = new Gson().toJson(response);
                // Write the rows into the MaxCompute table
                odpsServer.writeDataToOdps(data, yesterday, tab.trim());
                // Keep a local text copy of the rows
                writeDataToTxt(data, yesterday, tab.trim());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                // Also catches ServerException, which is a subclass of ClientException
                System.out.println(e);
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            }
        }
    }

    public void writeDataToTxt(String data, String yesterday, String tab) throws IOException {
        if (StringUtils.isEmpty(data)) {
            return;
        }
        JSONObject datas = JSON.parseObject(data).getJSONObject("data");
        JSONArray list = datas == null ? null : datas.getJSONArray("omsData");
        if (list == null || list.isEmpty()) {
            return;
        }
        // One file per table and day, so that several tables do not overwrite each other
        String path = fileSavePath + File.separator + tab + "_" + yesterday + ".txt";
        try (Writer writer = new FileWriter(new File(path))) {
            for (int i = 0; i < list.size(); i++) {
                JSONObject map = list.getJSONObject(i);
                StringBuilder sb = new StringBuilder();
                for (String key : FIELDS) {
                    // A missing field becomes a blank column
                    sb.append(map.containsKey(key) ? map.get(key) : " ").append(",");
                }
                sb.setLength(sb.length() - 1);
                sb.append("\r\n");
                writer.write(sb.toString());
            }
        }
    }
}
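The text file written by writeDataToTxt is meant to be loadable into OdpsFeeDemo with tunnel upload, so its column order must match the table. The sketch below isolates that formatting step so it can be checked on its own. It follows the loop above: columns are comma-separated, a missing field becomes a single space, and each line ends with \r\n. The class name OmsCsv is hypothetical. Like the original, it does no quoting, so it assumes that no field value contains a comma.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: formats one OMS record as a comma-separated line in a fixed column order.
final class OmsCsv {
    // Same order as the columns of OdpsFeeDemo
    static final String[] FIELDS = {"ProjectId", "MeteringId", "Type", "Storage", "EndTime",
            "ComputationSqlInput", "ComputationSqlComplexity", "StartTime", "OdpsSpecCode"};

    private OmsCsv() {}

    static String toLine(Map<String, ?> row) {
        // Separator ",", no prefix, "\r\n" as the line terminator
        StringJoiner joiner = new StringJoiner(",", "", "\r\n");
        for (String key : FIELDS) {
            Object value = row.get(key);
            // A missing field becomes a blank column, as in writeDataToTxt
            joiner.add(value == null ? " " : value.toString());
        }
        return joiner.toString();
    }
}
```

Because the field order lives in one array, adding a column to OdpsFeeDemo means changing a single place rather than every writer.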
3) Uploading the received data to the OMS table created in the MaxCompute project: OdpsServer
package com.alibaba.odps.controller;

import java.io.IOException;
import java.util.Locale;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @ClassName: OdpsServer
 * @Description: Writes the OMS billing data into MaxCompute (ODPS) tables through Tunnel
 * @Author: LiuJianwei
 * @Data: 2019/7/30 17:23
 **/
@Component
public class OdpsServer {
    @Value("${odps.accessKeyId}")
    private String accessKeyId;
    @Value("${odps.accessKeySecret}")
    private String accessKeySecret;
    @Value("${odps.project}")
    private String project;
    @Value("${odps.url}")
    private String url;

    private static final String RDS_TABLE_NAME = "RDS";
    private static final String ODPS_TABLE_NAME = "OdpsFeeDemo";

    // OMS field names; each target column is the lower-case field name
    private static final String[] ODPS_FIELDS = {"ProjectId", "MeteringId", "Type", "Storage", "EndTime",
        "ComputationSqlInput", "ComputationSqlComplexity", "StartTime", "OdpsSpecCode"};
    private static final String[] RDS_FIELDS = {"DBVersion", "InstanceId", "NetworkIn", "NetworkOut", "Storage",
        "Memory", "Region", "ProviderId", "DBType", "EndTime", "StartTime", "InstanceUseType", "InstanceName"};

    public void writeDataToOdps(String data, String yesterday, String tab) {
        if (StringUtils.isEmpty(data)) {
            return;
        }
        JSONObject datas = JSON.parseObject(data).getJSONObject("data");
        JSONArray dataList = datas == null ? null : datas.getJSONArray("omsData");
        if (dataList == null || dataList.isEmpty()) {
            return;
        }
        String tableName;
        String[] fields;
        if (tab.equals("odps")) {
            tableName = ODPS_TABLE_NAME;
            fields = ODPS_FIELDS;
        } else if (tab.equals("rds")) {
            tableName = RDS_TABLE_NAME;
            fields = RDS_FIELDS;
        } else {
            return; // no column mapping is defined for other OMS tables
        }
        try {
            // One upload session per batch: write every row into block 0, then commit once
            UploadSession session = createNewSession(tableName);
            try (RecordWriter writer = session.openRecordWriter(0L)) {
                for (int i = 0; i < dataList.size(); i++) {
                    JSONObject map = dataList.getJSONObject(i);
                    Record record = session.newRecord();
                    for (String field : fields) {
                        // Missing fields are written as empty strings
                        String value = StringUtils.defaultString(map.getString(field));
                        if (tableName.equals(RDS_TABLE_NAME) && field.equals("Storage")) {
                            record.set("storage", Long.valueOf(value)); // BIGINT column in the RDS table
                        } else {
                            record.set(field.toLowerCase(Locale.ROOT), value);
                        }
                    }
                    writer.write(record);
                }
            }
            session.commit(new Long[] {0L});
        } catch (TunnelException | IOException e) {
            throw new RuntimeException(e);
        }
    }

    private UploadSession createNewSession(String tableName) throws TunnelException {
        AliyunAccount account = new AliyunAccount(accessKeyId, accessKeySecret);
        Odps odps = new Odps(account);
        odps.setEndpoint(url);
        odps.setDefaultProject(project);
        TableTunnel odpsTunnel = new TableTunnel(odps);
        return odpsTunnel.createUploadSession(project, tableName);
    }
}
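4) Configuration file
# AccessKey ID
odps.accessKeyId=********
# AccessKey secret
odps.accessKeySecret=********
# MaxCompute project (workspace) name
odps.project=<workspace>
# MaxCompute endpoint
odps.url=http://service.odps.aliyun.com/api
# OMS tables to query, comma-separated
table=odps
# Directory for the local text copies written by ReceiveData (example value)
file.save.path=/path/to/oms
# Schedule: run once a day at 02:00 to pull the previous day's data
cron=0 0 2 * * ?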
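ReceiveData splits the table property on commas and queries each entry separately. The sketch below shows that parsing step on its own with java.util.Properties. This can help check a configuration before deployment. The class name ConfigCheck is hypothetical, and it reads only the table key from the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reads the "table" key the same way ReceiveData does (comma-separated, trimmed).
final class ConfigCheck {
    private ConfigCheck() {}

    static List<String> tables(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        String value = props.getProperty("table", "");
        List<String> result = new ArrayList<>();
        for (String tab : value.split(",")) {
            // Skip empty entries such as those produced by "odps,,rds" or a missing key
            if (!tab.trim().isEmpty()) {
                result.add(tab.trim());
            }
        }
        return result;
    }
}
```

With `table=odps, rds` this returns `[odps, rds]`. An empty list signals that the scheduled job would query nothing.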
5) After the data has been uploaded to the corresponding MaxCompute table, join it with the job history:
select distinct t.task_schema, o.MeteringId, t.owner_id, o.type, o.endtime,
       o.computationsqlinput, o.computationsqlcomplexity, t.cost_cpu, o.starttime, t.cost_mem
from information_schema.tasks_history t
right join OdpsFeeDemo o
  on t.inst_id = o.meteringid and t.task_schema = o.projectid
where o.type = 'ComputationSql';
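The joined rows feed cost allocation by person. The sketch below assumes that each joined row has already been reduced to an owner_id and an estimated cost, for example with the input × complexity × unit price formula. It then totals the costs per owner. Rows without job details have a NULL owner_id after the right join; they are grouped under a placeholder key so they stay visible. The class name CostAllocation and the key "(unknown)" are hypothetical. Mapping owners to departments would need an extra lookup that this article does not define.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: sums estimated job costs per owner_id.
final class CostAllocation {
    static final String UNKNOWN_OWNER = "(unknown)";

    private CostAllocation() {}

    // ownerIds[i] and fees[i] describe the same joined row; ownerIds[i] may be null.
    static Map<String, BigDecimal> totalsByOwner(String[] ownerIds, BigDecimal[] fees) {
        if (ownerIds.length != fees.length) {
            throw new IllegalArgumentException("ownerIds and fees must have the same length");
        }
        Map<String, BigDecimal> totals = new TreeMap<>(); // sorted by owner for stable reports
        for (int i = 0; i < ownerIds.length; i++) {
            String owner = ownerIds[i] == null ? UNKNOWN_OWNER : ownerIds[i];
            totals.merge(owner, fees[i], BigDecimal::add);
        }
        return totals;
    }
}
```

The same aggregation can be done in SQL with GROUP BY owner_id on the join result. The Java version is useful when the costs are post-processed in the service that pulls the bills.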
This article is original content from the Alibaba Cloud Yunqi Community and may not be reproduced without permission.