View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.DataOutputStream;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.List;
27  
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FSDataOutputStream;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.fs.permission.FsPermission;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValue.KVComparator;
37  import org.apache.hadoop.hbase.io.compress.Compression;
38  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
39  import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.FSUtils;
42  import org.apache.hadoop.io.Writable;
43  
44  /**
45   * Common functionality needed by all versions of {@link HFile} writers.
46   */
47  @InterfaceAudience.Private
48  public abstract class AbstractHFileWriter implements HFile.Writer {
49  
50    /** Key previously appended. Becomes the last key in the file. */
51    protected byte[] lastKeyBuffer = null;
52  
53    protected int lastKeyOffset = -1;
54    protected int lastKeyLength = -1;
55  
56    /** FileSystem stream to write into. */
57    protected FSDataOutputStream outputStream;
58  
59    /** True if we opened the <code>outputStream</code> (and so will close it). */
60    protected final boolean closeOutputStream;
61  
62    /** A "file info" block: a key-value map of file-wide metadata. */
63    protected FileInfo fileInfo = new HFile.FileInfo();
64  
65    /** Total # of key/value entries, i.e. how many times add() was called. */
66    protected long entryCount = 0;
67  
68    /** Used for calculating the average key length. */
69    protected long totalKeyLength = 0;
70  
71    /** Used for calculating the average value length. */
72    protected long totalValueLength = 0;
73  
74    /** Total uncompressed bytes, maybe calculate a compression ratio later. */
75    protected long totalUncompressedBytes = 0;
76  
77    /** Key comparator. Used to ensure we write in order. */
78    protected final KVComparator comparator;
79  
80    /** Meta block names. */
81    protected List<byte[]> metaNames = new ArrayList<byte[]>();
82  
83    /** {@link Writable}s representing meta block data. */
84    protected List<Writable> metaData = new ArrayList<Writable>();
85  
86    /** First key in a block. */
87    protected byte[] firstKeyInBlock = null;
88  
89    /** May be null if we were passed a stream. */
90    protected final Path path;
91  
92  
93    /** Cache configuration for caching data on write. */
94    protected final CacheConfig cacheConf;
95  
96    /**
97     * Name for this object used when logging or in toString. Is either
98     * the result of a toString on stream or else name of passed file Path.
99     */
100   protected final String name;
101 
102   /**
103    * The data block encoding which will be used.
104    * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
105    */
106   protected final HFileDataBlockEncoder blockEncoder;
107   
108   protected final HFileContext hFileContext;
109 
110   public AbstractHFileWriter(CacheConfig cacheConf,
111       FSDataOutputStream outputStream, Path path, 
112       KVComparator comparator, HFileContext fileContext) {
113     this.outputStream = outputStream;
114     this.path = path;
115     this.name = path != null ? path.getName() : outputStream.toString();
116     this.hFileContext = fileContext;
117     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
118     if (encoding != DataBlockEncoding.NONE) {
119       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
120     } else {
121       this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
122     }
123     this.comparator = comparator != null ? comparator
124         : KeyValue.COMPARATOR;
125 
126     closeOutputStream = path != null;
127     this.cacheConf = cacheConf;
128   }
129 
130   /**
131    * Add last bits of metadata to file info before it is written out.
132    */
133   protected void finishFileInfo() throws IOException {
134     if (lastKeyBuffer != null) {
135       // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
136       // byte buffer. Won't take a tuple.
137       fileInfo.append(FileInfo.LASTKEY, Arrays.copyOfRange(lastKeyBuffer,
138           lastKeyOffset, lastKeyOffset + lastKeyLength), false);
139     }
140 
141     // Average key length.
142     int avgKeyLen =
143         entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
144     fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
145 
146     // Average value length.
147     int avgValueLen =
148         entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
149     fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
150   }
151 
152   /**
153    * Add to the file info. All added key/value pairs can be obtained using
154    * {@link HFile.Reader#loadFileInfo()}.
155    *
156    * @param k Key
157    * @param v Value
158    * @throws IOException in case the key or the value are invalid
159    */
160   @Override
161   public void appendFileInfo(final byte[] k, final byte[] v)
162       throws IOException {
163     fileInfo.append(k, v, true);
164   }
165 
166   /**
167    * Sets the file info offset in the trailer, finishes up populating fields in
168    * the file info, and writes the file info into the given data output. The
169    * reason the data output is not always {@link #outputStream} is that we store
170    * file info as a block in version 2.
171    *
172    * @param trailer fixed file trailer
173    * @param out the data output to write the file info to
174    * @throws IOException
175    */
176   protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
177   throws IOException {
178     trailer.setFileInfoOffset(outputStream.getPos());
179     finishFileInfo();
180     fileInfo.write(out);
181   }
182 
183   /**
184    * Checks that the given key does not violate the key order.
185    *
186    * @param key Key to check.
187    * @return true if the key is duplicate
188    * @throws IOException if the key or the key order is wrong
189    */
190   protected boolean checkKey(final byte[] key, final int offset,
191       final int length) throws IOException {
192     boolean isDuplicateKey = false;
193 
194     if (key == null || length <= 0) {
195       throw new IOException("Key cannot be null or empty");
196     }
197     if (lastKeyBuffer != null) {
198       int keyComp = comparator.compareFlatKey(lastKeyBuffer, lastKeyOffset,
199           lastKeyLength, key, offset, length);
200 
201       if (keyComp > 0) {
202         throw new IOException("Added a key not lexically larger than"
203             + " previous key="
204             + Bytes.toStringBinary(key, offset, length)
205             + ", lastkey="
206             + Bytes.toStringBinary(lastKeyBuffer, lastKeyOffset,
207                 lastKeyLength));
208       } else if (keyComp == 0) {
209         isDuplicateKey = true;
210       }
211     }
212     return isDuplicateKey;
213   }
214 
215   /** Checks the given value for validity. */
216   protected void checkValue(final byte[] value, final int offset,
217       final int length) throws IOException {
218     if (value == null) {
219       throw new IOException("Value cannot be null");
220     }
221   }
222 
223   /**
224    * @return Path or null if we were passed a stream rather than a Path.
225    */
226   @Override
227   public Path getPath() {
228     return path;
229   }
230 
231   @Override
232   public String toString() {
233     return "writer=" + (path != null ? path.toString() : null) + ", name="
234         + name + ", compression=" + hFileContext.getCompression().getName();
235   }
236 
237   /**
238    * Sets remaining trailer fields, writes the trailer to disk, and optionally
239    * closes the output stream.
240    */
241   protected void finishClose(FixedFileTrailer trailer) throws IOException {
242     trailer.setMetaIndexCount(metaNames.size());
243     trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
244     trailer.setEntryCount(entryCount);
245     trailer.setCompressionCodec(hFileContext.getCompression());
246 
247     trailer.serialize(outputStream);
248 
249     if (closeOutputStream) {
250       outputStream.close();
251       outputStream = null;
252     }
253   }
254 
255   public static Compression.Algorithm compressionByName(String algoName) {
256     if (algoName == null)
257       return HFile.DEFAULT_COMPRESSION_ALGORITHM;
258     return Compression.getCompressionAlgorithmByName(algoName);
259   }
260 
261   /** A helper method to create HFile output streams in constructors */
262   protected static FSDataOutputStream createOutputStream(Configuration conf,
263       FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
264     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
265         HConstants.DATA_FILE_UMASK_KEY);
266     return FSUtils.create(fs, path, perms, favoredNodes);
267   }
268 }