/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * The MIT License (MIT)
 * Copyright (c) 2014 Martin Kleppmann
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
package org.apache.hadoop.hbase.test.util.warc;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes {@link WARCRecord}s to a WARC file, using Hadoop's filesystem APIs. (This means you can
 * write to HDFS, S3 or any other filesystem supported by Hadoop.) This implementation is not tied
 * to the MapReduce APIs -- that link is provided by the mapred
 * {@link com.martinkl.warc.mapred.WARCOutputFormat} and the mapreduce
 * {@link com.martinkl.warc.mapreduce.WARCOutputFormat}. WARCFileWriter keeps track of how much
 * data it has written (optionally gzip-compressed); when the file becomes larger than some
 * threshold, it is automatically closed and a new segment is started. A segment number is
 * appended to the filename for that purpose. The segment number always starts at 00000, and by
 * default a new segment is started when the file size exceeds 1 GB. To change the target size
 * for a segment, you can set the {@code warc.output.segment.size} key in the Hadoop configuration
 * to the number of bytes. (Files may actually be a bit larger than this threshold, since we
 * finish writing the current record before opening a new file.)
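 * <p>
 * A minimal usage sketch (the output path and the {@code record} variable are hypothetical; a
 * {@link WARCRecord} would be built or read elsewhere):
 * </p>
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * // Optional: roll over to a new segment after roughly 256 MB instead of the 1 GB default.
 * conf.setLong("warc.output.segment.size", 256L * 1024 * 1024);
 * Path prefix = new Path("hdfs:///tmp/crawl/part-00000"); // hypothetical output prefix
 * WARCFileWriter writer = new WARCFileWriter(conf, WARCFileWriter.getGzipCodec(conf), prefix);
 * try {
 *   writer.write(record); // record is a WARCRecord
 * } finally {
 *   writer.close();
 * }
 * }</pre>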
 */
public class WARCFileWriter {
  private static final Logger logger = LoggerFactory.getLogger(WARCFileWriter.class);
  public static final long DEFAULT_MAX_SEGMENT_SIZE = 1000000000L; // 1 GB

  private final Configuration conf;
  private final CompressionCodec codec;
  private final Path workOutputPath;
  private final Progressable progress;
  private final String extensionFormat;
  private final long maxSegmentSize;
  private long segmentsCreated = 0, segmentsAttempted = 0, bytesWritten = 0;
  private CountingOutputStream byteStream;
  private DataOutputStream dataStream;

  /**
   * Creates a WARC file, and opens it for writing. If a file with the same name already exists,
   * an attempt number in the filename is incremented until we find a file that doesn't already
   * exist.
   * @param conf           The Hadoop configuration.
   * @param codec          If null, the file is uncompressed. If non-null, this compression codec
   *                       will be used. The codec's default file extension is appended to the
   *                       filename.
   * @param workOutputPath The directory and filename prefix to which the data should be written.
   *                       We append a segment number and filename extensions to it.
   */
  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath)
    throws IOException {
    this(conf, codec, workOutputPath, null);
  }

  /**
   * Creates a WARC file, and opens it for writing. If a file with the same name already exists,
   * it is *overwritten*. Note that this is different behaviour from the other constructor. Yes,
   * this sucks. It will probably change in a future version.
   * @param conf           The Hadoop configuration.
   * @param codec          If null, the file is uncompressed. If non-null, this compression codec
   *                       will be used. The codec's default file extension is appended to the
   *                       filename.
   * @param workOutputPath The directory and filename prefix to which the data should be written.
   *                       We append a segment number and filename extensions to it.
   * @param progress       An object used by the mapred API for tracking a task's progress.
   */
  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath,
    Progressable progress) throws IOException {
    this.conf = conf;
    this.codec = codec;
    this.workOutputPath = workOutputPath;
    this.progress = progress;
    this.extensionFormat =
      ".seg-%05d.attempt-%05d.warc" + (codec == null ? "" : codec.getDefaultExtension());
    this.maxSegmentSize = conf.getLong("warc.output.segment.size", DEFAULT_MAX_SEGMENT_SIZE);
    createSegment();
  }

  /**
   * Instantiates a Hadoop codec for compressing and decompressing Gzip files. This is the most
   * common compression applied to WARC files.
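   * <p>
   * A sketch of typical use ({@code conf} and {@code prefix} are assumed to exist); note that a
   * null return value simply makes the writer produce uncompressed output:
   * </p>
   *
   * <pre>{@code
   * CompressionCodec codec = WARCFileWriter.getGzipCodec(conf); // null if GzipCodec unavailable
   * WARCFileWriter writer = new WARCFileWriter(conf, codec, prefix);
   * }</pre>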
   * @param conf The Hadoop configuration.
   * @return The Gzip codec, or null if it could not be instantiated.
   */
  public static CompressionCodec getGzipCodec(Configuration conf) {
    try {
      return (CompressionCodec) ReflectionUtils
        .newInstance(conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec")
          .asSubclass(CompressionCodec.class), conf);
    } catch (ClassNotFoundException e) {
      logger.warn("GzipCodec could not be instantiated", e);
      return null;
    }
  }

  /**
   * Creates an output segment file and sets up the output streams to point at it. If the file
   * already exists, retries with a different filename. This is a bit nasty -- after all,
   * {@link FileOutputFormat}'s work directory concept is supposed to prevent filename clashes --
   * but it looks like Amazon Elastic MapReduce prevents use of per-task work directories if the
   * output of a job is on S3. TODO: Investigate this and find a better solution.
   */
  private void createSegment() throws IOException {
    segmentsAttempted = 0;
    bytesWritten = 0;
    boolean success = false;

    while (!success) {
      Path path =
        workOutputPath.suffix(String.format(extensionFormat, segmentsCreated, segmentsAttempted));
      FileSystem fs = path.getFileSystem(conf);

      try {
        // The o.a.h.mapred OutputFormats overwrite existing files, whereas
        // the o.a.h.mapreduce OutputFormats don't overwrite. Bizarre...
        // Here, overwrite if progress != null, i.e. if using mapred API.
        FSDataOutputStream fsStream =
          (progress == null) ? fs.create(path, false) : fs.create(path, progress);
        byteStream = new CountingOutputStream(new BufferedOutputStream(fsStream));
        dataStream =
          new DataOutputStream(codec == null ? byteStream : codec.createOutputStream(byteStream));
        segmentsCreated++;
        logger.info("Writing to output file: {}", path);
        success = true;

      } catch (IOException e) {
        // Guard against a null message; only a "file already exists" failure is retryable.
        if (e.getMessage() != null && e.getMessage().startsWith("File already exists")) {
          logger.warn("Tried to create file {} but it already exists; retrying.", path);
          segmentsAttempted++; // retry
        } else {
          throw e;
        }
      }
    }
  }

  /**
   * Appends a {@link WARCRecord} to the file, in WARC/1.0 format.
   * @param record The record to be written.
   */
  public void write(WARCRecord record) throws IOException {
    if (bytesWritten > maxSegmentSize) {
      dataStream.close();
      createSegment();
    }
    record.write(dataStream);
  }

  /**
   * Appends a {@link WARCRecord} wrapped in a {@link WARCWritable} to the file.
   * @param record The wrapper around the record to be written.
   */
  public void write(WARCWritable record) throws IOException {
    if (record.getRecord() != null) {
      write(record.getRecord());
    }
  }

  /**
   * Flushes any buffered data and closes the file.
   */
  public void close() throws IOException {
    dataStream.close();
  }
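  // Note: this stream sits below the compression codec (it wraps the buffered raw file stream),
  // so bytesWritten counts compressed, on-disk bytes; the segment-size threshold therefore
  // applies to the compressed output.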
  private class CountingOutputStream extends FilterOutputStream {
    public CountingOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      bytesWritten += len;
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      bytesWritten++;
    }

    // Overriding close() because FilterOutputStream's close() method pre-JDK8 has bad behavior:
    // it silently ignores any exception thrown by flush(). Instead, just close the delegate
    // stream. It should flush itself if necessary. (Thanks to the Guava project for noticing
    // this.)
    @Override
    public void close() throws IOException {
      out.close();
    }
  }

}