/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * The MIT License (MIT)
 * Copyright (c) 2014 Martin Kleppmann
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
package org.apache.hadoop.hbase.test.util.warc;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads {@link WARCRecord}s from a WARC file, using Hadoop's filesystem APIs. (This means you can
 * read from HDFS, S3, or any other filesystem supported by Hadoop.) This implementation is not
 * tied to the MapReduce APIs; that link is provided by the accompanying {@code WARCInputFormat}
 * classes (originally {@code com.martinkl.warc.mapred.WARCInputFormat} and
 * {@code com.martinkl.warc.mapreduce.WARCInputFormat}).
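 * <p>
 * A minimal usage sketch (the path below is an illustrative assumption; the end of the file is
 * assumed to surface as an {@code EOFException} from the underlying {@code DataInputStream}):
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * WARCFileReader reader = new WARCFileReader(conf, new Path("/data/crawl.warc.gz"));
 * try {
 *   while (true) {
 *     WARCRecord record = reader.read();
 *     // process the record ...
 *   }
 * } catch (java.io.EOFException eof) {
 *   // end of file; no more records
 * } finally {
 *   reader.close();
 * }
 * }</pre>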
 */
public class WARCFileReader {
  private static final Logger logger = LoggerFactory.getLogger(WARCFileReader.class);

  private final long fileSize;
  private CountingInputStream byteStream = null;
  private DataInputStream dataStream = null;
  private long bytesRead = 0;
  private long recordsRead = 0;

  /**
   * Opens a file for reading. If the filename ends in {@code .gz}, it is automatically
   * decompressed on the fly.
   * @param conf     The Hadoop configuration.
   * @param filePath The Hadoop path to the file that should be read.
   * @throws IOException if the file cannot be opened.
   */
  public WARCFileReader(Configuration conf, Path filePath) throws IOException {
    FileSystem fs = filePath.getFileSystem(conf);
    this.fileSize = fs.getFileStatus(filePath).getLen();
    logger.info("Reading from {}", filePath);

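    // Wrap the raw file stream in a byte-counting stream; if the file is gzip-compressed, layer
    // the decompressing codec stream on top, so that byte counts reflect compressed bytes read.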
    CompressionCodec codec =
      filePath.getName().endsWith(".gz") ? WARCFileWriter.getGzipCodec(conf) : null;
    byteStream = new CountingInputStream(new BufferedInputStream(fs.open(filePath)));
    dataStream =
      new DataInputStream(codec == null ? byteStream : codec.createInputStream(byteStream));
  }

  /**
   * Reads the next record from the file.
   * @return The record that was read.
   * @throws IOException if reading fails; reaching the end of the file typically surfaces as an
   *                     {@link java.io.EOFException} from the underlying stream.
   */
  public WARCRecord read() throws IOException {
    WARCRecord record = new WARCRecord(dataStream);
    recordsRead++;
    return record;
  }

  /**
   * Closes the file. No more reading is possible after the file has been closed.
   */
  public void close() throws IOException {
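    // Closing the outermost stream also closes the wrapped decompression and byte-counting
    // streams, since each FilterInputStream closes its underlying stream.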
    if (dataStream != null) {
      dataStream.close();
    }
    byteStream = null;
    dataStream = null;
  }

  /**
   * Returns the number of records that have been read since the file was opened.
   */
  public long getRecordsRead() {
    return recordsRead;
  }

  /**
   * Returns the number of bytes that have been read from the file since it was opened. If the file
   * is compressed, this counts compressed bytes as consumed from the underlying stream, not
   * decompressed bytes.
   */
  public long getBytesRead() {
    return bytesRead;
  }

  /**
   * Returns the proportion of the file that has been read, as a number between 0.0 and 1.0. For
   * compressed files, this is based on the compressed bytes consumed so far.
   */
  public float getProgress() {
    if (fileSize == 0) {
      return 1.0f;
    }
    return (float) bytesRead / (float) fileSize;
  }

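  /**
   * A {@link FilterInputStream} that counts the bytes consumed from the underlying file stream and
   * accumulates them into the enclosing reader's {@code bytesRead} field. Because it wraps the
   * stream below the decompression codec, the count reflects compressed bytes.
   */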
  private class CountingInputStream extends FilterInputStream {
    public CountingInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read() throws IOException {
      int result = in.read();
      if (result != -1) {
        bytesRead++;
      }
      return result;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      int result = in.read(b, off, len);
      if (result != -1) {
        bytesRead += result;
      }
      return result;
    }

    @Override
    public long skip(long n) throws IOException {
      long result = in.skip(n);
      bytesRead += result;
      return result;
    }
  }
}