/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * The MIT License (MIT)
 * Copyright (c) 2014 Martin Kleppmann
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
package org.apache.hadoop.hbase.test.util.warc;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes {@link WARCRecord}s to a WARC file, using Hadoop's filesystem APIs. (This means you can
 * write to HDFS, S3 or any other filesystem supported by Hadoop.) This implementation is not tied
 * to the MapReduce APIs -- that link is provided by the mapred
 * {@code com.martinkl.warc.mapred.WARCOutputFormat} and the mapreduce
 * {@code com.martinkl.warc.mapreduce.WARCOutputFormat}. WARCFileWriter keeps track of how much
 * data it has written (optionally gzip-compressed); when the file becomes larger than some
 * threshold, it is automatically closed and a new segment is started. A segment number is
 * appended to the filename for that purpose. The segment number always starts at 00000, and by
 * default a new segment is started when the file size exceeds 1GB. To change the target size for
 * a segment, you can set the {@code warc.output.segment.size} key in the Hadoop configuration to
 * the number of bytes. (Files may actually be a bit larger than this threshold, since we finish
 * writing the current record before opening a new file.)
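 * <p>
 * A minimal usage sketch (the output path, the segment size, and the {@code record} variable are
 * illustrative, not prescribed by this class):
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.setLong("warc.output.segment.size", 512L * 1024 * 1024); // roll to a new segment at ~512 MB
 * CompressionCodec codec = WARCFileWriter.getGzipCodec(conf);   // null means no compression
 * WARCFileWriter writer = new WARCFileWriter(conf, codec, new Path("/tmp/crawl/part-00000"));
 * try {
 *   writer.write(record); // a WARCRecord constructed elsewhere
 * } finally {
 *   writer.close();
 * }
 * }</pre>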
 */
public class WARCFileWriter {
  private static final Logger logger = LoggerFactory.getLogger(WARCFileWriter.class);
  public static final long DEFAULT_MAX_SEGMENT_SIZE = 1000000000L; // 1 GB

  private final Configuration conf;
  private final CompressionCodec codec;
  private final Path workOutputPath;
  private final Progressable progress;
  private final String extensionFormat;
  private final long maxSegmentSize;
  private long segmentsCreated = 0, segmentsAttempted = 0, bytesWritten = 0;
  private CountingOutputStream byteStream;
  private DataOutputStream dataStream;

  /**
   * Creates a WARC file, and opens it for writing. If a file with the same name already exists, an
   * attempt number in the filename is incremented until we find a file that doesn't already exist.
   * @param conf           The Hadoop configuration.
   * @param codec          If null, the file is uncompressed. If non-null, this compression codec
   *                       will be used. The codec's default file extension is appended to the
   *                       filename.
   * @param workOutputPath The directory and filename prefix to which the data should be written. We
   *                       append a segment number and filename extensions to it.
   */
  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath)
    throws IOException {
    this(conf, codec, workOutputPath, null);
  }

  /**
   * Creates a WARC file, and opens it for writing. If a file with the same name already exists, it
   * is <em>overwritten</em>. Note that this is different behaviour from the other constructor; it
   * will probably change in a future version.
   * @param conf           The Hadoop configuration.
   * @param codec          If null, the file is uncompressed. If non-null, this compression codec
   *                       will be used. The codec's default file extension is appended to the
   *                       filename.
   * @param workOutputPath The directory and filename prefix to which the data should be written. We
   *                       append a segment number and filename extensions to it.
   * @param progress       An object used by the mapred API for tracking a task's progress.
   */
  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath,
    Progressable progress) throws IOException {
    this.conf = conf;
    this.codec = codec;
    this.workOutputPath = workOutputPath;
    this.progress = progress;
    this.extensionFormat =
      ".seg-%05d.attempt-%05d.warc" + (codec == null ? "" : codec.getDefaultExtension());
    this.maxSegmentSize = conf.getLong("warc.output.segment.size", DEFAULT_MAX_SEGMENT_SIZE);
    createSegment();
  }

  /**
   * Instantiates a Hadoop codec for compressing and decompressing Gzip files. This is the most
   * common compression applied to WARC files.
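   * <p>
   * For example (a sketch; the returned codec may be null if the class is unavailable):
   *
   * <pre>{@code
   * CompressionCodec gzip = WARCFileWriter.getGzipCodec(conf);
   * }</pre>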
   * @param conf The Hadoop configuration.
   * @return The Gzip codec, or null if it could not be instantiated.
   */
  public static CompressionCodec getGzipCodec(Configuration conf) {
    try {
      return (CompressionCodec) ReflectionUtils
        .newInstance(conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec")
          .asSubclass(CompressionCodec.class), conf);
    } catch (ClassNotFoundException e) {
      logger.warn("GzipCodec could not be instantiated", e);
      return null;
    }
  }

  /**
   * Creates an output segment file and sets up the output streams to point at it. If the file
   * already exists, retries with a different filename. This is a bit nasty -- after all,
   * {@code FileOutputFormat}'s work directory concept is supposed to prevent filename clashes --
   * but it looks like Amazon Elastic MapReduce prevents use of per-task work directories if the
   * output of a job is on S3. TODO: Investigate this and find a better solution.
   */
  private void createSegment() throws IOException {
    segmentsAttempted = 0;
    bytesWritten = 0;
    boolean success = false;

    while (!success) {
      Path path =
        workOutputPath.suffix(String.format(extensionFormat, segmentsCreated, segmentsAttempted));
      FileSystem fs = path.getFileSystem(conf);

      try {
        // The o.a.h.mapred OutputFormats overwrite existing files, whereas
        // the o.a.h.mapreduce OutputFormats don't overwrite. Bizarre...
        // Here, overwrite if progress != null, i.e. if using mapred API.
        FSDataOutputStream fsStream =
          (progress == null) ? fs.create(path, false) : fs.create(path, progress);
        byteStream = new CountingOutputStream(new BufferedOutputStream(fsStream));
        dataStream =
          new DataOutputStream(codec == null ? byteStream : codec.createOutputStream(byteStream));
        segmentsCreated++;
        logger.info("Writing to output file: {}", path);
        success = true;

      } catch (IOException e) {
        String message = e.getMessage();
        if (message != null && message.startsWith("File already exists")) {
          logger.warn("Tried to create file {} but it already exists; retrying.", path);
          segmentsAttempted++; // retry with the next attempt number in the filename
        } else {
          throw e;
        }
      }
    }
  }

  /**
   * Appends a {@link WARCRecord} to the file, in WARC/1.0 format.
   * @param record The record to be written.
   */
  public void write(WARCRecord record) throws IOException {
    if (bytesWritten > maxSegmentSize) {
      dataStream.close();
      createSegment();
    }
    record.write(dataStream);
  }

  /**
   * Appends a {@link WARCRecord} wrapped in a {@link WARCWritable} to the file.
   * @param record The wrapper around the record to be written.
   */
  public void write(WARCWritable record) throws IOException {
    if (record.getRecord() != null) {
      write(record.getRecord());
    }
  }

  /**
   * Flushes any buffered data and closes the file.
   */
  public void close() throws IOException {
    dataStream.close();
  }

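  /**
   * An output stream that counts the bytes written through it, updating {@code bytesWritten} so
   * the writer knows when to start a new segment.
   */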
  private class CountingOutputStream extends FilterOutputStream {
    public CountingOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      bytesWritten += len;
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      bytesWritten++;
    }

    // Overriding close() because FilterOutputStream's close() method pre-JDK8 has bad behavior:
    // it silently ignores any exception thrown by flush(). Instead, just close the delegate
    // stream. It should flush itself if necessary. (Thanks to the Guava project for noticing
    // this.)
    @Override
    public void close() throws IOException {
      out.close();
    }
  }

}