日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

PyODPS 中使用 Python UDF

發布時間:2024/8/23 python 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 PyODPS 中使用 Python UDF 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

摘要: PyODPS 中使用 Python UDF 包含兩方面,一個是直接使用,也就是在 MaxCompute SQL 中使用;一個是間接的方式,也就是 PyODPS DataFrame,這種方式你不需要直接寫 Python UDF,而是寫普通的 Python 函數或者類。

點此查看原文:http://click.aliyun.com/m/41092/

PyODPS 中使用 Python UDF 包含兩方面,一個是直接使用,也就是在 MaxCompute SQL 中使用;一個是間接的方式,也就是 PyODPS DataFrame,這種方式你不需要直接寫 Python UDF,而是寫普通的 Python 函數或者類。下面我們分開說明。

作為準備工作,我們需要 ODPS 入口,可以通過直接初始化,或者使用 room 機制 加載。

from odps import ODPSo = ODPS('your-access-id', 'your-access-key', 'your-project')

MaxCompute SQL 中使用 Python UDF

首先,我們需要寫一個 Python 文件,假設我們就是把某一列按 csv 格式放的一列轉成 json 格式。

import jsonfrom odps.udf import annotate@annotate('string->string') class Transform(object):def evaluate(self, x):columns = list('abc')d = dict(zip(columns, x.split(',')))return json.dumps(d)

假設這個文件叫 my.py,接下來我們就需要創建 py 資源。

r = o.create_resource('csv_to_json.py', 'py', fileobj=open('my.py'))

fileobj 參數也可以是 str 類型,就是表示文件的內容

接著我們就可以創建 Python UDF 了。

o.create_function('csv_to_json', class_type='csv_to_json.Transform', resources=[r])

這里我們指定了函數名叫 csv_to_json,主類使我們上傳的 csv_to_json.py 文件里的 Transform 類。

現在我們就可以在 MaxCompute SQL 中調用這個 UDF 了。

o.execute_sql('select csv_to_json(raw) from pyodps_test_udf')

這樣我們就完成了在 PyODPS 中使用 MaxCompute SQL + Python UDF 的整個過程。

PyODPS DataFrame

對于 PyODPS DataFrame 來說,用戶只需要寫普通的 Python 函數或者類,在函數或者類里,甚至可以讀取全局變量,這樣給開發帶來了極大的方便。

和上面的例子目標相同,我們定義一個 transform 函數即可。然后我們對于 DataFrame 的一列調用 map 方法來應用這個函數。

passed_columns = list('abc') # 可以從數據庫中讀取或者寫死def transform(x):import jsond = dict(zip(passed_columns, x.split(',')))return json.dumps(d)df.raw.map(transform) In [30]: dfraw 0 1,2,3 1 4,5,6 2 7,8,9In [31]: df.raw.map(transform)raw 0 {"a": "1", "c": "3", "b": "2"} 1 {"a": "4", "c": "6", "b": "5"} 2 {"a": "7", "c": "9", "b": "8"}

實際上,PyODPS DataFrame 在用 MaxCompute 執行的時候,也會創建 Python UDF 來實現這個功能,但用戶不需要去創建文件、資源和函數這些過程,一切都是 Python 原生函數和類,整個過程相當順暢。

另外可以看到,在上面的 my.py 里,我們也是定義了一個 columns 參數的,而如果這個參數是通過變量傳進去的話,在 Python UDF 里非常麻煩,可能常常需要用一些 tricky 的方法,比如寫到某個文件資源,然后在 UDF 里讀取之類的。而對于 DataFrame 來說,完全沒有這個問題,我們可以自由讀取全局變量。

不過要注意的是,這個全局變量是被序列化到各個機器上的,所以你修改它不會全局生效。

好了,還有什么問題可以隨時和我們取得聯系。

文檔:http://pyodps.readthedocs.io/zh_CN/latest/
代碼:https://github.com/aliyun/aliyun-odps-python-sdk ,歡迎提 issue 和 merge request

總結

以上是生活随笔為你收集整理的PyODPS 中使用 Python UDF的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。