以5个数据库为例,用Python实现数据的提取、转换和加载(ETL)
導(dǎo)讀:每個(gè)數(shù)據(jù)科學(xué)專業(yè)人員都必須從不同的數(shù)據(jù)源中提取、轉(zhuǎn)換和加載(Extract-Transform-Load,ETL)數(shù)據(jù)。
本文將討論如何使用Python為選定的流行數(shù)據(jù)庫實(shí)現(xiàn)數(shù)據(jù)的ETL。對(duì)于關(guān)系數(shù)據(jù)庫,選擇MySQL,并將Elasticsearch作為文檔數(shù)據(jù)庫的例子展開。對(duì)于圖形數(shù)據(jù)庫,選擇Neo4j。對(duì)于NoSQL,可參考此前文章中介紹的MongoDB。
作者:薩揚(yáng)·穆霍帕迪亞(Sayan Mukhopadhyay)
如需轉(zhuǎn)載請(qǐng)聯(lián)系大數(shù)據(jù)(ID:hzdashuju)
ElasticSearch是一個(gè)基于Lucene的搜索服務(wù)器。它提供了一個(gè)分布式多用戶能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java開發(fā)的,并作為Apache許可條款下的開放源碼發(fā)布,是當(dāng)前流行的企業(yè)級(jí)搜索引擎。
Neo4j是一個(gè)高性能的,NOSQL圖形數(shù)據(jù)庫,它將結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)在網(wǎng)絡(luò)上(從數(shù)學(xué)角度叫做圖)而不是表中,是一個(gè)嵌入式的、基于磁盤的、具備完全的事務(wù)特性的Java持久化引擎。
01 MySQL
MySQLdb是在MySQL C接口上面開發(fā)的Python API。
1. 如何安裝MySQLdb
首先,需要在計(jì)算機(jī)上安裝Python MySQLdb模塊。然后運(yùn)行以下腳本:
import?MySQLdb
如果出現(xiàn)導(dǎo)入錯(cuò)誤,則表示模塊未正確安裝。
以下是安裝MySQL Python模塊的說明:
$tar?–xvf?MySQL-python-1.2.2.tar
$cd?MySQL-python-1.2.2
$python?setup.py?build
$python?setup.py?install
2. 數(shù)據(jù)庫連接
在連接到MySQL數(shù)據(jù)庫之前,請(qǐng)確保有以下內(nèi)容。
有一個(gè)名為TEST的數(shù)據(jù)庫。
在TEST數(shù)據(jù)庫中有一個(gè)表STUDENT。
STUDENT表有三個(gè)字段:NAME、SUR_NAME和ROLL_NO。
用戶對(duì)TEST數(shù)據(jù)庫具有完全訪問權(quán)限。
3. INSERT操作
以下代碼執(zhí)行SQL INSERT語句,以便在STUDENT表中創(chuàng)建記錄:
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","user","passwd","TEST"?)
#?prepare?a?cursor?object?using?cursor()?method
cursor?=?db.cursor()
#?Prepare?SQL?query?to?INSERT?a?record?into?the?database.
sql?=?"""INSERT?INTO?STUDENT(NAME,
?????????SUR_NAME,?ROLL_NO)
?????????VALUES?('Sayan',?'Mukhopadhyay',?1)"""
try:
???#?Execute?the?SQL?command
cursor.execute(sql)
???#?Commit?your?changes?in?the?database
???db.commit()
except:
???#?Rollback?in?case?there?is?any?error
???db.rollback()
#?disconnect?from?server
db.close()
4. READ操作
以下代碼從STUDENT表中提取數(shù)據(jù)并打印出來:
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","user","passwd","TEST"?)
#?prepare?a?cursor?object?using?cursor()?method
cursor?=?db.cursor()
#?Prepare?SQL?query?to?INSERT?a?record?into?the?database.
sql?=?"SELECT?*?FROM?STUDENT?"
try:
???#?Execute?the?SQL?command
cursor.execute(sql)
???#?Fetch?all?the?rows?in?a?list?of?lists.
results?=?cursor.fetchall()
for?row?in?results:
fname?=?row[0]
lname?=?row[1]
id?=?row[2]
??????#?Now?print?fetched?result
print?"name=%s,surname=%s,id=%d"?%?\
?????????????(fname,?lname,?id?)
except:
print?"Error:?unable?to?fecth?data"
#?disconnect?from?server
db.close()
5. DELETE操作
以下代碼從TEST中刪除id=1的一行數(shù)據(jù):
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","test","passwd","TEST")
#prepare?a?cursor?object?using?cursor()?method
cursor?=?db.cursor()
#?PrepareSQL?query?to?DELETE?required?records
sql="DELETE?FROM?STUDENT?WHERE?ROLL_NO=1"
try:
#Execute?the?SQL?command?
cursor.execute(sql)
#Commit?your?changes?in?the?database
db.commit()
except:
#Roll?back?in?case?there?is?any?error
db.rollback()
#disconnect?from?server?
db.close()
6. UPDATE操作
以下代碼將lastname為Mukhopadhyay的記錄更改為Mukherjee:
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","user","passwd","TEST"?)
#?prepare?a?cursor?object?using
cursor()?method?cursor?=?db.cursor()
#?Prepare?SQL?query?to?UPDATE?required?records
sql?=?"UPDATE?STUDENT?SET?SUR_NAME="Mukherjee"
??????????????????????????WHERE?SUR_NAME="Mukhopadhyay""
try:
???#?Execute?the?SQL?command
cursor.execute(sql)
???#?Commit?your?changes?in?the?database
db.commit()
except:
???#?Rollback?in?case?there?is?any?error
db.rollback()
#?disconnect?from?server
db.close()
7. COMMIT操作
提交操作提供對(duì)數(shù)據(jù)庫完成修改命令,并且在此操作之后,無法將其還原。
8. ROLL-BACK操作
如果不能確認(rèn)對(duì)數(shù)據(jù)的修改同時(shí)想要撤回操作,可以使用roll-back()方法。
以下是通過Python訪問MySQL數(shù)據(jù)的完整示例。它將提供將數(shù)據(jù)存儲(chǔ)為CSV文件或MySQL數(shù)據(jù)庫中的數(shù)據(jù)的完整描述。
import?sys
out?=?open('Config1.txt','w')
print?"Enter?the?Data?Source?Type:"
print?"1.?MySql"
print?"2.?Text"
print?"3.?Exit"
while(1):
???????data1?=?sys.stdin.readline().strip()
???????if(int(data1)?==?1):
?????????????out.write("source?begin"+"\n"+"type=mysql\n")
?????????????print?"Enter?the?ip:"
?????????????ip?=?sys.stdin.readline().strip()
?????????????out.write("host="?+?ip?+?"\n")
?????????????print?"Enter?the?database?name:"
?????????????db?=?sys.stdin.readline().strip()
?????????????out.write("database="?+?db?+?"\n")
?????????????print?"Enter?the?user?name:"
?????????????usr?=?sys.stdin.readline().strip()
?????????????out.write("user="?+?usr?+?"\n")
?????????????print?"Enter?the?password:"
?????????????passwd?=?sys.stdin.readline().strip()
?????????????out.write("password="?+?passwd?+?"\n")
?????????????connection?=?MySQLdb.connect(ip,?usr,?passwd,?db)
?????????????cursor?=?connection.cursor()
?????????????query?=?"show?tables"
?????????????cursor.execute(query)
?????????????data?=?cursor.fetchall()
?????????????tables?=?[]
?????????????for?row?in?data:
????????????????????for?field?in?row:
???????????????????????????tables.append(field.strip())
?????????????for?i?in?range(len(tables)):
????????????????????print?i,?tables[i]
?????????????tb?=?tables[int(sys.stdin.readline().strip())]
?????????????out.write("table="?+?tb?+?"\n")
?????????????query?=?"describe?"?+?tb
?????????????cursor.execute(query)
?????????????data?=?cursor.fetchall()
?????????????columns?=?[]
?????????????for?row?in?data:
????????????????????columns.append(row[0].strip())
?????????????for?i?in?range(len(columns)):
????????????????????print?columns[i]?
?????????????print?"Not?index?choose?the?exact?column?names?seperated?by?coma"
?????????????cols?=?sys.stdin.readline().strip()
?????????????out.write("columns="?+?cols?+?"\n")
?????????????cursor.close()
?????????????connection.close()
?????????????out.write("source?end"+"\n")
?????????????print?"Enter?the?Data?Source?Type:"
?????????????print?"1.?MySql"
?????????????print?"2.?Text"
?????????????print?"3.?Exit"
???????if(int(data1)?==?2):
?????????????print?"path?of?text?file:"
?????????????path?=?sys.stdin.readline().strip()
?????????????file?=?open(path)
?????????????count?=?0
?????????????for?line?in?file:
????????????????????print?line
????????????????????count?=?count?+?1
????????????????????if?count?>?3:
??????????????????????????break
?????????????file.close()
?????????????out.write("source?begin"+"\n"+"type=text\n")
?????????????out.write("path="?+?path?+?"\n")
?????????????print?"enter?delimeter:"
?????????????dlm?=?sys.stdin.readline().strip()
?????????????out.write("dlm="?+?dlm?+?"\n")
?????????????print?"enter?column?indexes?seperated?by?comma:"
?????????????cols?=?sys.stdin.readline().strip()
?????????????out.write("columns="?+?cols?+?"\n")
?????????????out.write("source?end"+"\n")
?????????????print?"Enter?the?Data?Source?Type:"
?????????????print?"1.?MySql"
?????????????print?"2.?Text"
?????????????print?"3.?Exit"
???????if(int(data1)?==?3):
?????????????out.close()
?????????????sys.exit()
02 Elasticsearch
Elasticsearch(ES)低級(jí)客戶端提供從Python到ES REST端點(diǎn)的直接映射。Elasticsearch的一大優(yōu)勢(shì)是為數(shù)據(jù)分析提供了全棧解決方案。Elasticsearch作為數(shù)據(jù)庫,有可配置前端Kibana、數(shù)據(jù)收集工具Logstash以及企業(yè)安全工具Shield。
下例具有稱為cat、cluster、indices、ingest、nodes、snapshot和tasks的特征,根據(jù)任務(wù)分別轉(zhuǎn)換為CatClient、ClusterClient、IndicesClient、IngestClient、NodesClient、SnapshotClient和TasksClient實(shí)例。這些實(shí)例是訪問這些類及其方法的唯一方式。
你可以指定自己的連接類,可以通過提供的connection_class參數(shù)來使用。
Es1=Elasticsearch(connection_class=ThriftConnection)
如果你想打開sniffing,那么有幾個(gè)選擇:
#?the?list?of?active?nodes.?Start?with?nodes?running?on?'esnode1'?and
#?'esnode2'
Es1=Elasticsearch(
????['esnode1',?'esnode2'],
#?sniff?before?doing?anything
sniff_on_start=True,
#?refresh?nodes?after?a?node?fails?to?respond
sniff_on_connection_fail=True,
#?and?also?every?30?seconds
sniffer_timeout=30
)
不同的主機(jī)可以有不同的參數(shù),你可以為每個(gè)節(jié)點(diǎn)使用一個(gè)字典來指定它們。
#?and?an?url_prefix.?Note?that?``port``?needs?to?be?an?int.
Es1=Elasticsearch([
{'host':'localhost'},
{'host':'othernode','port':443,'url_prefix':'es','use_ssl':True},
])
還支持SSL客戶端身份驗(yàn)證(有關(guān)選項(xiàng)的詳細(xì)說明,請(qǐng)參閱Urllib3HttpConnection)。
['localhost:443','other_host:443'],
#?turn?on?SSL
use_ssl=True,
#?make?sure?we?verify?SSL?certificates?(off?by?default)
verify_certs=True,
#?provide?a?path?to?CA?certs?on?disk
ca_certs='path?to?CA_certs',
#?PEM?formatted?SSL?client?certificate
client_cert='path?to?clientcert.pem',
#?PEM?formatted?SSL?client?key
client_key='path?to?clientkey.pem'
)
連接層API
許多類負(fù)責(zé)處理Elasticsearch集群。這里可以通過將參數(shù)傳遞給Elasticsearch類來忽略正在使用的默認(rèn)子類。屬于客戶端的每個(gè)參數(shù)都將添加到Transport、ConnectionPool和Connection上。
例如,如果你要使用定制的ConnectionSelector類,只需傳入selector_class參數(shù)即可。
整個(gè)API以很高的精確度包裝了原始REST API,其中包括區(qū)分調(diào)用必需參數(shù)和可選參數(shù)。這意味著代碼區(qū)分了按排位的參數(shù)和關(guān)鍵字參數(shù)。建議讀者使用關(guān)鍵字參數(shù)來保證所有調(diào)用的一致性和安全性。
如果Elasticsearch返回2XX,則API調(diào)用成功(并將返回響應(yīng))。否則,將引發(fā)TransportError(或更具體的子類)的實(shí)例。你可以在異常中查看其他異常和錯(cuò)誤狀態(tài)。如果你不希望引發(fā)異常,可以通過傳入ignore參數(shù)忽略狀態(tài)代碼或狀態(tài)代碼列表。
es=Elasticsearch()
#?ignore?400?cause?by?IndexAlreadyExistsException?when?creating?an?index
es.indices.create(index='test-index',ignore=400)
#?ignore?404?and?400
es.indices.delete(index='test-index',ignore=[400,404])
03 Neo4j Python驅(qū)動(dòng)
Neo4j支持Neo4j Python驅(qū)動(dòng),并通過二進(jìn)制協(xié)議與數(shù)據(jù)庫連接。它試圖保持簡約及Python的慣用方式。
from?neo4j.v1?import?GraphDatabase,?basic_auth
driver11?=?GraphDatabase.driver("bolt://localhost",?auth=basic_auth("neo4j",?"neo4j"))
session11?=?driver11.session()
session11.run("CREATE?(a:Person?{name:'Sayan',title:'Mukhopadhyay'})")
result11=?session11.run("MATCH?(a:Person)?WHERE?a.name?='Sayan'?RETURN?a.name?AS?name,?a.title?AS?title")
for?recordi?n?result11:
print("%s?%s"%?(record["title"],?record["name"]))
session11.close()
04 neo4j-rest-client
neo4j-rest-client的主要目標(biāo)是確保已經(jīng)使用本地Neo4j的Python程序員通過python-embedded的方式也能夠訪問Neo4j REST服務(wù)器。因此,neo4j-rest-client API的結(jié)構(gòu)與python-embedded完全同步。但是引入了一種新的結(jié)構(gòu),以達(dá)到更加Python化的風(fēng)格,并通過Neo4j團(tuán)隊(duì)引入的新特性來增強(qiáng)API。
05 內(nèi)存數(shù)據(jù)庫
另一個(gè)重要的數(shù)據(jù)庫類是內(nèi)存數(shù)據(jù)庫。它在RAM中存儲(chǔ)和處理數(shù)據(jù)。因此,對(duì)數(shù)據(jù)庫的操作非常快,并且數(shù)據(jù)是靈活的。SQLite是內(nèi)存數(shù)據(jù)庫的一個(gè)流行范例。在Python中,需要使用sqlalchemy庫來操作SQLite。在第1章的Flask和Falcon示例中,展示了如何從SQLite中選擇數(shù)據(jù)。以下將展示如何在SQLite中存儲(chǔ)Pandas數(shù)據(jù)框架:
import?sqlite3
conn?=?sqlite3.connect('multiplier.db')
conn.execute('''CREATE?TABLE?if?not?exists?multiplier
???????(domain????????CHAR(50),
????????low????????REAL,
????????high????????REAL);''')
conn.close()
db_name?=?"sqlite:///"?+?prop?+?"_"?+?domain?+?str(i)?+?".db"
disk_engine?=?create_engine(db_name)
df.to_sql('scores',?disk_engine,?if_exists='replace')
06 Python版本MongoDB
這部分內(nèi)容請(qǐng)見此前的文章數(shù)據(jù)處理入門干貨:MongoDB和pandas極簡教程。
關(guān)于作者:Sayan Mukhopadhyay擁有超過13年的行業(yè)經(jīng)驗(yàn),并與瑞信、PayPal、CA Technologies、CSC和Mphasis等公司建立了聯(lián)系。他對(duì)投資銀行、在線支付、在線廣告、IT架構(gòu)和零售等領(lǐng)域的數(shù)據(jù)分析應(yīng)用有著深刻的理解。他的專業(yè)領(lǐng)域是在分布式和數(shù)據(jù)驅(qū)動(dòng)的環(huán)境(如實(shí)時(shí)分析、高頻交易等)中,實(shí)現(xiàn)高性能計(jì)算。
本文摘編自《Python高級(jí)數(shù)據(jù)分析:機(jī)器學(xué)習(xí)、深度學(xué)習(xí)和NLP實(shí)例》,經(jīng)出版方授權(quán)發(fā)布。
延伸閱讀《Python高級(jí)數(shù)據(jù)分析》
點(diǎn)擊上圖了解及購買
轉(zhuǎn)載請(qǐng)聯(lián)系微信:DoctorData
推薦語:本書介紹高級(jí)數(shù)據(jù)分析概念的廣泛基礎(chǔ),以及最近的數(shù)據(jù)庫革命,如Neo4j、彈性搜索和MongoDB。本書討論了如何實(shí)現(xiàn)包括局部爬取在內(nèi)的ETL技術(shù),并應(yīng)用于高頻算法交易和目標(biāo)導(dǎo)向的對(duì)話系統(tǒng)等領(lǐng)域。還有一些機(jī)器學(xué)習(xí)概念的例子,如半監(jiān)督學(xué)習(xí)、深度學(xué)習(xí)和NLP。本書還涵蓋了重要的傳統(tǒng)數(shù)據(jù)分析技術(shù),如時(shí)間序列和主成分分析等。
你想免費(fèi)讀這本《Python高級(jí)數(shù)據(jù)分析:機(jī)器學(xué)習(xí)、深度學(xué)習(xí)和NLP實(shí)例》嗎?
「大數(shù)據(jù)」內(nèi)容合伙人之「鑒書小分隊(duì)」上線啦!
最近,你都在讀什么書?有哪些心得體會(huì)想要跟大家分享?
數(shù)據(jù)叔最近搞了個(gè)大事——聯(lián)合優(yōu)質(zhì)圖書出版商機(jī)械工業(yè)出版社華章公司發(fā)起鑒書活動(dòng)。
簡單說就是:你可以免費(fèi)讀新書,你可以免費(fèi)讀新書的同時(shí),順手碼一篇讀書筆記就行。詳情請(qǐng)?jiān)诖髷?shù)據(jù)公眾號(hào)后臺(tái)對(duì)話框回復(fù)合伙人查看。
京東圖書大促開始啦!
每滿100-50!滿200-100!滿300-150!
長按二維碼進(jìn)入唯一領(lǐng)券入口,領(lǐng)取大數(shù)據(jù)粉絲專屬“200減35”疊加滿減券。
是的!你沒看錯(cuò)!相當(dāng)于165塊買400塊的書!!!(打了多少折你敲敲旁邊的計(jì)算器……)
長按下方二維碼或點(diǎn)擊閱讀原文
發(fā)現(xiàn)更多好書
據(jù)統(tǒng)計(jì),99%的大咖都完成了這個(gè)神操作
▼
更多精彩
在公眾號(hào)后臺(tái)對(duì)話框輸入以下關(guān)鍵詞
查看更多優(yōu)質(zhì)內(nèi)容!
PPT?|?報(bào)告?|?讀書?|?書單?|?干貨?
大數(shù)據(jù)?|?揭秘?|?Python?|?可視化
人工智能?|?機(jī)器學(xué)習(xí)?|?深度學(xué)習(xí)?|?神經(jīng)網(wǎng)絡(luò)
AI?|?1024?|?段子?|?區(qū)塊鏈?|?數(shù)學(xué)
猜你想看
如果你不想長期996,看看這個(gè)
數(shù)據(jù)采集技術(shù)揭秘:手把手教你全埋點(diǎn)技術(shù)解決方案
那些“反人類”的用戶體驗(yàn),都錯(cuò)在哪了?
一文看懂?dāng)?shù)據(jù)挖掘:哪一種方法最好?都需要哪些技術(shù)?
Q: 你常用哪些數(shù)據(jù)庫?
歡迎留言并贏取福利
覺得不錯(cuò),請(qǐng)把這篇文章分享給你的朋友
轉(zhuǎn)載 / 投稿請(qǐng)聯(lián)系:baiyu@hzbook.com
更多精彩,請(qǐng)?jiān)诤笈_(tái)點(diǎn)擊“歷史文章”查看
點(diǎn)擊閱讀原文,了解更多
總結(jié)
以上是生活随笔為你收集整理的以5个数据库为例,用Python实现数据的提取、转换和加载(ETL)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是架构?网络架构中都有什么?终于有人
- 下一篇: 小学生手写Python程序解魔方!这是高