001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018/*
019 * The MIT License (MIT)
020 * Copyright (c) 2014 Martin Kleppmann
021 *
022 * Permission is hereby granted, free of charge, to any person obtaining a copy
023 * of this software and associated documentation files (the "Software"), to deal
024 * in the Software without restriction, including without limitation the rights
025 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
026 * copies of the Software, and to permit persons to whom the Software is
027 * furnished to do so, subject to the following conditions:
028 *
029 * The above copyright notice and this permission notice shall be included in
030 * all copies or substantial portions of the Software.
031 *
032 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
033 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
034 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
035 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
036 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
037 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
038 * THE SOFTWARE.
039 */
040package org.apache.hadoop.hbase.test.util.warc;
041
042import java.io.IOException;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.io.NullWritable;
046import org.apache.hadoop.io.compress.CompressionCodec;
047import org.apache.hadoop.mapreduce.RecordWriter;
048import org.apache.hadoop.mapreduce.TaskAttemptContext;
049import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
050
051/**
052 * Hadoop OutputFormat for mapreduce jobs ('new' API) that want to write data to WARC files. Usage:
053 * ```java Job job = new Job(getConf()); job.setOutputFormatClass(WARCOutputFormat.class);
054 * job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(WARCWritable.class);
055 * FileOutputFormat.setCompressOutput(job, true); ``` The tasks generating the output (usually the
056 * reducers, but may be the mappers if there are no reducers) should use `NullWritable.get()` as the
057 * output key, and the {@link WARCWritable} as the output value.
058 */
059public class WARCOutputFormat extends FileOutputFormat<NullWritable, WARCWritable> {
060
061  /**
062   * Creates a new output file in WARC format, and returns a RecordWriter for writing to it.
063   */
064  @Override
065  public RecordWriter<NullWritable, WARCWritable> getRecordWriter(TaskAttemptContext context)
066    throws IOException, InterruptedException {
067    return new WARCWriter(context);
068  }
069
070  private class WARCWriter extends RecordWriter<NullWritable, WARCWritable> {
071    private final WARCFileWriter writer;
072
073    public WARCWriter(TaskAttemptContext context) throws IOException {
074      Configuration conf = context.getConfiguration();
075      CompressionCodec codec =
076        getCompressOutput(context) ? WARCFileWriter.getGzipCodec(conf) : null;
077      Path workFile = getDefaultWorkFile(context, "");
078      this.writer = new WARCFileWriter(conf, codec, workFile);
079    }
080
081    @Override
082    public void write(NullWritable key, WARCWritable value)
083      throws IOException, InterruptedException {
084      writer.write(value);
085    }
086
087    @Override
088    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
089      writer.close();
090    }
091  }
092
093}