View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.DataOutputStream;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FSDataOutputStream;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.fs.permission.FsPermission;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValue.KVComparator;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.io.compress.Compression;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.FSUtils;
43  import org.apache.hadoop.io.Writable;
44  
45  /**
46   * Common functionality needed by all versions of {@link HFile} writers.
47   */
48  @InterfaceAudience.Private
49  public abstract class AbstractHFileWriter implements HFile.Writer {
50  
51    /** The Cell previously appended. Becomes the last cell in the file.*/
52    protected Cell lastCell = null;
53  
54    /** FileSystem stream to write into. */
55    protected FSDataOutputStream outputStream;
56  
57    /** True if we opened the <code>outputStream</code> (and so will close it). */
58    protected final boolean closeOutputStream;
59  
60    /** A "file info" block: a key-value map of file-wide metadata. */
61    protected FileInfo fileInfo = new HFile.FileInfo();
62  
63    /** Total # of key/value entries, i.e. how many times add() was called. */
64    protected long entryCount = 0;
65  
66    /** Used for calculating the average key length. */
67    protected long totalKeyLength = 0;
68  
69    /** Used for calculating the average value length. */
70    protected long totalValueLength = 0;
71  
72    /** Total uncompressed bytes, maybe calculate a compression ratio later. */
73    protected long totalUncompressedBytes = 0;
74  
75    /** Key comparator. Used to ensure we write in order. */
76    protected final KVComparator comparator;
77  
78    /** Meta block names. */
79    protected List<byte[]> metaNames = new ArrayList<byte[]>();
80  
81    /** {@link Writable}s representing meta block data. */
82    protected List<Writable> metaData = new ArrayList<Writable>();
83  
84    /**
85     * First cell in a block.
86     * This reference should be short-lived since we write hfiles in a burst.
87     */
88    protected Cell firstCellInBlock = null;
89  
90    /** May be null if we were passed a stream. */
91    protected final Path path;
92  
93  
94    /** Cache configuration for caching data on write. */
95    protected final CacheConfig cacheConf;
96  
97    /**
98     * Name for this object used when logging or in toString. Is either
99     * the result of a toString on stream or else name of passed file Path.
100    */
101   protected final String name;
102 
103   /**
104    * The data block encoding which will be used.
105    * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
106    */
107   protected final HFileDataBlockEncoder blockEncoder;
108   
109   protected final HFileContext hFileContext;
110 
111   public AbstractHFileWriter(CacheConfig cacheConf,
112       FSDataOutputStream outputStream, Path path, 
113       KVComparator comparator, HFileContext fileContext) {
114     this.outputStream = outputStream;
115     this.path = path;
116     this.name = path != null ? path.getName() : outputStream.toString();
117     this.hFileContext = fileContext;
118     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
119     if (encoding != DataBlockEncoding.NONE) {
120       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
121     } else {
122       this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
123     }
124     this.comparator = comparator != null ? comparator
125         : KeyValue.COMPARATOR;
126 
127     closeOutputStream = path != null;
128     this.cacheConf = cacheConf;
129   }
130 
131   /**
132    * Add last bits of metadata to file info before it is written out.
133    */
134   protected void finishFileInfo() throws IOException {
135     if (lastCell != null) {
136       // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
137       // byte buffer. Won't take a tuple.
138       byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
139       fileInfo.append(FileInfo.LASTKEY, lastKey, false);
140     }
141 
142     // Average key length.
143     int avgKeyLen =
144         entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
145     fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
146 
147     // Average value length.
148     int avgValueLen =
149         entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
150     fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
151 
152     fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
153       false);
154   }
155 
156   /**
157    * Add to the file info. All added key/value pairs can be obtained using
158    * {@link HFile.Reader#loadFileInfo()}.
159    *
160    * @param k Key
161    * @param v Value
162    * @throws IOException in case the key or the value are invalid
163    */
164   @Override
165   public void appendFileInfo(final byte[] k, final byte[] v)
166       throws IOException {
167     fileInfo.append(k, v, true);
168   }
169 
170   /**
171    * Sets the file info offset in the trailer, finishes up populating fields in
172    * the file info, and writes the file info into the given data output. The
173    * reason the data output is not always {@link #outputStream} is that we store
174    * file info as a block in version 2.
175    *
176    * @param trailer fixed file trailer
177    * @param out the data output to write the file info to
178    * @throws IOException
179    */
180   protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
181   throws IOException {
182     trailer.setFileInfoOffset(outputStream.getPos());
183     finishFileInfo();
184     fileInfo.write(out);
185   }
186 
187   /**
188    * Checks that the given Cell's key does not violate the key order.
189    *
190    * @param cell Cell whose key to check.
191    * @return true if the key is duplicate
192    * @throws IOException if the key or the key order is wrong
193    */
194   protected boolean checkKey(final Cell cell) throws IOException {
195     boolean isDuplicateKey = false;
196 
197     if (cell == null) {
198       throw new IOException("Key cannot be null or empty");
199     }
200     if (lastCell != null) {
201       int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
202 
203       if (keyComp > 0) {
204         throw new IOException("Added a key not lexically larger than"
205             + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
206       } else if (keyComp == 0) {
207         isDuplicateKey = true;
208       }
209     }
210     return isDuplicateKey;
211   }
212 
213   /** Checks the given value for validity. */
214   protected void checkValue(final byte[] value, final int offset,
215       final int length) throws IOException {
216     if (value == null) {
217       throw new IOException("Value cannot be null");
218     }
219   }
220 
221   /**
222    * @return Path or null if we were passed a stream rather than a Path.
223    */
224   @Override
225   public Path getPath() {
226     return path;
227   }
228 
229   @Override
230   public String toString() {
231     return "writer=" + (path != null ? path.toString() : null) + ", name="
232         + name + ", compression=" + hFileContext.getCompression().getName();
233   }
234 
235   /**
236    * Sets remaining trailer fields, writes the trailer to disk, and optionally
237    * closes the output stream.
238    */
239   protected void finishClose(FixedFileTrailer trailer) throws IOException {
240     trailer.setMetaIndexCount(metaNames.size());
241     trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
242     trailer.setEntryCount(entryCount);
243     trailer.setCompressionCodec(hFileContext.getCompression());
244 
245     trailer.serialize(outputStream);
246 
247     if (closeOutputStream) {
248       outputStream.close();
249       outputStream = null;
250     }
251   }
252 
253   public static Compression.Algorithm compressionByName(String algoName) {
254     if (algoName == null)
255       return HFile.DEFAULT_COMPRESSION_ALGORITHM;
256     return Compression.getCompressionAlgorithmByName(algoName);
257   }
258 
259   /** A helper method to create HFile output streams in constructors */
260   protected static FSDataOutputStream createOutputStream(Configuration conf,
261       FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
262     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
263         HConstants.DATA_FILE_UMASK_KEY);
264     return FSUtils.create(conf, fs, path, perms, favoredNodes);
265   }
266 }