/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * The MIT License (MIT)
 * Copyright (c) 2014 Martin Kleppmann
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

package org.apache.hadoop.hbase.test.util.warc;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes {@link WARCRecord}s to a WARC file, using Hadoop's filesystem APIs. (This means you
 * can write to HDFS, S3, or any other filesystem supported by Hadoop.) This implementation is
 * not tied to the MapReduce APIs -- that link is provided by the mapred
 * {@code com.martinkl.warc.mapred.WARCOutputFormat} and the mapreduce
 * {@code com.martinkl.warc.mapreduce.WARCOutputFormat}.
 *
 * WARCFileWriter keeps track of how much data it has written (optionally gzip-compressed);
 * when the file becomes larger than some threshold, it is automatically closed and a
 * new segment is started. A segment number is appended to the filename for that purpose.
 * The segment number always starts at 00000, and by default a new segment is started when
 * the file size exceeds 1 GB. To change the target size for a segment, set the
 * {@code warc.output.segment.size} key in the Hadoop configuration to the desired number
 * of bytes. (Files may actually be a bit larger than this threshold, since we finish writing
 * the current record before opening a new file.)
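 *
 * A minimal usage sketch (the output prefix, segment size, and record variable below are
 * illustrative only; gzip compression and the segment-size override are optional):
 *
 * <pre>{@code
 * // Assumed to run inside a method that declares "throws IOException".
 * Configuration conf = new Configuration();
 * // Optionally cap each segment at ~256 MB instead of the 1 GB default.
 * conf.setLong("warc.output.segment.size", 256L * 1024 * 1024);
 *
 * Path prefix = new Path("/tmp/crawl/output");                  // hypothetical output prefix
 * CompressionCodec codec = WARCFileWriter.getGzipCodec(conf);   // null for uncompressed output
 * WARCFileWriter writer = new WARCFileWriter(conf, codec, prefix);
 * try {
 *   writer.write(record);   // record is an existing WARCRecord instance
 * } finally {
 *   writer.close();
 * }
 * }</pre>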
 */
public class WARCFileWriter {
  private static final Logger logger = LoggerFactory.getLogger(WARCFileWriter.class);
  public static final long DEFAULT_MAX_SEGMENT_SIZE = 1000000000L; // 1 GB

  private final Configuration conf;
  private final CompressionCodec codec;
  private final Path workOutputPath;
  private final Progressable progress;
  private final String extensionFormat;
  private final long maxSegmentSize;
  private long segmentsCreated = 0, segmentsAttempted = 0, bytesWritten = 0;
  private CountingOutputStream byteStream;
  private DataOutputStream dataStream;

  /**
   * Creates a WARC file, and opens it for writing. If a file with the same name already
   * exists, an attempt number in the filename is incremented until we find a file that
   * doesn't already exist.
   *
   * @param conf The Hadoop configuration.
   * @param codec If null, the file is uncompressed. If non-null, this compression codec
   *              will be used. The codec's default file extension is appended to the filename.
   * @param workOutputPath The directory and filename prefix to which the data should be
   *                       written. We append a segment number and filename extensions to it.
   */
  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath)
      throws IOException {
    this(conf, codec, workOutputPath, null);
  }

  /**
   * Creates a WARC file, and opens it for writing. If a file with the same name already
   * exists, it is *overwritten*. Note that this behaviour differs from the other constructor
   * and may change in a future version.
   *
   * @param conf The Hadoop configuration.
   * @param codec If null, the file is uncompressed. If non-null, this compression codec
   *              will be used. The codec's default file extension is appended to the filename.
   * @param workOutputPath The directory and filename prefix to which the data should be
   *                       written. We append a segment number and filename extensions to it.
   * @param progress An object used by the mapred API for tracking a task's progress.
   */
  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath,
      Progressable progress) throws IOException {
    this.conf = conf;
    this.codec = codec;
    this.workOutputPath = workOutputPath;
    this.progress = progress;
    this.extensionFormat = ".seg-%05d.attempt-%05d.warc" +
        (codec == null ? "" : codec.getDefaultExtension());
    this.maxSegmentSize = conf.getLong("warc.output.segment.size", DEFAULT_MAX_SEGMENT_SIZE);
    createSegment();
  }

  /**
   * Instantiates a Hadoop codec for compressing and decompressing Gzip files. This is the
   * most common compression applied to WARC files.
   *
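   * A minimal usage sketch:
   *
   * <pre>{@code
   * Configuration conf = new Configuration();
   * CompressionCodec codec = WARCFileWriter.getGzipCodec(conf);
   * if (codec == null) {
   *   // GzipCodec was not on the classpath; fall back to uncompressed output.
   * }
   * }</pre>
   *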
   * @param conf The Hadoop configuration.
   * @return The gzip codec, or null if it could not be instantiated.
   */
  public static CompressionCodec getGzipCodec(Configuration conf) {
    try {
      return (CompressionCodec) ReflectionUtils.newInstance(
        conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec")
          .asSubclass(CompressionCodec.class),
        conf);
    } catch (ClassNotFoundException e) {
      logger.warn("GzipCodec could not be instantiated", e);
      return null;
    }
  }

  /**
   * Creates an output segment file and sets up the output streams to point at it.
   * If the file already exists, retries with a different filename. This is a bit nasty --
   * after all, {@code FileOutputFormat}'s work directory concept is supposed to prevent
   * filename clashes -- but it looks like Amazon Elastic MapReduce prevents use of per-task
   * work directories if the output of a job is on S3.
   *
   * TODO: Investigate this and find a better solution.
   */
  private void createSegment() throws IOException {
    segmentsAttempted = 0;
    bytesWritten = 0;
    boolean success = false;

    while (!success) {
      Path path = workOutputPath.suffix(String.format(extensionFormat, segmentsCreated,
        segmentsAttempted));
      FileSystem fs = path.getFileSystem(conf);

      try {
        // The o.a.h.mapred OutputFormats overwrite existing files, whereas
        // the o.a.h.mapreduce OutputFormats don't overwrite. Bizarre...
        // Here, overwrite if progress != null, i.e. if using mapred API.
        FSDataOutputStream fsStream = (progress == null) ? fs.create(path, false) :
          fs.create(path, progress);
        byteStream = new CountingOutputStream(new BufferedOutputStream(fsStream));
        dataStream = new DataOutputStream(codec == null ? byteStream :
          codec.createOutputStream(byteStream));
        segmentsCreated++;
        logger.info("Writing to output file: {}", path);
        success = true;

      } catch (IOException e) {
        if (e.getMessage() != null && e.getMessage().startsWith("File already exists")) {
          logger.warn("Tried to create file {} but it already exists; retrying.", path);
          segmentsAttempted++; // retry
        } else {
          throw e;
        }
      }
    }
  }

  /**
   * Appends a {@link WARCRecord} to the file, in WARC/1.0 format.
   * @param record The record to be written.
   */
  public void write(WARCRecord record) throws IOException {
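    // Roll over to a new segment once the current one has grown past the configured threshold.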
    if (bytesWritten > maxSegmentSize) {
      dataStream.close();
      createSegment();
    }
    record.write(dataStream);
  }

  /**
   * Appends a {@link WARCRecord} wrapped in a {@link WARCWritable} to the file.
   * @param record The wrapper around the record to be written.
   */
  public void write(WARCWritable record) throws IOException {
    if (record.getRecord() != null) {
      write(record.getRecord());
    }
  }

  /**
   * Flushes any buffered data and closes the file.
   */
  public void close() throws IOException {
    dataStream.close();
  }

  private class CountingOutputStream extends FilterOutputStream {
    public CountingOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      bytesWritten += len;
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      bytesWritten++;
    }

    // Overriding close() because FilterOutputStream's close() method pre-JDK8 has bad behavior:
    // it silently ignores any exception thrown by flush(). Instead, just close the delegate
    // stream. It should flush itself if necessary. (Thanks to the Guava project for noticing
    // this.)
    @Override
    public void close() throws IOException {
      out.close();
    }
  }

}