用Java处理大文件
最近,我不得不處理一組包含逐筆歷史匯率市場(chǎng)數(shù)據(jù)的文件,并很快意識(shí)到使用傳統(tǒng)的InputStream都無法將它們讀取到內(nèi)存中,因?yàn)槊總€(gè)文件的大小都超過4 GB。 Emacs甚至無法打開它們。
在這種特殊情況下,我可以編寫一個(gè)簡(jiǎn)單的bash腳本,將文件分成小塊,然后像往常一樣讀取它們。 但是我不希望這樣,因?yàn)槎M(jìn)制格式會(huì)使這種方法無效。
因此,正確處理此問題的方法是使用內(nèi)存映射文件逐步處理數(shù)據(jù)區(qū)域。 內(nèi)存映射文件的優(yōu)點(diǎn)在于它們不消耗虛擬內(nèi)存或分頁(yè)空間,因?yàn)樗鼈冇纱疟P上的文件數(shù)據(jù)支持。
Okey,讓我們看一下這些文件并提取一些數(shù)據(jù)。 似乎它們包含帶有逗號(hào)分隔字段的ASCII文本行。
格式: [currency-pair],[timestamp],[bid-price],[ask-price]
例如: EUR/USD,20120102 00:01:30.420,1.29451,1.2949
公平地說,我可以為該格式編寫程序。 但是讀取和解析文件是正交的概念。 因此,讓我們退后一步,考慮一下可以在將來遇到類似問題時(shí)可以重用的通用設(shè)計(jì)。
問題歸結(jié)為對(duì)一組以無限長(zhǎng)字節(jié)數(shù)組編碼的條目進(jìn)行增量解碼,而不會(huì)耗盡內(nèi)存。 示例格式以逗號(hào)/行分隔的文本編碼的事實(shí)與一般解決方案無關(guān),因此很明顯,需要解碼器接口才能處理不同的格式。
同樣,在處理完整個(gè)文件之前,無法解析每個(gè)條目并將其保留在內(nèi)存中,因此我們需要一種方法來逐步移交可以在其他位置(磁盤或網(wǎng)絡(luò))寫入的條目塊,然后再進(jìn)行垃圾回收。 迭代器是處理此要求的很好的抽象方法,因?yàn)樗鼈兊男袨榫拖裼螛?biāo)一樣,這正是重點(diǎn)。 每次迭代都會(huì)轉(zhuǎn)發(fā)文件指針,然后讓我們對(duì)數(shù)據(jù)進(jìn)行處理。
所以首先是Decoder接口。 這個(gè)想法是從MappedByteBuffer增量解碼對(duì)象,或者如果緩沖區(qū)中沒有對(duì)象,則返回null。
public interface Decoder<T> {public T decode(ByteBuffer buffer); }然后是實(shí)現(xiàn)Iterable的FileReader 。 每次迭代將處理下一個(gè)4096字節(jié)的數(shù)據(jù),并使用Decoder將其Decoder為對(duì)象列表。 請(qǐng)注意, FileReader接受文件列表,這很不錯(cuò),因?yàn)樗试S遍歷數(shù)據(jù)而無需擔(dān)心跨文件聚合。 順便說一下,對(duì)于較大的文件,4096個(gè)字節(jié)的塊可能會(huì)有點(diǎn)小。
public class FileReader implements Iterable<List<T>> {private static final long CHUNK_SIZE = 4096;private final Decoder<T> decoder;private Iterator<File> files;private FileReader(Decoder<T> decoder, File... files) {this(decoder, Arrays.asList(files));}private FileReader(Decoder<T> decoder, List<File> files) {this.files = files.iterator();this.decoder = decoder;}public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {return new FileReader<T>(decoder, files);}public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {return new FileReader<T>(decoder, files);}@Overridepublic Iterator<List<T>> iterator() {return new Iterator<List<T>>() {private List<T> entries;private long chunkPos = 0;private MappedByteBuffer buffer;private FileChannel channel;@Overridepublic boolean hasNext() {if (buffer == null || !buffer.hasRemaining()) {buffer = nextBuffer(chunkPos);if (buffer == null) {return false;}}T result = null;while ((result = decoder.decode(buffer)) != null) {if (entries == null) {entries = new ArrayList<T>();}entries.add(result);}// set next MappedByteBuffer chunkchunkPos += buffer.position();buffer = null;if (entries != null) {return true;} else {Closeables.closeQuietly(channel);return false;}}private MappedByteBuffer nextBuffer(long position) {try {if (channel == null || channel.size() == position) {if (channel != null) {Closeables.closeQuietly(channel);channel = null;}if (files.hasNext()) {File file = files.next();channel = new RandomAccessFile(file, "r").getChannel();chunkPos = 0;position = 0;} else {return null;}}long chunkSize = CHUNK_SIZE;if (channel.size() - position < chunkSize) {chunkSize = channel.size() - position;}return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);} catch (IOException e) {Closeables.closeQuietly(channel);throw new RuntimeException(e);}}@Overridepublic List<T> next() {List<T> res = entries;entries = null;return res;}@Overridepublic void remove() {throw new UnsupportedOperationException();}};} }下一個(gè)任務(wù)是編寫一個(gè)Decoder ,我決定為任何逗號(hào)分隔的文本文件格式實(shí)現(xiàn)一個(gè)通用的TextRowDecoder ,接受每行的字段數(shù)和一個(gè)字段定界符,并返回一個(gè)字節(jié)數(shù)組數(shù)組。 然后, TextRowDecoder可以由可能處理不同字符集的格式特定的解碼器重用。
public class TextRowDecoder implements Decoder<byte[][]> {private static final byte LF = 10;private final int numFields;private final byte delimiter;public TextRowDecoder(int numFields, byte delimiter) {this.numFields = numFields;this.delimiter = delimiter;}@Overridepublic byte[][] decode(ByteBuffer buffer) {int lineStartPos = buffer.position();int limit = buffer.limit();while (buffer.hasRemaining()) {byte b = buffer.get();if (b == LF) { // reached line feed so parse lineint lineEndPos = buffer.position();// set positions for one row duplicationif (buffer.limit() < lineEndPos + 1) {buffer.position(lineStartPos).limit(lineEndPos);} else {buffer.position(lineStartPos).limit(lineEndPos + 1);}byte[][] entry = parseRow(buffer.duplicate());if (entry != null) {// reset main bufferbuffer.position(lineEndPos);buffer.limit(limit);// set start after LFlineStartPos = lineEndPos;}return entry;}}buffer.position(lineStartPos);return null;}public byte[][] parseRow(ByteBuffer buffer) {int fieldStartPos = buffer.position();int fieldEndPos = 0;int fieldNumber = 0;byte[][] fields = new byte[numFields][];while (buffer.hasRemaining()) {byte b = buffer.get();if (b == delimiter || b == LF) {fieldEndPos = buffer.position();// save limitint limit = buffer.limit();// set positions for one row duplicationbuffer.position(fieldStartPos).limit(fieldEndPos);fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);fieldNumber++;// reset main bufferbuffer.position(fieldEndPos);buffer.limit(limit);// set start after LFfieldStartPos = fieldEndPos;}if (fieldNumber == numFields) {return fields;}}return null;}private byte[] parseField(ByteBuffer buffer, int pos, int length) {byte[] field = new byte[length];for (int i = 0; i < field.length; i++) {field[i] = buffer.get();}return field;} }這就是文件的處理方式。 每個(gè)列表包含從單個(gè)緩沖區(qū)解碼的元素,每個(gè)元素都是由TextRowDecoder指定的字節(jié)數(shù)組的數(shù)組。
TextRowDecoder decoder = new TextRowDecoder(4, comma); FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles()); for (List<byte[][]> chunk : reader) {// do something with each chunk }我們可以在這里停下來,但還有其他要求。 每行都包含一個(gè)時(shí)間戳記,并且必須按時(shí)間段而不是按天或按小時(shí)對(duì)緩沖區(qū)進(jìn)行分組。 我仍然想遍歷每個(gè)批次,因此立即的反應(yīng)是為FileReader創(chuàng)建一個(gè)Iterable包裝器,以實(shí)現(xiàn)此行為。 另外一個(gè)細(xì)節(jié)是,每個(gè)元素必須通過實(shí)現(xiàn)PeriodEntries Timestamped接口(此處未顯示)為PeriodEntries提供其時(shí)間戳。
public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {private final Iterator<List<T extends Timestamped>> entriesIt;private final long interval;private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {this.entriesIt = entriesIt.iterator();this.interval = interval;}public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {return new PeriodEntries<T>(entriesIt, interval);}@Overridepublic Iterator<List<T extends Timestamped>> iterator() {return new Iterator<List<T>>() {private Queue<List<T>> queue = new LinkedList<List<T>>();private long previous;private Iterator<T> entryIt;@Overridepublic boolean hasNext() {if (!advanceEntries()) {return false;}T entry = entryIt.next();long time = normalizeInterval(entry);if (previous == 0) {previous = time;}if (queue.peek() == null) {List<T> group = new ArrayList<T>();queue.add(group);}while (previous == time) {queue.peek().add(entry);if (!advanceEntries()) {break;}entry = entryIt.next();time = normalizeInterval(entry);}previous = time;List<T> result = queue.peek();if (result == null || result.isEmpty()) {return false;}return true;}private boolean advanceEntries() {// if there are no rows leftif (entryIt == null || !entryIt.hasNext()) {// try get more rows if possibleif (entriesIt.hasNext()) {entryIt = entriesIt.next().iterator();return true;} else {// no more rowsreturn false;}}return true;}private long normalizeInterval(Timestamped entry) {long time = entry.getTime();int utcOffset = TimeZone.getDefault().getOffset(time);long utcTime = time + utcOffset;long elapsed = utcTime % interval;return time - elapsed;}@Overridepublic List<T> next() {return queue.poll();}@Overridepublic void remove() {throw new UnsupportedOperationException();}};} }引入此功能后,最終處理代碼并沒有太大變化,只有一個(gè)干凈緊湊的for循環(huán),不必關(guān)心跨文件,緩沖區(qū)和句點(diǎn)對(duì)元素進(jìn)行分組。 PeriodEntries也足夠靈活,可以管理間隔上的任何長(zhǎng)度。
TrueFxDecoder decoder = new TrueFxDecoder(); FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles()); long periodLength = TimeUnit.DAYS.toMillis(1); PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);for (List<TrueFxData> entries : periods) {// data for each dayfor (TrueFxData entry : entries) {// process each entry} } 正如您可能意識(shí)到的那樣,不可能用集合來解決這個(gè)問題。 選擇迭代器是一項(xiàng)關(guān)鍵的設(shè)計(jì)決策,它能夠解析TB級(jí)的數(shù)據(jù)而不會(huì)占用太多的堆空間。
參考: Deephacks博客上的JCG合作伙伴 Kristoffer Sjogren 使用Java處理大型文件 。
翻譯自: https://www.javacodegeeks.com/2013/01/processing-huge-files-with-java.html
總結(jié)
以上是生活随笔為你收集整理的用Java处理大文件的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 戴尔台式电脑怎么重装系统(神舟台式机如何
- 下一篇: Java中的推断异常