Fetching Web Pages with Python Multithreading
Lately I have been working on web crawlers. I took a look at larbin, an open-source crawler written in C++, and read through its design ideas and the implementation of several key techniques.
1. larbin's URL deduplication uses a very efficient Bloom filter (a small sketch of the idea follows after this list);
2. DNS is handled with adns, an asynchronous open-source resolver;
3. the URL queue is managed by keeping part of it cached in memory and spilling the rest to files;
4. larbin puts a lot of work into its file handling;
5. larbin keeps a connection pool: it creates sockets, sends HTTP GET requests to the target sites, fetches the content, and then parses the headers and so on;
6. it multiplexes I/O over a large number of descriptors with poll, which is very efficient;
7. larbin is highly configurable;
8. the author wrote almost all of the data structures himself from the ground up, with essentially no use of STL and the like.
...and there is much more. When I have time I will write a proper article to sum it all up.
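To make point 1 above a bit more concrete, here is a minimal Bloom-filter sketch for URL deduplication. It is only an illustration of the idea, not larbin's implementation: the class name, the bit-array size and the double-hashing scheme are my own choices, and it is written in Python 2 to match the rest of this post.

import hashlib

class BloomFilter(object):
    '''A tiny Bloom filter: k hash probes into one bit array, no deletions.'''
    def __init__(self, num_bits=1 << 24, num_hashes=7):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8 + 1)

    def _positions(self, url):
        # Double hashing: derive k probe positions from the two halves of an md5 digest.
        digest = hashlib.md5(url).hexdigest()
        h1, h2 = int(digest[:16], 16), int(digest[16:], 16)
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, url):
        for pos in self._positions(url):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, url):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(url))

seen = BloomFilter()
url = 'http://example.com/index.html'
if url not in seen:   # rare false positives are possible, false negatives are not
    seen.add(url)

The appeal for a crawler is that membership tests stay at k hash probes and the memory footprint is fixed no matter how many URLs have been seen; the price is a small, tunable false-positive rate.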
Over the last couple of days I wrote a multithreaded page-downloading program in Python; for an I/O-bound workload, multithreading is clearly a good fit, and the thread pool I wrote recently comes in handy here as well. Fetching a page with Python is actually very easy: the urllib2 module is convenient, and two or three lines of code are basically enough. Using a ready-made module solves the problem quickly, but it does little for your own technical growth, because the key algorithms are implemented by someone else rather than by you, and you never get to see many of the details. As engineers we should not just call other people's modules and APIs all the time; implementing things ourselves is how we learn the most.
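For reference, the "two or three lines" with urllib2 look roughly like this (the URL is just a placeholder):

import urllib2

html = urllib2.urlopen('http://example.com/').read()   # fetch the page body
open('page.html', 'w').write(html)                      # save it to a file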
I decided to start from the socket layer: wrap the GET request myself, parse the headers myself, and handle DNS resolution separately so that it can, for example, be cached. Writing it this way gives more control and makes the code easier to extend. For timeouts I use a global 5-second limit; for redirects (301 or 302) I allow at most 3 hops, because during earlier tests I found many sites that redirect back to themselves, which would loop forever, so I set an upper bound. The idea is simple enough that the code below should explain itself.
After finishing it I compared the performance against urllib2. My own version turned out to be noticeably faster, and urllib2 had a slightly higher error rate, though I am not sure why. Some people online say urllib2 has minor problems when used from multiple threads, but I do not know the details.
The code first:
fetchPage.py — downloads a page with an HTTP GET request and stores it as a file
'''
Created on 2012-3-13
Get Page using GET method
Default using HTTP Protocol , http port 80
@author: xiaojay
'''

import socket
import statistics   # the author's own module: counters, timeout, max_try_times, DNSCache, RESULT* codes
import datetime
import threading

socket.setdefaulttimeout(statistics.timeout)

class Error404(Exception):
    '''Can not find the page.'''
    pass

class ErrorOther(Exception):
    '''Some other exception.'''
    def __init__(self, code):
        #print 'Code :', code
        pass

class ErrorTryTooManyTimes(Exception):
    '''Tried too many times.'''
    pass

def downPage(hostname, filename, trytimes=0):
    try:
        # To avoid endless retries: try times can not exceed max_try_times
        if trytimes >= statistics.max_try_times:
            raise ErrorTryTooManyTimes
    except ErrorTryTooManyTimes:
        return statistics.RESULTTRYTOOMANY, hostname + filename
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # DNS cache
        if hostname in statistics.DNSCache:
            addr = statistics.DNSCache[hostname]
        else:
            addr = socket.gethostbyname(hostname)
            statistics.DNSCache[hostname] = addr
        # connect to the http server, default port 80
        s.connect((addr, 80))
        msg  = 'GET ' + filename + ' HTTP/1.0\r\n'
        msg += 'Host: ' + hostname + '\r\n'
        msg += 'User-Agent:xiaojay\r\n\r\n'
        code = ''
        f = None
        s.sendall(msg)
        first = True
        while True:
            msg = s.recv(40960)
            if not len(msg):
                if f != None:
                    f.flush()
                    f.close()
                break
            # Head information must be in the first recv buffer
            if first:
                first = False
                headpos = msg.index("\r\n\r\n")
                code, other = dealwithHead(msg[:headpos])
                if code == '200':
                    #statistics.fetched_url += 1
                    f = open('pages/' + str(abs(hash(hostname + filename))), 'w')
                    f.writelines(msg[headpos + 4:])
                elif code == '301' or code == '302':
                    # if code is 301 or 302, try again using the redirect Location
                    if other.startswith("http"):
                        hname, fname = parse(other)
                        downPage(hname, fname, trytimes + 1)   # try again
                    else:
                        downPage(hostname, other, trytimes + 1)
                elif code == '404':
                    raise Error404
                else:
                    raise ErrorOther(code)
            else:
                if f != None: f.writelines(msg)
        s.shutdown(socket.SHUT_RDWR)
        s.close()
        return statistics.RESULTFETCHED, hostname + filename
    except Error404:
        return statistics.RESULTCANNOTFIND, hostname + filename
    except ErrorOther:
        return statistics.RESULTOTHER, hostname + filename
    except socket.timeout:
        return statistics.RESULTTIMEOUT, hostname + filename
    except Exception:
        return statistics.RESULTOTHER, hostname + filename

def dealwithHead(head):
    '''Deal with the HTTP response head: return (status code, Location or None).'''
    lines = head.splitlines()
    fstline = lines[0]
    code = fstline.split()[1]
    if code == '404': return (code, None)
    if code == '200': return (code, None)
    if code == '301' or code == '302':
        for line in lines[1:]:
            p = line.index(':')
            key = line[:p]
            if key == 'Location':
                return (code, line[p + 2:])
    return (code, None)

def parse(url):
    '''Parse a url into hostname + filename.'''
    try:
        u = url.strip().strip('\n').strip('\r').strip('\t')
        if u.startswith('http://'):
            u = u[7:]
        elif u.startswith('https://'):
            u = u[8:]
        if u.find(':80') > 0:
            p = u.index(':80')
            p2 = p + 3
        else:
            if u.find('/') > 0:
                p = u.index('/')
                p2 = p
            else:
                p = len(u)
                p2 = -1
        hostname = u[:p]
        if p2 > 0:
            filename = u[p2:]
        else:
            filename = '/'
        return hostname, filename
    except Exception, e:
        print "Parse wrong : ", url
        print e

def PrintDNSCache():
    '''Print the DNS dict.'''
    n = 1
    for hostname in statistics.DNSCache.keys():
        print n, '\t', hostname, '\t', statistics.DNSCache[hostname]
        n += 1

def dealwithResult(res, url):
    '''Deal with the result of downPage: update counters and log.'''
    statistics.total_url += 1
    if res == statistics.RESULTFETCHED:
        statistics.fetched_url += 1
        print statistics.total_url, '\t fetched :', url
    if res == statistics.RESULTCANNOTFIND:
        statistics.failed_url += 1
        print "Error 404 at : ", url
    if res == statistics.RESULTOTHER:
        statistics.other_url += 1
        print "Error Undefined at : ", url
    if res == statistics.RESULTTIMEOUT:
        statistics.timeout_url += 1
        print "Timeout ", url
    if res == statistics.RESULTTRYTOOMANY:
        statistics.trytoomany_url += 1
        print "Try too many times at", url

if __name__ == '__main__':
    print 'Get Page using GET method'
Below I use the thread pool from the previous post to fetch pages in parallel across multiple threads, and compare the performance of the download routine above against urllib2.
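The experiment code drives the pool through a putRequest / poll / callback interface. For readers who have not seen the previous post, here is a rough sketch of what such a pool can look like internally (worker threads pulling jobs off a Queue). It is a simplified stand-in written for illustration only, not the actual pool from the previous post and not the threadpool package used below.

import threading
import Queue

class SimplePool(object):
    '''A bare-bones thread pool: workers pull (func, args, callback) jobs off a queue.'''
    def __init__(self, num_threads):
        self.jobs = Queue.Queue()
        for _ in range(num_threads):
            t = threading.Thread(target=self._worker)
            t.setDaemon(True)      # let the process exit even if workers are blocked
            t.start()

    def _worker(self):
        while True:
            func, args, callback = self.jobs.get()
            try:
                result = func(*args)
                if callback is not None:
                    callback(result)   # hand the result back, much like a poll() callback
            finally:
                self.jobs.task_done()

    def put(self, func, args, callback=None):
        self.jobs.put((func, args, callback))

    def wait(self):
        self.jobs.join()           # block until every queued job has been processed

In the real code below, the threadpool module delivers each result back through poll(), which invokes the registered callback and raises NoResultsPending once everything is done; that is what the polling loops use to decide when to stop.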
'''
Created on 2012-3-16
@author: xiaojay
'''

import fetchPage
import threadpool
import datetime
import statistics
import urllib2

'''one thread'''
def usingOneThread(limit):
    urlset = open("input.txt", "r")
    start = datetime.datetime.now()
    for u in urlset:
        if limit <= 0: break
        limit -= 1
        hostname, filename = fetchPage.parse(u)
        res = fetchPage.downPage(hostname, filename, 0)
        fetchPage.dealwithResult(res[0], res[1])
    end = datetime.datetime.now()
    print "Start at :\t", start
    print "End at :\t", end
    print "Total Cost :\t", end - start
    print 'Total fetched :', statistics.fetched_url

'''threadpool and GET method'''
def callbackfunc(request, result):
    fetchPage.dealwithResult(result[0], result[1])

def usingThreadpool(limit, num_thread):
    urlset = open("input.txt", "r")
    start = datetime.datetime.now()
    main = threadpool.ThreadPool(num_thread)
    for url in urlset:
        try:
            hostname, filename = fetchPage.parse(url)
            req = threadpool.WorkRequest(fetchPage.downPage, args=[hostname, filename], kwds={}, callback=callbackfunc)
            main.putRequest(req)
        except Exception, e:
            print e
    while True:
        try:
            main.poll()
            if statistics.total_url >= limit: break
        except threadpool.NoResultsPending:
            print "no pending results"
            break
        except Exception, e:
            print e
    end = datetime.datetime.now()
    print "Start at :\t", start
    print "End at :\t", end
    print "Total Cost :\t", end - start
    print 'Total url :', statistics.total_url
    print 'Total fetched :', statistics.fetched_url
    print 'Lost url :', statistics.total_url - statistics.fetched_url
    print 'Error 404 :', statistics.failed_url
    print 'Error timeout :', statistics.timeout_url
    print 'Error Try too many times ', statistics.trytoomany_url
    print 'Error Other faults ', statistics.other_url
    main.stop()

'''threadpool and urllib2'''
def downPageUsingUrlib2(url):
    try:
        req = urllib2.Request(url)
        fd = urllib2.urlopen(req)
        f = open("pages3/" + str(abs(hash(url))), 'w')
        f.write(fd.read())
        f.flush()
        f.close()
        return url, 'success'
    except Exception:
        return url, None

def writeFile(request, result):
    statistics.total_url += 1
    if result[1] != None:
        statistics.fetched_url += 1
        print statistics.total_url, '\tfetched :', result[0]
    else:
        statistics.failed_url += 1
        print statistics.total_url, '\tLost :', result[0]

def usingThreadpoolUrllib2(limit, num_thread):
    urlset = open("input.txt", "r")
    start = datetime.datetime.now()
    main = threadpool.ThreadPool(num_thread)
    for url in urlset:
        try:
            req = threadpool.WorkRequest(downPageUsingUrlib2, args=[url], kwds={}, callback=writeFile)
            main.putRequest(req)
        except Exception, e:
            print e
    while True:
        try:
            main.poll()
            if statistics.total_url >= limit: break
        except threadpool.NoResultsPending:
            print "no pending results"
            break
        except Exception, e:
            print e
    end = datetime.datetime.now()
    print "Start at :\t", start
    print "End at :\t", end
    print "Total Cost :\t", end - start
    print 'Total url :', statistics.total_url
    print 'Total fetched :', statistics.fetched_url
    print 'Lost url :', statistics.total_url - statistics.fetched_url
    main.stop()

if __name__ == '__main__':
    '''too slow'''
    #usingOneThread(100)
    '''use Get method'''
    #usingThreadpool(3000,50)
    '''use urllib2'''
    usingThreadpoolUrllib2(3000, 50)
Experiment analysis:
Test data: 3,000 URLs crawled by larbin and then run through a Mercator-style queue model (which I implemented in C++; I may post about it another time), so the URL set is reasonably random and representative. A thread pool of 50 threads was used.
Test environment: Ubuntu 10.04, decent network, Python 2.6
Storage: small files, one file per page
PS: since the campus network bills by traffic, crawling eats a lot of bandwidth! In a few days I may run a larger-scale download experiment with a few hundred thousand URLs.
Results:
Using urllib2, usingThreadpoolUrllib2(3000, 50)
Start at :    2012-03-16 22:18:20.956054
End at :    2012-03-16 22:22:15.203018
Total Cost :    0:03:54.246964
Total url : 3001
Total fetched : 2442
Lost url : 559
Physical size of the downloaded pages on disk: 84,088 KB
Using my own GET-method downloader, usingThreadpool(3000, 50)
Start at :    2012-03-16 22:23:40.206730
End at :    2012-03-16 22:26:26.843563
Total Cost :    0:02:46.636833
Total url : 3002
Total fetched : 2484
Lost url : 518
Error 404 : 94
Error timeout : 312
Error Try too many times  0
Error Other faults  112
Physical size of the downloaded pages on disk: 87,168 KB
Summary: my own download routine performs quite well and loses fewer pages. Still, there is plenty of room for optimization. The output is scattered across many small files, and creating and releasing that many small files certainly carries a non-trivial cost; the hash-based file naming also adds computation, and with a better strategy both overheads could be avoided. DNS is another area: Python's built-in resolution is synchronous, and DNS lookups are usually slow, so resolution could be done asynchronously across several threads and combined with a proper DNS cache, which would improve efficiency considerably. Beyond that, a real crawl involves far more URLs than can be held in memory at once, so they have to be distributed according to some sensible strategy or algorithm. In short, there is a great deal left to do, and to optimize, in page fetching.
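As a rough illustration of the DNS point above, one way to take resolution off the critical path is to warm a cache with a few resolver threads before fetching. This is only a sketch: the thread count and the shared-dict-with-lock design are my own assumptions, and gethostbyname is still the blocking resolver underneath (a truly asynchronous resolver, like larbin's adns, would go further).

import socket
import threading

DNS_CACHE = {}
_lock = threading.Lock()

def _resolve(hostnames):
    for hostname in hostnames:
        try:
            addr = socket.gethostbyname(hostname)   # blocking call, but off the main thread
        except socket.error:
            continue
        with _lock:
            DNS_CACHE[hostname] = addr

def prefetch_dns(hostnames, num_threads=10):
    '''Split the hostnames across a few threads and resolve them into DNS_CACHE.'''
    chunks = [hostnames[i::num_threads] for i in range(num_threads)]
    threads = [threading.Thread(target=_resolve, args=(chunk,)) for chunk in chunks]
    for t in threads: t.start()
    for t in threads: t.join()

After prefetching, downPage could look hostnames up in DNS_CACHE instead of calling gethostbyname synchronously for every URL, which can remove much of the per-request DNS latency.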
Attachment: source code (my skills are limited; for reference only)