

Python Parallel Computing: Large-Scale Data Storage, Reading, and Parallel Computation with the Dask Library

Published: 2024/10/8

Reposted from: https://blog.csdn.net/sinat_26917383/article/details/78044437

Dask's data structures closely mirror pandas, which makes them easy to pick up.

Official documentation: http://dask.pydata.org/en/latest/index.html

GitHub: https://github.com/dask

Dask covers a lot of ground; below I highlight the parts I find most promising.

I. Reading and Storing Data

First, let's look at what dask can read.

1. CSV

# pandas
import pandas as pd

df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

# dask
import dask.dataframe as dd

df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

The two are almost identical, except for the trailing .compute().
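To see the pandas half of the comparison run end to end, here is a minimal sketch on a tiny in-memory CSV (the user_id/value data below is made up for illustration):

```python
import io
import pandas as pd

# tiny in-memory stand-in for '2015-01-01.csv' (hypothetical data)
csv = io.StringIO("user_id,value\na,1\na,3\nb,2\n")
df = pd.read_csv(csv)
means = df.groupby(df.user_id).value.mean()
print(means.to_dict())  # {'a': 2.0, 'b': 2.0}
```

The dask version runs the identical expression; only the final `.compute()` differs, because dask builds a task graph first and executes it on demand.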

2. Reading HDF5 with Dask Array

# numpy (eager)
import h5py
import numpy as np

f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data'])
x - x.mean(axis=1)

# dask (lazy, chunked)
import h5py
import dask.array as da

f = h5py.File('myfile.hdf5')
x = da.from_array(f['/big-data'], chunks=(1000, 1000))
(x - x.mean(axis=1)).compute()

The first block is plain NumPy; the second is Dask, which splits the on-disk array into 1000×1000 chunks and only materializes results on .compute().
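The chunked computation that `da.from_array(..., chunks=...)` enables can be illustrated with a toy, dask-free sketch: reduce each chunk to a partial result, then combine the partials (this shows the idea only, not dask's actual scheduler):

```python
import numpy as np

# split an array into chunks, then combine per-chunk partial sums/counts
x = np.arange(12.0)
chunks = np.split(x, 3)                  # three chunks of 4 elements each
total = sum(c.sum() for c in chunks)     # partial sums, combined
count = sum(c.size for c in chunks)      # partial counts, combined
chunked_mean = total / count
print(chunked_mean)  # 5.5, identical to x.mean()
```

Because only one chunk needs to be in memory at a time, the same pattern scales to arrays far larger than RAM.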

3. Dask Bag

import json
import dask.bag as db

b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()

Reading JSON files at scale; even hundreds of millions of records are easy.
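The pluck/frequencies/topk pipeline has a straightforward eager equivalent in the standard library, sketched here on toy data to show what each bag method is doing:

```python
import json
from collections import Counter

# eager stdlib equivalent of the bag pipeline above (toy data)
lines = ['{"name": "Alice"}', '{"name": "Bob"}', '{"name": "Alice"}']
records = [json.loads(line) for line in lines]   # .map(json.loads)
names = [r['name'] for r in records]             # .pluck('name')
top = Counter(names).most_common(2)              # .frequencies().topk(...)
print(top)  # [('Alice', 2), ('Bob', 1)]
```

The bag version does the same work, but partitioned and in parallel, which is why it copes with files that never fit in memory at once.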

Reading plain text files:

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

Building a bag from an in-memory sequence and converting it to a dataframe:

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()

4. Parallel computation with Dask Delayed

from dask import delayed

L = []
for fn in filenames:                  # use for loops to build up the computation
    data = delayed(load)(fn)          # delay execution of the function
    L.append(delayed(process)(data))  # build connections between variables

result = delayed(summarize)(L)
result.compute()

5. Custom tasks with concurrent.futures

from dask.distributed import Client

client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()
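The same submit/result pattern exists in the standard library's concurrent.futures, which is where dask borrowed the interface. A runnable sketch with hypothetical load/summarize stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor

def load(fn):                # hypothetical stand-in for a real loader
    return len(fn)

def summarize(results):
    return sum(results)

filenames = ['a.csv', 'bb.csv', 'ccc.csv']
with ThreadPoolExecutor() as ex:
    futures = [ex.submit(load, fn) for fn in filenames]
    results = [f.result() for f in futures]

summary = summarize(results)
print(summary)  # 5 + 6 + 7 = 18
```

One design difference: dask's client.submit accepts futures directly as arguments and resolves them on the workers, whereas with the stdlib you gather the results yourself before passing them on.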

II. The Delayed Parallel-Computation Module

A motivating example; first, the plain sequential version:

def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)

Now the version accelerated with delayed:


from dask import delayed

output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)

total = delayed(sum)(output)

You can also visualize the computation graph:

total.visualize()  # renders the task graph
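To see why nothing runs until .compute(), here is a toy, dask-free sketch of the idea: record each call instead of executing it, and only evaluate when asked. This is an illustration of the concept, not dask's real implementation:

```python
class Lazy:
    """Toy stand-in for dask.delayed: record a call, run it only on compute()."""
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def compute(self):
        args = [_resolve(a) for a in self.args]   # evaluate dependencies first
        return self.fn(*args)

def _resolve(a):
    if isinstance(a, Lazy):
        return a.compute()
    if isinstance(a, list):
        return [_resolve(x) for x in a]
    return a

def inc(x): return x + 1
def double(x): return x * 2
def add(x, y): return x + y

data = [1, 2, 3, 4, 5]
output = [Lazy(add, Lazy(inc, x), Lazy(double, x)) for x in data]
total = Lazy(sum, output)   # still nothing has executed
print(total.compute())      # (3*1+1) + ... + (3*5+1) = 50
```

Because the graph is built before execution, a real scheduler can run independent branches (here, each element of data) in parallel.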

III. Parallel Algorithms that Integrate with scikit-learn and Friends

Generalized linear models (GLM): https://github.com/dask/dask-glm

TensorFlow deep learning: Dask-Tensorflow

Taking XGBoost as the example; official repo: https://github.com/dask/dask-xgboost

Let's walk through an example.

1. Loading the data

import dask.dataframe as dd
from dask import persist
from dask.distributed import Client, progress

client = Client('scheduler:port')  # connect to the distributed scheduler

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/20*.csv', usecols=cols,
                 storage_options={'anon': True})
df = df.sample(frac=0.2)  # otherwise we blow out RAM

is_delayed = (df.DepDelay.fillna(16) > 15)
df['CRSDepTime'] = df['CRSDepTime'].clip(upper=2399)
del df['DepDelay']

df, is_delayed = persist(df, is_delayed)
progress(df, is_delayed)

2. One-hot encoding

df2 = dd.get_dummies(df.categorize()).persist()
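dd.get_dummies mirrors the pandas function of the same name; a minimal sketch of the pandas version on toy data (the carrier/dist columns are made up):

```python
import pandas as pd

# one-hot encode the categorical 'carrier' column on toy data
df = pd.DataFrame({'carrier': ['AA', 'UA', 'AA'], 'dist': [100, 200, 300]})
out = pd.get_dummies(df, columns=['carrier'])
print(list(out.columns))  # ['dist', 'carrier_AA', 'carrier_UA']
```

The dask version additionally needs the .categorize() call first, because with lazy, partitioned data the full set of category values is not known until it is scanned.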


3. Preparing the train/test split, then training

data_train, data_test = df2.random_split([0.9, 0.1], random_state=1234)
labels_train, labels_test = is_delayed.random_split([0.9, 0.1], random_state=1234)
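Note that both random_split calls use the same random_state: the same seed produces the same row mask, so data and labels stay aligned. A toy illustration of this behaviour (mimicking the idea, not dask's implementation):

```python
import random

def random_split(items, frac, seed):
    # same seed -> same mask -> corresponding rows land in the same half
    rng = random.Random(seed)
    mask = [rng.random() < frac for _ in items]
    keep = [x for x, m in zip(items, mask) if m]
    rest = [x for x, m in zip(items, mask) if not m]
    return keep, rest

data = list(range(10))
labels = [d * 2 for d in data]            # label for row d is 2*d
d_train, d_test = random_split(data, 0.9, seed=1234)
l_train, l_test = random_split(labels, 0.9, seed=1234)
print([d * 2 for d in d_train] == l_train)  # True: rows and labels line up
```

With different seeds, rows and labels would be shuffled into different halves and the training data would be silently corrupted.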

Training:

import dask_xgboost as dxgb

params = {'objective': 'binary:logistic', 'nround': 1000,
          'max_depth': 16, 'eta': 0.01, 'subsample': 0.5,
          'min_child_weight': 1}

bst = dxgb.train(client, params, data_train, labels_train)
bst

4. Prediction

# Use the normal XGBoost model with normal pandas data
import xgboost as xgb

dtest = xgb.DMatrix(data_test.head())
bst.predict(dtest)

# Or predict in parallel across the cluster
predictions = dxgb.predict(client, bst, data_test).persist()
predictions.head()

5. Model evaluation

import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, roc_curve
%matplotlib inline

print(roc_auc_score(labels_test.compute(), predictions.compute()))

fpr, tpr, _ = roc_curve(labels_test.compute(), predictions.compute())

# Taken from http://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#sphx-glr-auto-examples-model-selection-plot-roc-py
plt.figure(figsize=(8, 8))
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve')
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

IV. Visualizing Computation Graphs with Dask.array

Source: https://gist.github.com/mrocklin/b61f795004ec0a70e43de350e453e97e

import numpy as np
import dask.array as da

x = da.ones(15, chunks=(5,))
x.visualize('dask.svg')
(x + 1).sum().visualize('dask.svg')

And a two-dimensional example:

x = da.ones((15, 15), chunks=(5, 5))
x.visualize('dask.svg')
(x.dot(x.T + 1) - x.mean(axis=0)).std().visualize('dask.svg')

---------------------

Author: 悟乙己
Source: CSDN
Original: https://blog.csdn.net/sinat_26917383/article/details/78044437
Copyright notice: this is the author's original post; please include a link to it when reposting.
