/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
public abstract class AbstractHFileWriter extends SchemaConfigured
    implements HFile.Writer {

  /** Key previously appended. Becomes the last key in the file. */
  protected byte[] lastKeyBuffer = null;

  protected int lastKeyOffset = -1;
  protected int lastKeyLength = -1;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected FileInfo fileInfo = new HFile.FileInfo();

  /** Number of uncompressed bytes we allow per block. */
  protected final int blockSize;

  /** Total # of key/value entries, i.e. how many times add() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Key comparator. Used to ensure we write in order. */
  protected final RawComparator<byte[]> comparator;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<byte[]>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<Writable>();

  /** The compression algorithm used. NONE if no compression. */
  protected final Compression.Algorithm compressAlgo;

  /**
   * The data block encoding which will be used.
   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  /** First key in a block. */
  protected byte[] firstKeyInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either
   * the result of a toString on stream or else name of passed file Path.
   */
  protected final String name;

  public AbstractHFileWriter(CacheConfig cacheConf,
      FSDataOutputStream outputStream, Path path, int blockSize,
      Compression.Algorithm compressAlgo,
      HFileDataBlockEncoder dataBlockEncoder,
      KeyComparator comparator) {
    super(null, path);
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.blockSize = blockSize;
    this.compressAlgo = compressAlgo == null
        ? HFile.DEFAULT_COMPRESSION_ALGORITHM : compressAlgo;
    this.blockEncoder = dataBlockEncoder != null
        ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
    this.comparator = comparator != null ? comparator
        : Bytes.BYTES_RAWCOMPARATOR;

    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
  }
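
  // A minimal sketch of how a version-specific subclass might chain to this
  // constructor. The subclass name and argument list below are hypothetical;
  // only the super(...) call mirrors the signature above.
  //
  //   public ExampleWriter(Configuration conf, CacheConfig cacheConf,
  //       FileSystem fs, Path path, int blockSize,
  //       Compression.Algorithm compress, HFileDataBlockEncoder encoder,
  //       KeyComparator comparator) throws IOException {
  //     super(cacheConf, createOutputStream(conf, fs, path), path, blockSize,
  //         compress, encoder, comparator);
  //   }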

  /**
   * Add last bits of metadata to file info before it is written out.
   */
  protected void finishFileInfo() throws IOException {
    if (lastKeyBuffer != null) {
      // Make a copy. The copy is stuffed into HMapWritable. Needs a clean
      // byte buffer. Won't take a tuple.
      fileInfo.append(FileInfo.LASTKEY, Arrays.copyOfRange(lastKeyBuffer,
          lastKeyOffset, lastKeyOffset + lastKeyLength), false);
    }

    // Average key length.
    int avgKeyLen =
        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);

    // Average value length.
    int avgValueLen =
        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#loadFileInfo()}.
   *
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v)
      throws IOException {
    fileInfo.append(k, v, true);
  }
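
  // A minimal usage sketch (for illustration, not code from this class):
  // callers typically tag a file with application-level metadata via
  // appendFileInfo before closing the writer. The key name below is
  // hypothetical; keys starting with the reserved "hfile." prefix are
  // rejected here because checkPrefix is passed as true.
  //
  //   writer.appendFileInfo(Bytes.toBytes("MAJOR_COMPACTION_KEY"),
  //       Bytes.toBytes(true));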

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in
   * the file info, and writes the file info into the given data output. The
   * reason the data output is not always {@link #outputStream} is that we store
   * file info as a block in version 2.
   *
   * @param trailer fixed file trailer
   * @param out the data output to write the file info to
   * @throws IOException
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutput out)
      throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    fileInfo.write(out);
  }

  /**
   * Checks that the given key does not violate the key order.
   *
   * @param key Key to check.
   * @return true if the key is a duplicate of the previous key
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final byte[] key, final int offset,
      final int length) throws IOException {
    boolean isDuplicateKey = false;

    if (key == null || length <= 0) {
      throw new IOException("Key cannot be null or empty");
    }
    if (length > HFile.MAXIMUM_KEY_LENGTH) {
      throw new IOException("Key length " + length + " > "
          + HFile.MAXIMUM_KEY_LENGTH);
    }
    if (lastKeyBuffer != null) {
      int keyComp = comparator.compare(lastKeyBuffer, lastKeyOffset,
          lastKeyLength, key, offset, length);
      if (keyComp > 0) {
        throw new IOException("Added a key not lexically larger than"
            + " previous key="
            + Bytes.toStringBinary(key, offset, length)
            + ", lastkey="
            + Bytes.toStringBinary(lastKeyBuffer, lastKeyOffset,
                lastKeyLength));
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }
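
  // Illustration (assumed, not part of this class) of the raw-byte ordering
  // this check enforces. Note that checkKey only compares against
  // lastKeyBuffer; the concrete writer's append() updates lastKeyBuffer
  // afterwards.
  //
  //   byte[] a = Bytes.toBytes("row-10");
  //   byte[] b = Bytes.toBytes("row-2");
  //   // Bytes.BYTES_RAWCOMPARATOR.compare(a, b) < 0 because '1' < '2' at the
  //   // fifth byte, so "row-2" may be appended after "row-10", but appending
  //   // "row-10" after "row-2" would throw here.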

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset,
      final int length) throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name="
        + name + ", compression=" + compressAlgo.getName();
  }

  /**
   * Sets remaining trailer fields, writes the trailer to disk, and optionally
   * closes the output stream.
   */
  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes
        + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(compressAlgo);

    trailer.serialize(outputStream);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null)
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    return Compression.getCompressionAlgorithmByName(algoName);
  }
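
  // Example (a sketch for illustration): resolving a codec name that might
  // come from configuration. A null name falls back to
  // HFile.DEFAULT_COMPRESSION_ALGORITHM, per the method above.
  //
  //   Compression.Algorithm gz = compressionByName("gz");
  //   Compression.Algorithm dflt = compressionByName(null);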

  /** A helper method to create HFile output streams in constructors. */
  protected static FSDataOutputStream createOutputStream(Configuration conf,
      FileSystem fs, Path path) throws IOException {
    FsPermission perms = FSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(fs, path, perms);
  }
}
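
// A rough end-to-end sketch of how a writer built on this class is typically
// obtained and used. The builder-style factory calls below are assumptions
// about the surrounding HFile API, not something defined in this file:
//
//   HFile.Writer w = HFile.getWriterFactory(conf, new CacheConfig(conf))
//       .withPath(fs, path)
//       .withBlockSize(64 * 1024)
//       .withCompression(AbstractHFileWriter.compressionByName("gz"))
//       .create();
//   w.append(kv);                               // keys must arrive sorted
//   w.appendFileInfo(Bytes.toBytes("k"), Bytes.toBytes("v"));
//   w.close();                                  // writes file info and trailer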