001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018/*
019 * The MIT License (MIT)
020 * Copyright (c) 2014 Martin Kleppmann
021 *
022 * Permission is hereby granted, free of charge, to any person obtaining a copy
023 * of this software and associated documentation files (the "Software"), to deal
024 * in the Software without restriction, including without limitation the rights
025 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
026 * copies of the Software, and to permit persons to whom the Software is
027 * furnished to do so, subject to the following conditions:
028 *
029 * The above copyright notice and this permission notice shall be included in
030 * all copies or substantial portions of the Software.
031 *
032 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
033 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
034 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
035 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
036 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
037 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
038 * THE SOFTWARE.
039 */
040
041package org.apache.hadoop.hbase.test.util.warc;
042
043import java.io.IOException;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.io.NullWritable;
047import org.apache.hadoop.io.compress.CompressionCodec;
048import org.apache.hadoop.mapreduce.RecordWriter;
049import org.apache.hadoop.mapreduce.TaskAttemptContext;
050import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
051
052/**
053 * Hadoop OutputFormat for mapreduce jobs ('new' API) that want to write data to WARC files.
054 *
055 * Usage:
056 *
057 * ```java
058 * Job job = new Job(getConf());
059 * job.setOutputFormatClass(WARCOutputFormat.class);
060 * job.setOutputKeyClass(NullWritable.class);
061 * job.setOutputValueClass(WARCWritable.class);
062 * FileOutputFormat.setCompressOutput(job, true);
063 * ```
064 *
065 * The tasks generating the output (usually the reducers, but may be the mappers if there
066 * are no reducers) should use `NullWritable.get()` as the output key, and the
067 * {@link WARCWritable} as the output value.
068 */
069public class WARCOutputFormat extends FileOutputFormat<NullWritable, WARCWritable> {
070
071  /**
072   * Creates a new output file in WARC format, and returns a RecordWriter for writing to it.
073   */
074  @Override
075  public RecordWriter<NullWritable, WARCWritable> getRecordWriter(TaskAttemptContext context)
076      throws IOException, InterruptedException {
077    return new WARCWriter(context);
078  }
079
080  private class WARCWriter extends RecordWriter<NullWritable, WARCWritable> {
081    private final WARCFileWriter writer;
082
083    public WARCWriter(TaskAttemptContext context) throws IOException {
084      Configuration conf = context.getConfiguration();
085      CompressionCodec codec =
086        getCompressOutput(context) ? WARCFileWriter.getGzipCodec(conf) : null;
087      Path workFile = getDefaultWorkFile(context, "");
088      this.writer = new WARCFileWriter(conf, codec, workFile);
089    }
090
091    @Override
092    public void write(NullWritable key, WARCWritable value)
093        throws IOException, InterruptedException {
094      writer.write(value);
095    }
096
097    @Override
098    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
099      writer.close();
100    }
101  }
102
103}