001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018/*
019 * The MIT License (MIT)
020 * Copyright (c) 2014 Martin Kleppmann
021 *
022 * Permission is hereby granted, free of charge, to any person obtaining a copy
023 * of this software and associated documentation files (the "Software"), to deal
024 * in the Software without restriction, including without limitation the rights
025 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
026 * copies of the Software, and to permit persons to whom the Software is
027 * furnished to do so, subject to the following conditions:
028 *
029 * The above copyright notice and this permission notice shall be included in
030 * all copies or substantial portions of the Software.
031 *
032 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
033 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
034 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
035 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
036 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
037 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
038 * THE SOFTWARE.
039 */
040package org.apache.hadoop.hbase.test.util.warc;
041
042import java.io.EOFException;
043import java.io.IOException;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.io.LongWritable;
046import org.apache.hadoop.mapreduce.InputSplit;
047import org.apache.hadoop.mapreduce.JobContext;
048import org.apache.hadoop.mapreduce.RecordReader;
049import org.apache.hadoop.mapreduce.TaskAttemptContext;
050import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
051import org.apache.hadoop.mapreduce.lib.input.FileSplit;
052
053/**
054 * Hadoop InputFormat for mapreduce jobs ('new' API) that want to process data in WARC files. Usage:
055 * ```java Job job = new Job(getConf()); job.setInputFormatClass(WARCInputFormat.class); ``` Mappers
056 * should use a key of {@link org.apache.hadoop.io.LongWritable} (which is 1 for the first record in
057 * a file, 2 for the second record, etc.) and a value of {@link WARCWritable}.
058 */
059public class WARCInputFormat extends FileInputFormat<LongWritable, WARCWritable> {
060
061  /**
062   * Opens a WARC file (possibly compressed) for reading, and returns a RecordReader for accessing
063   * it.
064   */
065  @Override
066  public RecordReader<LongWritable, WARCWritable> createRecordReader(InputSplit split,
067    TaskAttemptContext context) throws IOException, InterruptedException {
068    return new WARCReader();
069  }
070
071  /**
072   * Always returns false, as WARC files cannot be split.
073   */
074  @Override
075  protected boolean isSplitable(JobContext context, Path filename) {
076    return false;
077  }
078
079  private static class WARCReader extends RecordReader<LongWritable, WARCWritable> {
080    private final LongWritable key = new LongWritable();
081    private final WARCWritable value = new WARCWritable();
082    private WARCFileReader reader;
083
084    @Override
085    public void initialize(InputSplit split, TaskAttemptContext context)
086      throws IOException, InterruptedException {
087      reader = new WARCFileReader(context.getConfiguration(), ((FileSplit) split).getPath());
088    }
089
090    @Override
091    public boolean nextKeyValue() throws IOException {
092      try {
093        WARCRecord record = reader.read();
094        key.set(reader.getRecordsRead());
095        value.setRecord(record);
096        return true;
097      } catch (EOFException eof) {
098        return false;
099      }
100    }
101
102    @Override
103    public void close() throws IOException {
104      reader.close();
105    }
106
107    @Override
108    public float getProgress() throws IOException {
109      return reader.getProgress();
110    }
111
112    @Override
113    public LongWritable getCurrentKey() throws IOException, InterruptedException {
114      return key;
115    }
116
117    @Override
118    public WARCWritable getCurrentValue() throws IOException, InterruptedException {
119      return value;
120    }
121  }
122
123}