如何快速安全的插入千万条数据?
最近有個(gè)需求解析一個(gè)訂單文件,并且說(shuō)明文件可達(dá)到千萬(wàn)條數(shù)據(jù),每條數(shù)據(jù)大概在20個(gè)字段左右,每個(gè)字段使用逗號(hào)分隔,需要盡量在半小時(shí)內(nèi)入庫(kù)。
思路
1.估算文件大小
因?yàn)楦嬖V文件有千萬(wàn)條,同時(shí)每條記錄大概在20個(gè)字段左右,所以可以大致估算一下整個(gè)訂單文件的大小,方法也很簡(jiǎn)單使用FileWriter往文件中插入一千萬(wàn)條數(shù)據(jù),查看文件大小,經(jīng)測(cè)試大概在1.5G左右;
2.如何批量插入
由上可知文件比較大,一次性讀取內(nèi)存肯定不行,方法是每次從當(dāng)前訂單文件中截取一部分?jǐn)?shù)據(jù),然后進(jìn)行批量插入,如何批次插入可以使用insert(...)values(...),(...)的方式,經(jīng)測(cè)試這種方式效率還是挺高的;怎么快速插入 100 條數(shù)據(jù),用時(shí)最短,這篇看下。
3.數(shù)據(jù)的完整性
截取數(shù)據(jù)的時(shí)候需要注意,需要保證數(shù)據(jù)的完整性,每條記錄最后都是一個(gè)換行符,需要根據(jù)這個(gè)標(biāo)識(shí)保證每次截取都是整條數(shù),不要出現(xiàn)半條數(shù)據(jù)這種情況;
4.數(shù)據(jù)庫(kù)是否支持批次數(shù)據(jù)
因?yàn)樾枰M(jìn)行批次數(shù)據(jù)的插入,數(shù)據(jù)庫(kù)是否支持大量數(shù)據(jù)寫(xiě)入,比如這邊使用的mysql,可以通過(guò)設(shè)置max_allowed_packet來(lái)保證批次提交的數(shù)據(jù)量;
5.中途出錯(cuò)的情況
因?yàn)槭谴笪募馕?#xff0c;如果中途出現(xiàn)錯(cuò)誤,比如數(shù)據(jù)剛好插入到900w的時(shí)候,數(shù)據(jù)庫(kù)連接失敗,這種情況不可能重新來(lái)插一遍,所有需要記錄每次插入數(shù)據(jù)的位置,并且需要保證和批次插入的數(shù)據(jù)在同一個(gè)事務(wù)中,這樣恢復(fù)之后可以從記錄的位置開(kāi)始繼續(xù)插入。
實(shí)現(xiàn)
1.準(zhǔn)備數(shù)據(jù)表
這里需要準(zhǔn)備兩張表分別是:訂單狀態(tài)位置信息表,訂單表;
CREATE?TABLE?`file_analysis`?(`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT,`file_type`?varchar(255)?NOT?NULL?COMMENT?'文件類(lèi)型 01:類(lèi)型1,02:類(lèi)型2',`file_name`?varchar(255)?NOT?NULL?COMMENT?'文件名稱(chēng)',`file_path`?varchar(255)?NOT?NULL?COMMENT?'文件路徑',`status`?varchar(255)?NOT?NULL?COMMENT?'文件狀態(tài) 0初始化;1成功;2失敗:3處理中',`position`?bigint(20)?NOT?NULL?COMMENT?'上一次處理完成的位置',`crt_time`?datetime?NOT?NULL?COMMENT?'創(chuàng)建時(shí)間',`upd_time`?datetime?NOT?NULL?COMMENT?'更新時(shí)間',PRIMARY?KEY?(`id`) )?ENGINE=InnoDB?AUTO_INCREMENT=2?DEFAULT?CHARSET=utf8CREATE?TABLE?`file_order`?(`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT,`file_id`?bigint(20)?DEFAULT?NULL,`field1`?varchar(255)?DEFAULT?NULL,`field2`?varchar(255)?DEFAULT?NULL,`field3`?varchar(255)?DEFAULT?NULL,`field4`?varchar(255)?DEFAULT?NULL,`field5`?varchar(255)?DEFAULT?NULL,`field6`?varchar(255)?DEFAULT?NULL,`field7`?varchar(255)?DEFAULT?NULL,`field8`?varchar(255)?DEFAULT?NULL,`field9`?varchar(255)?DEFAULT?NULL,`field10`?varchar(255)?DEFAULT?NULL,`field11`?varchar(255)?DEFAULT?NULL,`field12`?varchar(255)?DEFAULT?NULL,`field13`?varchar(255)?DEFAULT?NULL,`field14`?varchar(255)?DEFAULT?NULL,`field15`?varchar(255)?DEFAULT?NULL,`field16`?varchar(255)?DEFAULT?NULL,`field17`?varchar(255)?DEFAULT?NULL,`field18`?varchar(255)?DEFAULT?NULL,`crt_time`?datetime?NOT?NULL?COMMENT?'創(chuàng)建時(shí)間',`upd_time`?datetime?NOT?NULL?COMMENT?'更新時(shí)間',PRIMARY?KEY?(`id`) )?ENGINE=InnoDB?AUTO_INCREMENT=10000024?DEFAULT?CHARSET=utf82.配置數(shù)據(jù)庫(kù)包大小
mysql>?show VARIABLES like?'%max_allowed_packet%'; +--------------------------+------------+ | Variable_name | Value | +--------------------------+------------+ | max_allowed_packet | 1048576 | | slave_max_allowed_packet | 1073741824 | +--------------------------+------------+ 2 rows in setmysql>?set?global max_allowed_packet = 1024*1024*10; Query OK, 0 rows affected通過(guò)設(shè)置max_allowed_packet,保證數(shù)據(jù)庫(kù)能夠接收批次插入的數(shù)據(jù)包大小;不然會(huì)出現(xiàn)如下錯(cuò)誤:
Caused?by: com.mysql.jdbc.PacketTooBigException:?Packet?for?query?is?too?large?(4980577?>?1048576). You can change?this?value?on?the server?by?setting the max_allowed_packet' variable.at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3915)at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2598)at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2778)at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2834)3.準(zhǔn)備測(cè)試數(shù)據(jù)
public?static?void?main(String[] args) throws IOException?{FileWriter?out?=?new?FileWriter(new?File("D://xxxxxxx//orders.txt"));for?(int?i =?0; i <?10000000; i++) {out.write("vaule1,vaule2,vaule3,vaule4,vaule5,vaule6,vaule7,vaule8,vaule9,vaule10,vaule11,vaule12,vaule13,vaule14,vaule15,vaule16,vaule17,vaule18");out.write(System.getProperty("line.separator"));}out.close(); }使用FileWriter遍歷往一個(gè)文件里插入1000w條數(shù)據(jù)即可,這個(gè)速度還是很快的,不要忘了在每條數(shù)據(jù)的后面添加換行符(\n\r);
4.截取數(shù)據(jù)的完整性
除了需要設(shè)置每次讀取文件的大小,同時(shí)還需要設(shè)置一個(gè)參數(shù),用來(lái)每次獲取一小部分?jǐn)?shù)據(jù),從這小部分?jǐn)?shù)據(jù)中獲取換行符(\n\r),如果獲取不到一直累加直接獲取為止,這個(gè)值設(shè)置大小大致同每條數(shù)據(jù)的大小差不多合適,部分實(shí)現(xiàn)如下:
ByteBuffer byteBuffer = ByteBuffer.allocate(buffSize);?// 申請(qǐng)一個(gè)緩存區(qū) long?endPosition = batchFileSize + startPosition - buffSize;// 子文件結(jié)束位置long?startTime, endTime; for?(int?i =?0; i < count; i++) {startTime = System.currentTimeMillis();if?(i +?1?!= count) {int?read = inputChannel.read(byteBuffer, endPosition);// 讀取數(shù)據(jù)readW:?while?(read !=?-1) {byteBuffer.flip();// 切換讀模式byte[]?array?= byteBuffer.array();for?(int?j =?0; j <?array.length; j++) {byte b =?array[j];if?(b ==?10?|| b ==?13) {?// 判斷\n\rendPosition += j;break?readW;}}endPosition += buffSize;byteBuffer.clear();?// 重置緩存塊指針read = inputChannel.read(byteBuffer, endPosition);}}?else?{endPosition = fileSize;?// 最后一個(gè)文件直接指向文件末尾}...省略,更多可以查看Github完整代碼... }如上代碼所示開(kāi)辟了一個(gè)緩沖區(qū),根據(jù)每行數(shù)據(jù)大小來(lái)定大概在200字節(jié)左右,然后通過(guò)遍歷查找換行符(\n\r),找到以后將當(dāng)前的位置加到之前的結(jié)束位置上,保證了數(shù)據(jù)的完整性;
5.批次插入數(shù)據(jù)
通過(guò)insert(...)values(...),(...)的方式批次插入數(shù)據(jù),部分代碼如下:
// 保存訂單和解析位置保證在一個(gè)事務(wù)中 SqlSession session = sqlSessionFactory.openSession(); try?{long?startTime = System.currentTimeMillis();FielAnalysisMapper fielAnalysisMapper = session.getMapper(FielAnalysisMapper.class);FileOrderMapper fileOrderMapper = session.getMapper(FileOrderMapper.class);fileOrderMapper.batchInsert(orderList);// 更新上次解析到的位置,同時(shí)指定更新時(shí)間fileAnalysis.setPosition(endPosition +?1);fileAnalysis.setStatus("3");fileAnalysis.setUpdTime(new?Date());fielAnalysisMapper.updateFileAnalysis(fileAnalysis);session.commit();long?endTime = System.currentTimeMillis();System.out.println("===插入數(shù)據(jù)花費(fèi):"?+ (endTime - startTime) +?"ms==="); }?catch?(Exception e) {session.rollback(); }?finally?{session.close(); } ...省略,更多可以查看Github完整代碼...如上代碼在一個(gè)事務(wù)中同時(shí)保存批次訂單數(shù)據(jù)和文件解析位置信息,batchInsert通過(guò)使用mybatis的<foreach>標(biāo)簽來(lái)遍歷訂單列表,生成values數(shù)據(jù);
總結(jié)
以上展示了部分代碼,完整的代碼可以查看Github地址中的batchInsert模塊,本地設(shè)置每次截取的文件大小為2M。
經(jīng)測(cè)試1000w條數(shù)據(jù)(大小1.5G左右)插入mysql數(shù)據(jù)庫(kù)中,大概花費(fèi)時(shí)間在20分鐘左右,當(dāng)然可以通過(guò)設(shè)置截取的文件大小,花費(fèi)的時(shí)間也會(huì)相應(yīng)的改變。
作者:ksfzhaohui
https://my.oschina.net/OutOfMemory/blog/3117737
總結(jié)
以上是生活随笔為你收集整理的如何快速安全的插入千万条数据?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 一文让你轻松了解 JAVA 开发中的四种
- 下一篇: Netty 在 Dubbo 中是如何应用