1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.DataOutputStream;
23 import java.io.IOException;
24 import java.io.OutputStream;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FSDataOutputStream;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
38 import org.apache.hadoop.hbase.io.compress.Compression;
39 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
40 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
41 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
42 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
43 import org.apache.hadoop.hbase.regionserver.MemStore;
44 import org.apache.hadoop.hbase.util.ChecksumType;
45 import org.apache.hadoop.hbase.util.BloomFilterWriter;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.io.Writable;
48 import org.apache.hadoop.io.compress.Compressor;
49
50
51
52
53 @InterfaceAudience.Private
54 public class HFileWriterV1 extends AbstractHFileWriter {
55
56
57 static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
58
59
60 public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
61
62 private static final Log LOG = LogFactory.getLog(HFileWriterV1.class);
63
64
65 private DataOutputStream out;
66
67
68 private long blockBegin;
69
70
71 private ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
72
73
74 private ArrayList<Long> blockOffsets = new ArrayList<Long>();
75
76
77 private ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
78
79 private Compressor compressor;
80
81
82 private ByteArrayOutputStream baos;
83 private DataOutputStream baosDos;
84
85 static class WriterFactoryV1 extends HFile.WriterFactory {
86 WriterFactoryV1(Configuration conf, CacheConfig cacheConf) {
87 super(conf, cacheConf);
88 }
89
90 @Override
91 public Writer createWriter(FileSystem fs, Path path,
92 FSDataOutputStream ostream, int blockSize,
93 Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
94 KeyComparator comparator, final ChecksumType checksumType,
95 final int bytesPerChecksum) throws IOException {
96
97 return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize,
98 compressAlgo, dataBlockEncoder, comparator);
99 }
100 }
101
102
103 public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
104 FileSystem fs, Path path, FSDataOutputStream ostream,
105 int blockSize, Compression.Algorithm compress,
106 HFileDataBlockEncoder blockEncoder,
107 final KeyComparator comparator) throws IOException {
108 super(cacheConf, ostream == null ? createOutputStream(conf, fs, path) : ostream, path,
109 blockSize, compress, blockEncoder, comparator);
110 }
111
112
113
114
115
116
117 private void checkBlockBoundary() throws IOException {
118 if (this.out != null && this.out.size() < blockSize)
119 return;
120 finishBlock();
121 newBlock();
122 }
123
124
125
126
127
128
129 private void finishBlock() throws IOException {
130 if (this.out == null)
131 return;
132 long startTimeNs = System.nanoTime();
133
134 int size = releaseCompressingStream(this.out);
135 this.out = null;
136 blockKeys.add(firstKeyInBlock);
137 blockOffsets.add(Long.valueOf(blockBegin));
138 blockDataSizes.add(Integer.valueOf(size));
139 this.totalUncompressedBytes += size;
140
141 HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
142
143 if (cacheConf.shouldCacheDataOnWrite()) {
144 baosDos.flush();
145
146 byte[] bytes = baos.toByteArray();
147 HFileBlock block = new HFileBlock(BlockType.DATA,
148 (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
149 ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
150 blockBegin, MemStore.NO_PERSISTENT_TS,
151 HFileBlock.MINOR_VERSION_NO_CHECKSUM,
152 0,
153 ChecksumType.NULL.getCode(),
154 (int) (outputStream.getPos() - blockBegin) +
155 HFileBlock.HEADER_SIZE_NO_CHECKSUM);
156
157 block = blockEncoder.diskToCacheFormat(block, false);
158 cacheConf.getBlockCache().cacheBlock(
159 new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE,
160 block.getBlockType()), block);
161 baosDos.close();
162 }
163 }
164
165
166
167
168
169
170 private void newBlock() throws IOException {
171
172 blockBegin = outputStream.getPos();
173 this.out = getCompressingStream();
174 BlockType.DATA.write(out);
175 firstKeyInBlock = null;
176 if (cacheConf.shouldCacheDataOnWrite()) {
177 this.baos = new ByteArrayOutputStream();
178 this.baosDos = new DataOutputStream(baos);
179 baosDos.write(HFileBlock.DUMMY_HEADER_NO_CHECKSUM);
180 }
181 }
182
183
184
185
186
187
188
189
190
191
192
193
194 private DataOutputStream getCompressingStream() throws IOException {
195 this.compressor = compressAlgo.getCompressor();
196
197
198
199
200
201
202
203
204 OutputStream os = this.compressAlgo.createCompressionStream(
205 this.outputStream, this.compressor, 0);
206 return new DataOutputStream(os);
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221 private int releaseCompressingStream(final DataOutputStream dos)
222 throws IOException {
223 dos.flush();
224 this.compressAlgo.returnCompressor(this.compressor);
225 this.compressor = null;
226 return dos.size();
227 }
228
229
230
231
232
233
234
235
236
237
238
239
240 public void appendMetaBlock(String metaBlockName, Writable content) {
241 byte[] key = Bytes.toBytes(metaBlockName);
242 int i;
243 for (i = 0; i < metaNames.size(); ++i) {
244
245 byte[] cur = metaNames.get(i);
246 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
247 key.length) > 0) {
248 break;
249 }
250 }
251 metaNames.add(i, key);
252 metaData.add(i, content);
253 }
254
255
256
257
258
259
260
261
262
263 public void append(final KeyValue kv) throws IOException {
264 append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
265 kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
266 }
267
268
269
270
271
272
273
274
275
276
277
278 public void append(final byte[] key, final byte[] value) throws IOException {
279 append(key, 0, key.length, value, 0, value.length);
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293
294 private void append(final byte[] key, final int koffset, final int klength,
295 final byte[] value, final int voffset, final int vlength)
296 throws IOException {
297 boolean dupKey = checkKey(key, koffset, klength);
298 checkValue(value, voffset, vlength);
299 if (!dupKey) {
300 checkBlockBoundary();
301 }
302
303 this.out.writeInt(klength);
304 totalKeyLength += klength;
305 this.out.writeInt(vlength);
306 totalValueLength += vlength;
307 this.out.write(key, koffset, klength);
308 this.out.write(value, voffset, vlength);
309
310 if (this.firstKeyInBlock == null) {
311
312 this.firstKeyInBlock = new byte[klength];
313 System.arraycopy(key, koffset, this.firstKeyInBlock, 0, klength);
314 }
315 this.lastKeyBuffer = key;
316 this.lastKeyOffset = koffset;
317 this.lastKeyLength = klength;
318 this.entryCount++;
319
320 if (cacheConf.shouldCacheDataOnWrite()) {
321 this.baosDos.writeInt(klength);
322 this.baosDos.writeInt(vlength);
323 this.baosDos.write(key, koffset, klength);
324 this.baosDos.write(value, voffset, vlength);
325 }
326 }
327
328 public void close() throws IOException {
329 if (this.outputStream == null) {
330 return;
331 }
332
333 blockEncoder.saveMetadata(this);
334
335
336
337 finishBlock();
338
339 FixedFileTrailer trailer = new FixedFileTrailer(1,
340 HFileBlock.MINOR_VERSION_NO_CHECKSUM);
341
342
343 ArrayList<Long> metaOffsets = null;
344 ArrayList<Integer> metaDataSizes = null;
345 if (metaNames.size() > 0) {
346 metaOffsets = new ArrayList<Long>(metaNames.size());
347 metaDataSizes = new ArrayList<Integer>(metaNames.size());
348 for (int i = 0; i < metaNames.size(); ++i) {
349
350 long curPos = outputStream.getPos();
351 metaOffsets.add(curPos);
352
353 DataOutputStream dos = getCompressingStream();
354 BlockType.META.write(dos);
355 metaData.get(i).write(dos);
356 int size = releaseCompressingStream(dos);
357
358 metaDataSizes.add(size);
359 }
360 }
361
362 writeFileInfo(trailer, outputStream);
363
364
365 trailer.setLoadOnOpenOffset(writeBlockIndex(this.outputStream,
366 this.blockKeys, this.blockOffsets, this.blockDataSizes));
367 LOG.info("Wrote a version 1 block index with " + this.blockKeys.size()
368 + " keys");
369
370 if (metaNames.size() > 0) {
371
372 writeBlockIndex(this.outputStream, metaNames, metaOffsets, metaDataSizes);
373 }
374
375
376 trailer.setDataIndexCount(blockKeys.size());
377
378 finishClose(trailer);
379 }
380
381 @Override
382 protected void finishFileInfo() throws IOException {
383 super.finishFileInfo();
384
385
386 fileInfo.append(FileInfo.COMPARATOR,
387 Bytes.toBytes(comparator.getClass().getName()), false);
388 }
389
390 @Override
391 public void addInlineBlockWriter(InlineBlockWriter bloomWriter) {
392
393 throw new UnsupportedOperationException();
394 }
395
396
397
398
399
400 @Override
401 public void addGeneralBloomFilter(BloomFilterWriter bfw) {
402 appendMetaBlock(BLOOM_FILTER_META_KEY,
403 bfw.getMetaWriter());
404 Writable dataWriter = bfw.getDataWriter();
405 if (dataWriter != null) {
406 appendMetaBlock(BLOOM_FILTER_DATA_KEY, dataWriter);
407 }
408 }
409
410 @Override
411 public void addDeleteFamilyBloomFilter(BloomFilterWriter bfw)
412 throws IOException {
413 throw new IOException("Delete Bloom filter is not supported in HFile V1");
414 }
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430 private static long writeBlockIndex(final FSDataOutputStream out,
431 final List<byte[]> keys, final List<Long> offsets,
432 final List<Integer> uncompressedSizes) throws IOException {
433 long pos = out.getPos();
434
435 if (keys.size() > 0) {
436 BlockType.INDEX_V1.write(out);
437
438 for (int i = 0; i < keys.size(); ++i) {
439 out.writeLong(offsets.get(i).longValue());
440 out.writeInt(uncompressedSizes.get(i).intValue());
441 byte[] key = keys.get(i);
442 Bytes.writeByteArray(out, key);
443 }
444 }
445 return pos;
446 }
447
448 }