001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018/*
019 * The MIT License (MIT)
020 * Copyright (c) 2014 Martin Kleppmann
021 *
022 * Permission is hereby granted, free of charge, to any person obtaining a copy
023 * of this software and associated documentation files (the "Software"), to deal
024 * in the Software without restriction, including without limitation the rights
025 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
026 * copies of the Software, and to permit persons to whom the Software is
027 * furnished to do so, subject to the following conditions:
028 *
029 * The above copyright notice and this permission notice shall be included in
030 * all copies or substantial portions of the Software.
031 *
032 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
033 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
034 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
035 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
036 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
037 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
038 * THE SOFTWARE.
039 */
040
041package org.apache.hadoop.hbase.test.util.warc;
042
043import java.io.EOFException;
044import java.io.IOException;
045
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.io.LongWritable;
048import org.apache.hadoop.mapreduce.InputSplit;
049import org.apache.hadoop.mapreduce.JobContext;
050import org.apache.hadoop.mapreduce.RecordReader;
051import org.apache.hadoop.mapreduce.TaskAttemptContext;
052import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
053import org.apache.hadoop.mapreduce.lib.input.FileSplit;
054
055/**
056 * Hadoop InputFormat for mapreduce jobs ('new' API) that want to process data in WARC files.
057 *
058 * Usage:
059 *
060 * ```java
061 * Job job = new Job(getConf());
062 * job.setInputFormatClass(WARCInputFormat.class);
063 * ```
064 *
065 * Mappers should use a key of {@link org.apache.hadoop.io.LongWritable} (which is
066 * 1 for the first record in a file, 2 for the second record, etc.) and a value of
067 * {@link WARCWritable}.
068 */
069public class WARCInputFormat extends FileInputFormat<LongWritable, WARCWritable> {
070
071  /**
072   * Opens a WARC file (possibly compressed) for reading, and returns a RecordReader for
073   * accessing it.
074   */
075  @Override
076  public RecordReader<LongWritable, WARCWritable> createRecordReader(InputSplit split,
077      TaskAttemptContext context)
078          throws IOException, InterruptedException {
079    return new WARCReader();
080  }
081
082  /**
083   * Always returns false, as WARC files cannot be split.
084   */
085  @Override
086  protected boolean isSplitable(JobContext context, Path filename) {
087    return false;
088  }
089
090  private static class WARCReader extends RecordReader<LongWritable, WARCWritable> {
091    private final LongWritable key = new LongWritable();
092    private final WARCWritable value = new WARCWritable();
093    private WARCFileReader reader;
094
095    @Override
096    public void initialize(InputSplit split, TaskAttemptContext context)
097        throws IOException, InterruptedException {
098      reader = new WARCFileReader(context.getConfiguration(), ((FileSplit) split).getPath());
099    }
100
101    @Override
102    public boolean nextKeyValue() throws IOException {
103      try {
104        WARCRecord record = reader.read();
105        key.set(reader.getRecordsRead());
106        value.setRecord(record);
107        return true;
108      } catch (EOFException eof) {
109        return false;
110      }
111    }
112
113    @Override
114    public void close() throws IOException {
115      reader.close();
116    }
117
118    @Override
119    public float getProgress() throws IOException {
120      return reader.getProgress();
121    }
122
123    @Override
124    public LongWritable getCurrentKey() throws IOException, InterruptedException {
125      return key;
126    }
127
128    @Override
129    public WARCWritable getCurrentValue() throws IOException, InterruptedException {
130      return value;
131    }
132  }
133
134}