PyODPS 中使用 Python UDF
摘要: PyODPS 中使用 Python UDF 包含兩方面,一個是直接使用,也就是在 MaxCompute SQL 中使用;一個是間接的方式,也就是 PyODPS DataFrame,這種方式你不需要直接寫 Python UDF,而是寫普通的 Python 函數或者類。
點此查看原文:http://click.aliyun.com/m/41092/
PyODPS 中使用 Python UDF 包含兩方面,一個是直接使用,也就是在 MaxCompute SQL 中使用;一個是間接的方式,也就是 PyODPS DataFrame,這種方式你不需要直接寫 Python UDF,而是寫普通的 Python 函數或者類。下面我們分開說明。
作為準備工作,我們需要 ODPS 入口,可以通過直接初始化,或者使用 room 機制 加載。
from odps import ODPSo = ODPS('your-access-id', 'your-access-key', 'your-project')MaxCompute SQL 中使用 Python UDF
首先,我們需要寫一個 Python 文件,假設我們就是把某一列按 csv 格式放的一列轉成 json 格式。
import jsonfrom odps.udf import annotate@annotate('string->string') class Transform(object):def evaluate(self, x):columns = list('abc')d = dict(zip(columns, x.split(',')))return json.dumps(d)假設這個文件叫 my.py,接下來我們就需要創建 py 資源。
r = o.create_resource('csv_to_json.py', 'py', fileobj=open('my.py'))fileobj 參數也可以是 str 類型,就是表示文件的內容
接著我們就可以創建 Python UDF 了。
o.create_function('csv_to_json', class_type='csv_to_json.Transform', resources=[r])這里我們指定了函數名叫 csv_to_json,主類使我們上傳的 csv_to_json.py 文件里的 Transform 類。
現在我們就可以在 MaxCompute SQL 中調用這個 UDF 了。
o.execute_sql('select csv_to_json(raw) from pyodps_test_udf')這樣我們就完成了在 PyODPS 中使用 MaxCompute SQL + Python UDF 的整個過程。
PyODPS DataFrame
對于 PyODPS DataFrame 來說,用戶只需要寫普通的 Python 函數或者類,在函數或者類里,甚至可以讀取全局變量,這樣給開發帶來了極大的方便。
和上面的例子目標相同,我們定義一個 transform 函數即可。然后我們對于 DataFrame 的一列調用 map 方法來應用這個函數。
passed_columns = list('abc') # 可以從數據庫中讀取或者寫死def transform(x):import jsond = dict(zip(passed_columns, x.split(',')))return json.dumps(d)df.raw.map(transform) In [30]: dfraw 0 1,2,3 1 4,5,6 2 7,8,9In [31]: df.raw.map(transform)raw 0 {"a": "1", "c": "3", "b": "2"} 1 {"a": "4", "c": "6", "b": "5"} 2 {"a": "7", "c": "9", "b": "8"}實際上,PyODPS DataFrame 在用 MaxCompute 執行的時候,也會創建 Python UDF 來實現這個功能,但用戶不需要去創建文件、資源和函數這些過程,一切都是 Python 原生函數和類,整個過程相當順暢。
另外可以看到,在上面的 my.py 里,我們也是定義了一個 columns 參數的,而如果這個參數是通過變量傳進去的話,在 Python UDF 里非常麻煩,可能常常需要用一些 tricky 的方法,比如寫到某個文件資源,然后在 UDF 里讀取之類的。而對于 DataFrame 來說,完全沒有這個問題,我們可以自由讀取全局變量。
不過要注意的是,這個全局變量是被序列化到各個機器上的,所以你修改它不會全局生效。
好了,還有什么問題可以隨時和我們取得聯系。
文檔:http://pyodps.readthedocs.io/zh_CN/latest/
代碼:https://github.com/aliyun/aliyun-odps-python-sdk ,歡迎提 issue 和 merge request
總結
以上是生活随笔為你收集整理的PyODPS 中使用 Python UDF的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ICDE:POLARDB定义云原生数据库
- 下一篇: Python数据预处理:使用Dask和N