摘要: 昨天,DataWorks推出了PYODPS任務類型,集成了Maxcompute的Python SDK,可在DataWorks的PYODPS節點上直接編輯Python代碼操做Maxcompute,也能夠設置調度任務來處理數據,提升數據開發效率。html
昨天,DataWorks推出了PYODPS任務類型,集成了Maxcompute的Python SDK,可在DataWorks的PYODPS節點上直接編輯Python代碼操做Maxcompute,也能夠設置調度任務來處理數據,提升數據開發效率。sql
效果以下圖app
只有華東2(上海)region 支持了 PYODPS 節點。異步
注:底層的 Python 版本爲 2.7 。工具
新建 PYODPS 節點具體操做以下:測試
1) 單擊數據開發頁面工具欄中的 新建 > 新建任務。2) 填寫新建任務彈出框中的各配置項。spa
3) 單擊建立code
DataWorks 的 PyODPS 節點中,將會包含一個全局的變量 odps 或者 o ,即 ODPS 入口。用戶不須要手動定義 ODPS 入口。htm
print(odps.exist_table('pyodps_iris'))
PyODPS支持ODPS SQL的查詢,並能夠讀取執行的結果。 execute_sql 或者 run_sql 方法的返回值是 運行實例 。對象
註解:並不是全部在 ODPS Console 中能夠執行的命令都是 ODPS 能夠接受的 SQL 語句。 在調用非 DDL / DML 語句時,請使用其餘方法,例如 GRANT / REVOKE 等語句請使用 run_security_query 方法,PAI 命令請使用 run_xflow 或 execute_xflow 方法。
>>> o.execute_sql('select * from dual') # 同步的方式執行,會阻塞直到SQL執行完成 >>> >>> instance = o.run_sql('select * from dual') # 異步的方式執行 >>> print(instance.get_logview_address()) # 獲取logview地址 >>> instance.wait_for_success() # 阻塞直到完成
有時,咱們在運行時,須要設置運行時參數,咱們能夠經過設置 hints 參數,參數類型是dict。
>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
咱們能夠對於全局配置設置sql.settings後,每次運行時則都會添加相關的運行時參數。
>>> from odps import options >>> options.sql.settings = {'odps.sql.mapper.split.size': 16} >>> o.execute_sql('select * from pyodps_iris') # 會根據全局配置添加hints
運行 SQL 的 instance 可以直接執行 open_reader 的操做,一種狀況是SQL返回告終構化的數據。
>>> with o.execute_sql('select * from dual').open_reader() as reader: >>> for record in reader: >>> # 處理每個record
另外一種狀況是 SQL 可能執行的好比 desc,這時經過 reader.raw 屬性取到原始的SQL執行結果。
>>> with o.execute_sql('desc dual').open_reader() as reader: >>> print(reader.raw)
PYODPS節點使用調度參數須要注意一下,系統定義的調度參數,能夠直接經過此方法獲取。
在全局包括一個 args 對象,能夠在這個中獲取,它是一個dict類型。
測試運行結果以下:
請注意:在數據開發下,使用了自定義調度參數,頁面上直接觸發運行PYODPS節點時,須要寫死時間,PYODPS節點沒法像SQL同樣直接替換。