/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * The MIT License (MIT)
 * Copyright (c) 2014 Martin Kleppmann
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
package org.apache.hadoop.hbase.test.util.warc;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads {@link WARCRecord}s from a WARC file, using Hadoop's filesystem APIs. (This means you can
 * read from HDFS, S3 or any other filesystem supported by Hadoop.) This implementation is not tied
 * to the MapReduce APIs -- that link is provided by the mapred
 * {@code com.martinkl.warc.mapred.WARCInputFormat} and the mapreduce
 * {@code com.martinkl.warc.mapreduce.WARCInputFormat}.
 */
public class WARCFileReader {
  private static final Logger logger = LoggerFactory.getLogger(WARCFileReader.class);

  private final long fileSize;
  private CountingInputStream byteStream = null;
  private DataInputStream dataStream = null;
  private long bytesRead = 0, recordsRead = 0;

  /**
   * Opens a file for reading. If the filename ends in {@code .gz}, it is automatically
   * decompressed on the fly.
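   * <p>
   * A minimal usage sketch (illustrative only: the path is made up, and end of file is assumed to
   * surface as a {@link java.io.EOFException} from {@link #read()}, as thrown by the underlying
   * {@link java.io.DataInputStream}):
   *
   * <pre>{@code
   * Configuration conf = new Configuration();
   * WARCFileReader reader = new WARCFileReader(conf, new Path("/data/crawl.warc.gz"));
   * try {
   *   while (true) {
   *     WARCRecord record = reader.read();
   *     // ... process the record ...
   *   }
   * } catch (EOFException eof) {
   *   // reached the end of the file
   * } finally {
   *   reader.close();
   * }
   * }</pre>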
   * @param conf     The Hadoop configuration.
   * @param filePath The Hadoop path to the file that should be read.
   */
  public WARCFileReader(Configuration conf, Path filePath) throws IOException {
    FileSystem fs = filePath.getFileSystem(conf);
    this.fileSize = fs.getFileStatus(filePath).getLen();
    logger.info("Reading from {}", filePath);

    // Only gzip is detected, by filename suffix; any other file is read as an uncompressed stream.
    CompressionCodec codec =
      filePath.getName().endsWith(".gz") ? WARCFileWriter.getGzipCodec(conf) : null;
    byteStream = new CountingInputStream(new BufferedInputStream(fs.open(filePath)));
    dataStream =
      new DataInputStream(codec == null ? byteStream : codec.createInputStream(byteStream));
  }

  /**
   * Reads the next record from the file.
   * @return The record that was read.
   * @throws IOException if an I/O error occurs, or if the end of the file has been reached.
   */
  public WARCRecord read() throws IOException {
    WARCRecord record = new WARCRecord(dataStream);
    recordsRead++;
    return record;
  }

  /**
   * Closes the file. No more reading is possible after the file has been closed.
   */
  public void close() throws IOException {
    if (dataStream != null) {
      // Closing the outermost stream also closes the decompressor (if any) and the byte stream.
      dataStream.close();
    }
    byteStream = null;
    dataStream = null;
  }

  /**
   * Returns the number of records that have been read since the file was opened.
   */
  public long getRecordsRead() {
    return recordsRead;
  }

  /**
   * Returns the number of bytes that have been read from the file since it was opened. If the file
   * is compressed, this counts the compressed bytes as stored on disk, not the decompressed data.
   */
  public long getBytesRead() {
    return bytesRead;
  }

  /**
   * Returns the proportion of the file that has been read, as a number between 0.0 and 1.0.
   */
  public float getProgress() {
    if (fileSize == 0) {
      return 1.0f;
    }
    return (float) bytesRead / (float) fileSize;
  }

  /**
   * Wraps the underlying (possibly compressed) file stream and counts every byte consumed from it,
   * so that progress can be reported against the on-disk file size.
   */
  private class CountingInputStream extends FilterInputStream {
    public CountingInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read() throws IOException {
      int result = in.read();
      if (result != -1) {
        bytesRead++;
      }
      return result;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      int result = in.read(b, off, len);
      if (result != -1) {
        bytesRead += result;
      }
      return result;
    }

    @Override
    public long skip(long n) throws IOException {
      long result = in.skip(n);
      bytesRead += result;
      return result;
    }
  }
}
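
/*
 * A minimal sketch of combining read() with the progress accessors, e.g. for periodic status
 * output in a long-running scan. This is hypothetical caller code, not part of this class: the
 * path and the 10,000-record reporting interval are arbitrary choices, and end of file is assumed
 * to surface as a java.io.EOFException from read().
 *
 *   WARCFileReader reader = new WARCFileReader(conf, new Path("/data/crawl.warc.gz"));
 *   try {
 *     while (true) {
 *       reader.read();
 *       if (reader.getRecordsRead() % 10_000 == 0) {
 *         System.out.printf("%d records, %d bytes, %.1f%% of file%n",
 *           reader.getRecordsRead(), reader.getBytesRead(), reader.getProgress() * 100.0);
 *       }
 *     }
 *   } catch (EOFException eof) {
 *     // end of input reached
 *   } finally {
 *     reader.close();
 *   }
 */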