/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * The MIT License (MIT)
 * Copyright (c) 2014 Martin Kleppmann
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
package org.apache.hadoop.hbase.test.util.warc;

import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Hadoop InputFormat for mapreduce jobs ('new' API) that want to process data in WARC files.
 * Usage:
 *
 * <pre>{@code
 * Job job = Job.getInstance(getConf());
 * job.setInputFormatClass(WARCInputFormat.class);
 * }</pre>
 *
 * Mappers should use a key of {@link org.apache.hadoop.io.LongWritable} (which is 1 for the first
 * record in a file, 2 for the second record, etc.) and a value of {@link WARCWritable}.
 */
public class WARCInputFormat extends FileInputFormat<LongWritable, WARCWritable> {

  /**
   * Creates a RecordReader for a WARC file. The file is not opened here: the framework calls
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} afterwards, which hands the
   * split's path to {@link WARCFileReader} (any decompression is that class's responsibility).
   *
   * @param split the split to read (unused here; consumed by {@code initialize})
   * @param context the task context (unused here; consumed by {@code initialize})
   * @return a new, uninitialized record reader
   */
  @Override
  public RecordReader<LongWritable, WARCWritable> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    return new WARCReader();
  }

  /**
   * Always returns false, as WARC files cannot be split; each input file is therefore read in full
   * by a single record reader.
   */
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }

  /**
   * Record reader producing one (record number, {@link WARCWritable}) pair per WARC record of a
   * single file. The key and value objects are reused across calls to {@link #nextKeyValue()}.
   */
  private static final class WARCReader extends RecordReader<LongWritable, WARCWritable> {
    private final LongWritable key = new LongWritable();
    private final WARCWritable value = new WARCWritable();
    /** Underlying file reader; stays null until {@link #initialize} succeeds. */
    private WARCFileReader reader;

    /**
     * Opens the file backing {@code split}. Since {@link #isSplitable} returns false, the split
     * covers a whole file, so only its path is needed.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      reader = new WARCFileReader(context.getConfiguration(), ((FileSplit) split).getPath());
    }

    /**
     * Reads the next WARC record into the current key/value.
     *
     * @return true if a record was read, false once the reader reports end of file
     * @throws IOException if reading the record fails for a reason other than end of file
     */
    @Override
    public boolean nextKeyValue() throws IOException {
      final WARCRecord record;
      try {
        record = reader.read();
      } catch (EOFException eof) {
        // An EOFException from read() is treated as the end of input.
        // NOTE(review): a record truncated mid-read would also surface here as a clean end of
        // file — confirm WARCFileReader distinguishes that case if it matters.
        return false;
      }
      // Key is the running count of records read from this file (1 for the first record).
      key.set(reader.getRecordsRead());
      value.setRecord(record);
      return true;
    }

    /**
     * Closes the underlying file reader, if one was opened. The framework may call close() even
     * when {@link #initialize} failed; a NullPointerException here would mask that original error.
     */
    @Override
    public void close() throws IOException {
      if (reader != null) {
        reader.close();
      }
    }

    /** Returns the underlying reader's progress, or 0 if the file has not been opened yet. */
    @Override
    public float getProgress() throws IOException {
      return reader == null ? 0.0f : reader.getProgress();
    }

    /** Returns the (reused) key holding the current record number. */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
      return key;
    }

    /** Returns the (reused) value wrapping the current WARC record. */
    @Override
    public WARCWritable getCurrentValue() throws IOException, InterruptedException {
      return value;
    }
  }

}