1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.InetSocketAddress;
24 import java.security.Key;
25 import java.security.KeyException;
26 import java.security.PrivilegedExceptionAction;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.NavigableSet;
35 import java.util.Set;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.CompletionService;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.ExecutorCompletionService;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ThreadPoolExecutor;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.locks.ReentrantReadWriteLock;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.Cell;
52 import org.apache.hadoop.hbase.CellComparator;
53 import org.apache.hadoop.hbase.CellUtil;
54 import org.apache.hadoop.hbase.CompoundConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.KeyValue;
59 import org.apache.hadoop.hbase.RemoteExceptionHandler;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.Tag;
62 import org.apache.hadoop.hbase.TagType;
63 import org.apache.hadoop.hbase.classification.InterfaceAudience;
64 import org.apache.hadoop.hbase.client.Scan;
65 import org.apache.hadoop.hbase.conf.ConfigurationManager;
66 import org.apache.hadoop.hbase.io.compress.Compression;
67 import org.apache.hadoop.hbase.io.crypto.Cipher;
68 import org.apache.hadoop.hbase.io.crypto.Encryption;
69 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
70 import org.apache.hadoop.hbase.io.hfile.HFile;
71 import org.apache.hadoop.hbase.io.hfile.HFileContext;
72 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
73 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
74 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
75 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
76 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
77 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
78 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
79 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
80 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
81 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
82 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
83 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
84 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
85 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
86 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
87 import org.apache.hadoop.hbase.security.EncryptionUtil;
88 import org.apache.hadoop.hbase.security.User;
89 import org.apache.hadoop.hbase.util.Bytes;
90 import org.apache.hadoop.hbase.util.ChecksumType;
91 import org.apache.hadoop.hbase.util.ClassSize;
92 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93 import org.apache.hadoop.hbase.util.Pair;
94 import org.apache.hadoop.hbase.util.ReflectionUtils;
95 import org.apache.hadoop.util.StringUtils;
96 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
97
98 import com.google.common.annotations.VisibleForTesting;
99 import com.google.common.base.Preconditions;
100 import com.google.common.collect.ImmutableCollection;
101 import com.google.common.collect.ImmutableList;
102 import com.google.common.collect.Lists;
103 import com.google.common.collect.Sets;
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 @InterfaceAudience.Private
129 public class HStore implements Store {
130 private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
131 public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
132 "hbase.server.compactchecker.interval.multiplier";
133 public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
134 public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
135 public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
136
137 static final Log LOG = LogFactory.getLog(HStore.class);
138
139 protected final MemStore memstore;
140
141 private final HRegion region;
142 private final HColumnDescriptor family;
143 private final HRegionFileSystem fs;
144 private Configuration conf;
145 private final CacheConfig cacheConf;
146 private long lastCompactSize = 0;
147 volatile boolean forceMajor = false;
148
149 static int closeCheckInterval = 0;
150 private volatile long storeSize = 0L;
151 private volatile long totalUncompressedBytes = 0L;
152
153
154
155
156
157
158
159
160
161
162 final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
163 private final boolean verifyBulkLoads;
164
165 private ScanInfo scanInfo;
166
167
168 final List<StoreFile> filesCompacting = Lists.newArrayList();
169
170
171 private final Set<ChangedReadersObserver> changedReaderObservers =
172 Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
173
174 private final int blocksize;
175 private HFileDataBlockEncoder dataBlockEncoder;
176
177
178 private ChecksumType checksumType;
179 private int bytesPerChecksum;
180
181
182 private final KeyValue.KVComparator comparator;
183
184 final StoreEngine<?, ?, ?, ?> storeEngine;
185
186 private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
187 private volatile OffPeakHours offPeakHours;
188
189 private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
190 private int flushRetriesNumber;
191 private int pauseTime;
192
193 private long blockingFileCount;
194 private int compactionCheckMultiplier;
195
196 private Encryption.Context cryptoContext = Encryption.Context.NONE;
197
198 private volatile long flushedCellsCount = 0;
199 private volatile long compactedCellsCount = 0;
200 private volatile long majorCompactedCellsCount = 0;
201 private volatile long flushedCellsSize = 0;
202 private volatile long compactedCellsSize = 0;
203 private volatile long majorCompactedCellsSize = 0;
204
205
206
207
208
209
210
211
212
213 protected HStore(final HRegion region, final HColumnDescriptor family,
214 final Configuration confParam) throws IOException {
215
216 HRegionInfo info = region.getRegionInfo();
217 this.fs = region.getRegionFileSystem();
218
219
220 fs.createStoreDir(family.getNameAsString());
221 this.region = region;
222 this.family = family;
223
224
225
226 this.conf = new CompoundConfiguration()
227 .add(confParam)
228 .addStringMap(region.getTableDesc().getConfiguration())
229 .addStringMap(family.getConfiguration())
230 .addWritableMap(family.getValues());
231 this.blocksize = family.getBlocksize();
232
233 this.dataBlockEncoder =
234 new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
235
236 this.comparator = info.getComparator();
237
238 long timeToPurgeDeletes =
239 Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
240 LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
241 "ms in store " + this);
242
243 long ttl = determineTTLFromFamily(family);
244
245
246 scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
247 String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
248 this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
249 Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
250 this.offPeakHours = OffPeakHours.getInstance(conf);
251
252
253 this.cacheConf = new CacheConfig(conf, family);
254
255 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
256
257 this.blockingFileCount =
258 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
259 this.compactionCheckMultiplier = conf.getInt(
260 COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
261 if (this.compactionCheckMultiplier <= 0) {
262 LOG.error("Compaction check period multiplier must be positive, setting default: "
263 + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
264 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
265 }
266
267 if (HStore.closeCheckInterval == 0) {
268 HStore.closeCheckInterval = conf.getInt(
269 "hbase.hstore.close.check.interval", 10*1000*1000
270 }
271
272 this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
273 this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
274
275
276 this.checksumType = getChecksumType(conf);
277
278 this.bytesPerChecksum = getBytesPerChecksum(conf);
279 flushRetriesNumber = conf.getInt(
280 "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
281 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
282 if (flushRetriesNumber <= 0) {
283 throw new IllegalArgumentException(
284 "hbase.hstore.flush.retries.number must be > 0, not "
285 + flushRetriesNumber);
286 }
287
288
289 String cipherName = family.getEncryptionType();
290 if (cipherName != null) {
291 Cipher cipher;
292 Key key;
293 byte[] keyBytes = family.getEncryptionKey();
294 if (keyBytes != null) {
295
296 String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
297 User.getCurrent().getShortName());
298 try {
299
300 key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
301 } catch (KeyException e) {
302
303
304 if (LOG.isDebugEnabled()) {
305 LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
306 }
307 String alternateKeyName =
308 conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
309 if (alternateKeyName != null) {
310 try {
311 key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
312 } catch (KeyException ex) {
313 throw new IOException(ex);
314 }
315 } else {
316 throw new IOException(e);
317 }
318 }
319
320 cipher = Encryption.getCipher(conf, key.getAlgorithm());
321 if (cipher == null) {
322 throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
323 }
324
325
326
327 if (!cipher.getName().equalsIgnoreCase(cipherName)) {
328 throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
329 "' configured with type '" + cipherName +
330 "' but key specifies algorithm '" + cipher.getName() + "'");
331 }
332 } else {
333
334 cipher = Encryption.getCipher(conf, cipherName);
335 if (cipher == null) {
336 throw new RuntimeException("Cipher '" + cipherName + "' is not available");
337 }
338 key = cipher.getRandomKey();
339 }
340 cryptoContext = Encryption.newContext(conf);
341 cryptoContext.setCipher(cipher);
342 cryptoContext.setKey(key);
343 }
344 }
345
346
347
348
349
350 private static long determineTTLFromFamily(final HColumnDescriptor family) {
351
352 long ttl = family.getTimeToLive();
353 if (ttl == HConstants.FOREVER) {
354
355 ttl = Long.MAX_VALUE;
356 } else if (ttl == -1) {
357 ttl = Long.MAX_VALUE;
358 } else {
359
360 ttl *= 1000;
361 }
362 return ttl;
363 }
364
365 @Override
366 public String getColumnFamilyName() {
367 return this.family.getNameAsString();
368 }
369
370 @Override
371 public TableName getTableName() {
372 return this.getRegionInfo().getTable();
373 }
374
375 @Override
376 public FileSystem getFileSystem() {
377 return this.fs.getFileSystem();
378 }
379
380 public HRegionFileSystem getRegionFileSystem() {
381 return this.fs;
382 }
383
384
385 @Override
386 public long getStoreFileTtl() {
387
388 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
389 }
390
391 @Override
392 public long getMemstoreFlushSize() {
393
394 return this.region.memstoreFlushSize;
395 }
396
397 @Override
398 public long getFlushableSize() {
399 return this.memstore.getFlushableSize();
400 }
401
402 @Override
403 public long getSnapshotSize() {
404 return this.memstore.getSnapshotSize();
405 }
406
407 @Override
408 public long getCompactionCheckMultiplier() {
409 return this.compactionCheckMultiplier;
410 }
411
412 @Override
413 public long getBlockingFileCount() {
414 return blockingFileCount;
415 }
416
417
418
419
420
421
422
423 public static int getBytesPerChecksum(Configuration conf) {
424 return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
425 HFile.DEFAULT_BYTES_PER_CHECKSUM);
426 }
427
428
429
430
431
432
433 public static ChecksumType getChecksumType(Configuration conf) {
434 String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
435 if (checksumName == null) {
436 return HFile.DEFAULT_CHECKSUM_TYPE;
437 } else {
438 return ChecksumType.nameToType(checksumName);
439 }
440 }
441
442
443
444
445 public static int getCloseCheckInterval() {
446 return closeCheckInterval;
447 }
448
449 @Override
450 public HColumnDescriptor getFamily() {
451 return this.family;
452 }
453
454
455
456
457 @Override
458 public long getMaxSequenceId() {
459 return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
460 }
461
462 @Override
463 public long getMaxMemstoreTS() {
464 return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
465 }
466
467
468
469
470
471
472
473 @Deprecated
474 public static Path getStoreHomedir(final Path tabledir,
475 final HRegionInfo hri, final byte[] family) {
476 return getStoreHomedir(tabledir, hri.getEncodedName(), family);
477 }
478
479
480
481
482
483
484
485 @Deprecated
486 public static Path getStoreHomedir(final Path tabledir,
487 final String encodedName, final byte[] family) {
488 return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
489 }
490
491 @Override
492 public HFileDataBlockEncoder getDataBlockEncoder() {
493 return dataBlockEncoder;
494 }
495
496
497
498
499
500 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
501 this.dataBlockEncoder = blockEncoder;
502 }
503
504
505
506
507
508
509 private List<StoreFile> loadStoreFiles() throws IOException {
510 Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
511 return openStoreFiles(files);
512 }
513
514 private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
515 if (files == null || files.size() == 0) {
516 return new ArrayList<StoreFile>();
517 }
518
519 ThreadPoolExecutor storeFileOpenerThreadPool =
520 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
521 this.getColumnFamilyName());
522 CompletionService<StoreFile> completionService =
523 new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
524
525 int totalValidStoreFile = 0;
526 for (final StoreFileInfo storeFileInfo: files) {
527
528 completionService.submit(new Callable<StoreFile>() {
529 @Override
530 public StoreFile call() throws IOException {
531 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
532 return storeFile;
533 }
534 });
535 totalValidStoreFile++;
536 }
537
538 ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
539 IOException ioe = null;
540 try {
541 for (int i = 0; i < totalValidStoreFile; i++) {
542 try {
543 Future<StoreFile> future = completionService.take();
544 StoreFile storeFile = future.get();
545 long length = storeFile.getReader().length();
546 this.storeSize += length;
547 this.totalUncompressedBytes +=
548 storeFile.getReader().getTotalUncompressedBytes();
549 if (LOG.isDebugEnabled()) {
550 LOG.debug("loaded " + storeFile.toStringDetailed());
551 }
552 results.add(storeFile);
553 } catch (InterruptedException e) {
554 if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
555 } catch (ExecutionException e) {
556 if (ioe == null) ioe = new IOException(e.getCause());
557 }
558 }
559 } finally {
560 storeFileOpenerThreadPool.shutdownNow();
561 }
562 if (ioe != null) {
563
564 for (StoreFile file : results) {
565 try {
566 if (file != null) file.closeReader(true);
567 } catch (IOException e) {
568 LOG.warn(e.getMessage());
569 }
570 }
571 throw ioe;
572 }
573
574 return results;
575 }
576
577
578
579
580
581
582
583
584 @Override
585 public void refreshStoreFiles() throws IOException {
586 Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
587 refreshStoreFilesInternal(newFiles);
588 }
589
590 @Override
591 public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
592 List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
593 for (String file : newFiles) {
594 storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
595 }
596 refreshStoreFilesInternal(storeFiles);
597 }
598
599
600
601
602
603
604
605
606 private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
607 StoreFileManager sfm = storeEngine.getStoreFileManager();
608 Collection<StoreFile> currentFiles = sfm.getStorefiles();
609 if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
610
611 if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
612
613 HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
614 for (StoreFile sf : currentFiles) {
615 currentFilesSet.put(sf.getFileInfo(), sf);
616 }
617 HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
618
619 Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
620 Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
621
622 if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
623 return;
624 }
625
626 LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
627 + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
628
629 Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
630 for (StoreFileInfo sfi : toBeRemovedFiles) {
631 toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
632 }
633
634
635 List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
636
637
638 replaceStoreFiles(toBeRemovedStoreFiles, openedFiles);
639
640
641
642
643 if (!toBeAddedFiles.isEmpty()) {
644 region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
645 }
646
647
648 completeCompaction(toBeRemovedStoreFiles, false);
649 }
650
651 private StoreFile createStoreFileAndReader(final Path p) throws IOException {
652 StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
653 return createStoreFileAndReader(info);
654 }
655
656 private StoreFile createStoreFileAndReader(final StoreFileInfo info)
657 throws IOException {
658 info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
659 StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
660 this.family.getBloomFilterType());
661 storeFile.createReader();
662 return storeFile;
663 }
664
665 @Override
666 public Pair<Long, Cell> add(final Cell cell) {
667 lock.readLock().lock();
668 try {
669 return this.memstore.add(cell);
670 } finally {
671 lock.readLock().unlock();
672 }
673 }
674
675 @Override
676 public long timeOfOldestEdit() {
677 return memstore.timeOfOldestEdit();
678 }
679
680
681
682
683
684
685
686 protected long delete(final KeyValue kv) {
687 lock.readLock().lock();
688 try {
689 return this.memstore.delete(kv);
690 } finally {
691 lock.readLock().unlock();
692 }
693 }
694
695 @Override
696 public void rollback(final Cell cell) {
697 lock.readLock().lock();
698 try {
699 this.memstore.rollback(cell);
700 } finally {
701 lock.readLock().unlock();
702 }
703 }
704
705
706
707
708 @Override
709 public Collection<StoreFile> getStorefiles() {
710 return this.storeEngine.getStoreFileManager().getStorefiles();
711 }
712
713 @Override
714 public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
715 HFile.Reader reader = null;
716 try {
717 LOG.info("Validating hfile at " + srcPath + " for inclusion in "
718 + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
719 reader = HFile.createReader(srcPath.getFileSystem(conf),
720 srcPath, cacheConf, conf);
721 reader.loadFileInfo();
722
723 byte[] firstKey = reader.getFirstRowKey();
724 Preconditions.checkState(firstKey != null, "First key can not be null");
725 byte[] lk = reader.getLastKey();
726 Preconditions.checkState(lk != null, "Last key can not be null");
727 byte[] lastKey = KeyValue.createKeyValueFromKey(lk).getRow();
728
729 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
730 " last=" + Bytes.toStringBinary(lastKey));
731 LOG.debug("Region bounds: first=" +
732 Bytes.toStringBinary(getRegionInfo().getStartKey()) +
733 " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
734
735 if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
736 throw new WrongRegionException(
737 "Bulk load file " + srcPath.toString() + " does not fit inside region "
738 + this.getRegionInfo().getRegionNameAsString());
739 }
740
741 if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
742 HConstants.DEFAULT_MAX_FILE_SIZE)) {
743 LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
744 reader.length() + " bytes can be problematic as it may lead to oversplitting.");
745 }
746
747 if (verifyBulkLoads) {
748 long verificationStartTime = EnvironmentEdgeManager.currentTime();
749 LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
750 Cell prevCell = null;
751 HFileScanner scanner = reader.getScanner(false, false, false);
752 scanner.seekTo();
753 do {
754 Cell cell = scanner.getKeyValue();
755 if (prevCell != null) {
756 if (CellComparator.compareRows(prevCell, cell) > 0) {
757 throw new InvalidHFileException("Previous row is greater than"
758 + " current row: path=" + srcPath + " previous="
759 + CellUtil.getCellKeyAsString(prevCell) + " current="
760 + CellUtil.getCellKeyAsString(cell));
761 }
762 if (CellComparator.compareFamilies(prevCell, cell) != 0) {
763 throw new InvalidHFileException("Previous key had different"
764 + " family compared to current key: path=" + srcPath
765 + " previous="
766 + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
767 prevCell.getFamilyLength())
768 + " current="
769 + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
770 cell.getFamilyLength()));
771 }
772 }
773 prevCell = cell;
774 } while (scanner.next());
775 LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
776 + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
777 + " ms");
778 }
779 } finally {
780 if (reader != null) reader.close();
781 }
782 }
783
784 @Override
785 public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
786 Path srcPath = new Path(srcPathStr);
787 Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
788
789 LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
790 + dstPath + " - updating store file list.");
791
792 StoreFile sf = createStoreFileAndReader(dstPath);
793 bulkLoadHFile(sf);
794
795 LOG.info("Successfully loaded store file " + srcPath + " into store " + this
796 + " (new location: " + dstPath + ")");
797
798 return dstPath;
799 }
800
801 @Override
802 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
803 StoreFile sf = createStoreFileAndReader(fileInfo);
804 bulkLoadHFile(sf);
805 }
806
807 private void bulkLoadHFile(StoreFile sf) throws IOException {
808 StoreFile.Reader r = sf.getReader();
809 this.storeSize += r.length();
810 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
811
812
813 this.lock.writeLock().lock();
814 try {
815 this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
816 } finally {
817
818
819
820
821
822 this.lock.writeLock().unlock();
823 }
824 notifyChangedReadersObservers();
825 LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
826 if (LOG.isTraceEnabled()) {
827 String traceMessage = "BULK LOAD time,size,store size,store files ["
828 + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
829 + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
830 LOG.trace(traceMessage);
831 }
832 }
833
834 @Override
835 public ImmutableCollection<StoreFile> close() throws IOException {
836 this.lock.writeLock().lock();
837 try {
838
839 ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
840
841 if (!result.isEmpty()) {
842
843 ThreadPoolExecutor storeFileCloserThreadPool = this.region
844 .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
845 + this.getColumnFamilyName());
846
847
848 CompletionService<Void> completionService =
849 new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
850 for (final StoreFile f : result) {
851 completionService.submit(new Callable<Void>() {
852 @Override
853 public Void call() throws IOException {
854 boolean evictOnClose =
855 cacheConf != null? cacheConf.shouldEvictOnClose(): true;
856 f.closeReader(evictOnClose);
857 return null;
858 }
859 });
860 }
861
862 IOException ioe = null;
863 try {
864 for (int i = 0; i < result.size(); i++) {
865 try {
866 Future<Void> future = completionService.take();
867 future.get();
868 } catch (InterruptedException e) {
869 if (ioe == null) {
870 ioe = new InterruptedIOException();
871 ioe.initCause(e);
872 }
873 } catch (ExecutionException e) {
874 if (ioe == null) ioe = new IOException(e.getCause());
875 }
876 }
877 } finally {
878 storeFileCloserThreadPool.shutdownNow();
879 }
880 if (ioe != null) throw ioe;
881 }
882 LOG.info("Closed " + this);
883 return result;
884 } finally {
885 this.lock.writeLock().unlock();
886 }
887 }
888
889
890
891
892
893
894 void snapshot() {
895 this.lock.writeLock().lock();
896 try {
897 this.memstore.snapshot();
898 } finally {
899 this.lock.writeLock().unlock();
900 }
901 }
902
903
904
905
906
907
908
909
910
911
912 protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
913 MonitoredTask status) throws IOException {
914
915
916
917
918
919 StoreFlusher flusher = storeEngine.getStoreFlusher();
920 IOException lastException = null;
921 for (int i = 0; i < flushRetriesNumber; i++) {
922 try {
923 List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
924 Path lastPathName = null;
925 try {
926 for (Path pathName : pathNames) {
927 lastPathName = pathName;
928 validateStoreFile(pathName);
929 }
930 return pathNames;
931 } catch (Exception e) {
932 LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
933 if (e instanceof IOException) {
934 lastException = (IOException) e;
935 } else {
936 lastException = new IOException(e);
937 }
938 }
939 } catch (IOException e) {
940 LOG.warn("Failed flushing store file, retrying num=" + i, e);
941 lastException = e;
942 }
943 if (lastException != null && i < (flushRetriesNumber - 1)) {
944 try {
945 Thread.sleep(pauseTime);
946 } catch (InterruptedException e) {
947 IOException iie = new InterruptedIOException();
948 iie.initCause(e);
949 throw iie;
950 }
951 }
952 }
953 throw lastException;
954 }
955
956
957
958
959
960
961
962
963 private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
964 throws IOException {
965
966 Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
967
968 status.setStatus("Flushing " + this + ": reopening flushed file");
969 StoreFile sf = createStoreFileAndReader(dstPath);
970
971 StoreFile.Reader r = sf.getReader();
972 this.storeSize += r.length();
973 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
974
975 if (LOG.isInfoEnabled()) {
976 LOG.info("Added " + sf + ", entries=" + r.getEntries() +
977 ", sequenceid=" + logCacheFlushId +
978 ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
979 }
980 return sf;
981 }
982
983
984
985
986
987
988
989
990
991 @Override
992 public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
993 boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
994 throws IOException {
995 final CacheConfig writerCacheConf;
996 if (isCompaction) {
997
998 writerCacheConf = new CacheConfig(cacheConf);
999 writerCacheConf.setCacheDataOnWrite(false);
1000 } else {
1001 writerCacheConf = cacheConf;
1002 }
1003 InetSocketAddress[] favoredNodes = null;
1004 if (region.getRegionServerServices() != null) {
1005 favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
1006 region.getRegionInfo().getEncodedName());
1007 }
1008 HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
1009 cryptoContext);
1010 StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
1011 this.getFileSystem())
1012 .withFilePath(fs.createTempName())
1013 .withComparator(comparator)
1014 .withBloomType(family.getBloomFilterType())
1015 .withMaxKeyCount(maxKeyCount)
1016 .withFavoredNodes(favoredNodes)
1017 .withFileContext(hFileContext)
1018 .build();
1019 return w;
1020 }
1021
1022 private HFileContext createFileContext(Compression.Algorithm compression,
1023 boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
1024 if (compression == null) {
1025 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
1026 }
1027 HFileContext hFileContext = new HFileContextBuilder()
1028 .withIncludesMvcc(includeMVCCReadpoint)
1029 .withIncludesTags(includesTag)
1030 .withCompression(compression)
1031 .withCompressTags(family.isCompressTags())
1032 .withChecksumType(checksumType)
1033 .withBytesPerCheckSum(bytesPerChecksum)
1034 .withBlockSize(blocksize)
1035 .withHBaseCheckSum(true)
1036 .withDataBlockEncoding(family.getDataBlockEncoding())
1037 .withEncryptionContext(cryptoContext)
1038 .withCreateTime(EnvironmentEdgeManager.currentTime())
1039 .build();
1040 return hFileContext;
1041 }
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051 private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
1052 throws IOException {
1053 this.lock.writeLock().lock();
1054 try {
1055 this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1056 if (snapshotId > 0) {
1057 this.memstore.clearSnapshot(snapshotId);
1058 }
1059 } finally {
1060
1061
1062
1063
1064
1065 this.lock.writeLock().unlock();
1066 }
1067
1068
1069 notifyChangedReadersObservers();
1070
1071 if (LOG.isTraceEnabled()) {
1072 long totalSize = 0;
1073 for (StoreFile sf : sfs) {
1074 totalSize += sf.getReader().length();
1075 }
1076 String traceMessage = "FLUSH time,count,size,store size,store files ["
1077 + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1078 + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1079 LOG.trace(traceMessage);
1080 }
1081 return needsCompaction();
1082 }
1083
1084
1085
1086
1087
1088 private void notifyChangedReadersObservers() throws IOException {
1089 for (ChangedReadersObserver o: this.changedReaderObservers) {
1090 o.updateReaders();
1091 }
1092 }
1093
1094
1095
1096
1097
1098
1099 @Override
1100 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1101 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1102 byte[] stopRow, long readPt) throws IOException {
1103 Collection<StoreFile> storeFilesToScan;
1104 List<KeyValueScanner> memStoreScanners;
1105 this.lock.readLock().lock();
1106 try {
1107 storeFilesToScan =
1108 this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1109 memStoreScanners = this.memstore.getScanners(readPt);
1110 } finally {
1111 this.lock.readLock().unlock();
1112 }
1113
1114
1115
1116
1117
1118
1119 List<StoreFileScanner> sfScanners = StoreFileScanner
1120 .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
1121 readPt);
1122 List<KeyValueScanner> scanners =
1123 new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1124 scanners.addAll(sfScanners);
1125
1126 scanners.addAll(memStoreScanners);
1127 return scanners;
1128 }
1129
1130 @Override
1131 public void addChangedReaderObserver(ChangedReadersObserver o) {
1132 this.changedReaderObservers.add(o);
1133 }
1134
1135 @Override
1136 public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1137
1138 this.changedReaderObservers.remove(o);
1139 }
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188 @Override
1189 public List<StoreFile> compact(CompactionContext compaction,
1190 CompactionThroughputController throughputController) throws IOException {
1191 return compact(compaction, throughputController, null);
1192 }
1193
1194 @Override
1195 public List<StoreFile> compact(CompactionContext compaction,
1196 CompactionThroughputController throughputController, User user) throws IOException {
1197 assert compaction != null;
1198 List<StoreFile> sfs = null;
1199 CompactionRequest cr = compaction.getRequest();;
1200 try {
1201
1202
1203
1204 long compactionStartTime = EnvironmentEdgeManager.currentTime();
1205 assert compaction.hasSelection();
1206 Collection<StoreFile> filesToCompact = cr.getFiles();
1207 assert !filesToCompact.isEmpty();
1208 synchronized (filesCompacting) {
1209
1210
1211 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1212 }
1213
1214
1215 LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1216 + this + " of " + this.getRegionInfo().getRegionNameAsString()
1217 + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1218 + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1219
1220
1221 List<Path> newFiles = compaction.compact(throughputController, user);
1222
1223
1224 if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1225 LOG.warn("hbase.hstore.compaction.complete is set to false");
1226 sfs = new ArrayList<StoreFile>(newFiles.size());
1227 for (Path newFile : newFiles) {
1228
1229 StoreFile sf = createStoreFileAndReader(newFile);
1230 sf.closeReader(true);
1231 sfs.add(sf);
1232 }
1233 return sfs;
1234 }
1235
1236 sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
1237 writeCompactionWalRecord(filesToCompact, sfs);
1238 replaceStoreFiles(filesToCompact, sfs);
1239 if (cr.isMajor()) {
1240 majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1241 majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1242 } else {
1243 compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1244 compactedCellsSize += getCompactionProgress().totalCompactedSize;
1245 }
1246
1247 completeCompaction(filesToCompact, true);
1248
1249 logCompactionEndMessage(cr, sfs, compactionStartTime);
1250 return sfs;
1251 } finally {
1252 finishCompactionRequest(cr);
1253 }
1254 }
1255
1256 private List<StoreFile> moveCompatedFilesIntoPlace(
1257 final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
1258 List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1259 for (Path newFile : newFiles) {
1260 assert newFile != null;
1261 final StoreFile sf = moveFileIntoPlace(newFile);
1262 if (this.getCoprocessorHost() != null) {
1263 final Store thisStore = this;
1264 if (user == null) {
1265 getCoprocessorHost().postCompact(thisStore, sf, cr);
1266 } else {
1267 try {
1268 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1269 @Override
1270 public Void run() throws Exception {
1271 getCoprocessorHost().postCompact(thisStore, sf, cr);
1272 return null;
1273 }
1274 });
1275 } catch (InterruptedException ie) {
1276 InterruptedIOException iioe = new InterruptedIOException();
1277 iioe.initCause(ie);
1278 throw iioe;
1279 }
1280 }
1281 }
1282 assert sf != null;
1283 sfs.add(sf);
1284 }
1285 return sfs;
1286 }
1287
1288
1289 StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1290 validateStoreFile(newFile);
1291
1292 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1293 return createStoreFileAndReader(destPath);
1294 }
1295
1296
1297
1298
1299
1300
1301 private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1302 Collection<StoreFile> newFiles) throws IOException {
1303 if (region.getWAL() == null) return;
1304 List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1305 for (StoreFile f : filesCompacted) {
1306 inputPaths.add(f.getPath());
1307 }
1308 List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1309 for (StoreFile f : newFiles) {
1310 outputPaths.add(f.getPath());
1311 }
1312 HRegionInfo info = this.region.getRegionInfo();
1313 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1314 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1315 WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
1316 this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
1317 }
1318
1319 @VisibleForTesting
1320 void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1321 final Collection<StoreFile> result) throws IOException {
1322 this.lock.writeLock().lock();
1323 try {
1324 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1325 filesCompacting.removeAll(compactedFiles);
1326 } finally {
1327 this.lock.writeLock().unlock();
1328 }
1329 }
1330
1331
1332
1333
1334
1335
1336
1337 private void logCompactionEndMessage(
1338 CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1339 long now = EnvironmentEdgeManager.currentTime();
1340 StringBuilder message = new StringBuilder(
1341 "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1342 + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1343 + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1344 if (sfs.isEmpty()) {
1345 message.append("none, ");
1346 } else {
1347 for (StoreFile sf: sfs) {
1348 message.append(sf.getPath().getName());
1349 message.append("(size=");
1350 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1351 message.append("), ");
1352 }
1353 }
1354 message.append("total size for store is ")
1355 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1356 .append(". This selection was in queue for ")
1357 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1358 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1359 .append(" to execute.");
1360 LOG.info(message.toString());
1361 if (LOG.isTraceEnabled()) {
1362 int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1363 long resultSize = 0;
1364 for (StoreFile sf : sfs) {
1365 resultSize += sf.getReader().length();
1366 }
1367 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1368 + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1369 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1370 LOG.trace(traceMessage);
1371 }
1372 }
1373
1374
1375
1376
1377
1378
1379
1380 @Override
1381 public void replayCompactionMarker(CompactionDescriptor compaction,
1382 boolean pickCompactionFiles, boolean removeFiles)
1383 throws IOException {
1384 LOG.debug("Completing compaction from the WAL marker");
1385 List<String> compactionInputs = compaction.getCompactionInputList();
1386 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402 String familyName = this.getColumnFamilyName();
1403 List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
1404 for (String compactionInput : compactionInputs) {
1405 Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1406 inputFiles.add(inputPath.getName());
1407 }
1408
1409
1410 List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1411 for (StoreFile sf : this.getStorefiles()) {
1412 if (inputFiles.contains(sf.getPath().getName())) {
1413 inputStoreFiles.add(sf);
1414 }
1415 }
1416
1417
1418 List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1419
1420 if (pickCompactionFiles) {
1421 for (StoreFile sf : this.getStorefiles()) {
1422 compactionOutputs.remove(sf.getPath().getName());
1423 }
1424 for (String compactionOutput : compactionOutputs) {
1425 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1426 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1427 outputStoreFiles.add(storeFile);
1428 }
1429 }
1430
1431 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1432 LOG.info("Replaying compaction marker, replacing input files: " +
1433 inputStoreFiles + " with output files : " + outputStoreFiles);
1434 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1435 this.completeCompaction(inputStoreFiles, removeFiles);
1436 }
1437 }
1438
1439
1440
1441
1442
1443
1444
1445
1446 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1447 List<StoreFile> filesToCompact;
1448 boolean isMajor;
1449
1450 this.lock.readLock().lock();
1451 try {
1452 synchronized (filesCompacting) {
1453 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1454 if (!filesCompacting.isEmpty()) {
1455
1456
1457 StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1458 int idx = filesToCompact.indexOf(last);
1459 Preconditions.checkArgument(idx != -1);
1460 filesToCompact.subList(0, idx + 1).clear();
1461 }
1462 int count = filesToCompact.size();
1463 if (N > count) {
1464 throw new RuntimeException("Not enough files");
1465 }
1466
1467 filesToCompact = filesToCompact.subList(count - N, count);
1468 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1469 filesCompacting.addAll(filesToCompact);
1470 Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1471 }
1472 } finally {
1473 this.lock.readLock().unlock();
1474 }
1475
1476 try {
1477
1478 List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1479 .compactForTesting(filesToCompact, isMajor);
1480 for (Path newFile: newFiles) {
1481
1482 StoreFile sf = moveFileIntoPlace(newFile);
1483 if (this.getCoprocessorHost() != null) {
1484 this.getCoprocessorHost().postCompact(this, sf, null);
1485 }
1486 replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1487 completeCompaction(filesToCompact, true);
1488 }
1489 } finally {
1490 synchronized (filesCompacting) {
1491 filesCompacting.removeAll(filesToCompact);
1492 }
1493 }
1494 }
1495
1496 @Override
1497 public boolean hasReferences() {
1498 return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1499 }
1500
1501 @Override
1502 public CompactionProgress getCompactionProgress() {
1503 return this.storeEngine.getCompactor().getProgress();
1504 }
1505
1506 @Override
1507 public boolean isMajorCompaction() throws IOException {
1508 for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1509
1510 if (sf.getReader() == null) {
1511 LOG.debug("StoreFile " + sf + " has null Reader");
1512 return false;
1513 }
1514 }
1515 return storeEngine.getCompactionPolicy().isMajorCompaction(
1516 this.storeEngine.getStoreFileManager().getStorefiles());
1517 }
1518
1519 @Override
1520 public CompactionContext requestCompaction() throws IOException {
1521 return requestCompaction(Store.NO_PRIORITY, null);
1522 }
1523
1524 @Override
1525 public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1526 throws IOException {
1527 return requestCompaction(priority, baseRequest, null);
1528 }
1529 @Override
1530 public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
1531 User user) throws IOException {
1532
1533 if (!this.areWritesEnabled()) {
1534 return null;
1535 }
1536
1537
1538 removeUnneededFiles();
1539
1540 final CompactionContext compaction = storeEngine.createCompaction();
1541 CompactionRequest request = null;
1542 this.lock.readLock().lock();
1543 try {
1544 synchronized (filesCompacting) {
1545 final Store thisStore = this;
1546
1547 if (this.getCoprocessorHost() != null) {
1548 final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1549 boolean override = false;
1550 if (user == null) {
1551 override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
1552 baseRequest);
1553 } else {
1554 try {
1555 override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
1556 @Override
1557 public Boolean run() throws Exception {
1558 return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
1559 baseRequest);
1560 }
1561 });
1562 } catch (InterruptedException ie) {
1563 InterruptedIOException iioe = new InterruptedIOException();
1564 iioe.initCause(ie);
1565 throw iioe;
1566 }
1567 }
1568 if (override) {
1569
1570 compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1571 }
1572 }
1573
1574
1575 if (!compaction.hasSelection()) {
1576 boolean isUserCompaction = priority == Store.PRIORITY_USER;
1577 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1578 offPeakCompactionTracker.compareAndSet(false, true);
1579 try {
1580 compaction.select(this.filesCompacting, isUserCompaction,
1581 mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1582 } catch (IOException e) {
1583 if (mayUseOffPeak) {
1584 offPeakCompactionTracker.set(false);
1585 }
1586 throw e;
1587 }
1588 assert compaction.hasSelection();
1589 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1590
1591 offPeakCompactionTracker.set(false);
1592 }
1593 }
1594 if (this.getCoprocessorHost() != null) {
1595 if (user == null) {
1596 this.getCoprocessorHost().postCompactSelection(
1597 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1598 } else {
1599 try {
1600 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1601 @Override
1602 public Void run() throws Exception {
1603 getCoprocessorHost().postCompactSelection(
1604 thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);
1605 return null;
1606 }
1607 });
1608 } catch (InterruptedException ie) {
1609 InterruptedIOException iioe = new InterruptedIOException();
1610 iioe.initCause(ie);
1611 throw iioe;
1612 }
1613 }
1614 }
1615
1616
1617 if (baseRequest != null) {
1618
1619
1620 compaction.forceSelect(
1621 baseRequest.combineWith(compaction.getRequest()));
1622 }
1623
1624 request = compaction.getRequest();
1625 final Collection<StoreFile> selectedFiles = request.getFiles();
1626 if (selectedFiles.isEmpty()) {
1627 return null;
1628 }
1629
1630 addToCompactingFiles(selectedFiles);
1631
1632
1633 this.forceMajor = this.forceMajor && !request.isMajor();
1634
1635
1636
1637 request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1638 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1639 }
1640 } finally {
1641 this.lock.readLock().unlock();
1642 }
1643
1644 LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
1645 + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1646 + (request.isAllFiles() ? " (all files)" : ""));
1647 this.region.reportCompactionRequestStart(request.isMajor());
1648 return compaction;
1649 }
1650
1651
1652 private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1653 if (filesToAdd == null) return;
1654
1655 if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1656 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1657 }
1658 filesCompacting.addAll(filesToAdd);
1659 Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1660 }
1661
1662 private void removeUnneededFiles() throws IOException {
1663 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1664 if (getFamily().getMinVersions() > 0) {
1665 LOG.debug("Skipping expired store file removal due to min version being " +
1666 getFamily().getMinVersions());
1667 return;
1668 }
1669 this.lock.readLock().lock();
1670 Collection<StoreFile> delSfs = null;
1671 try {
1672 synchronized (filesCompacting) {
1673 long cfTtl = getStoreFileTtl();
1674 if (cfTtl != Long.MAX_VALUE) {
1675 delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1676 EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1677 addToCompactingFiles(delSfs);
1678 }
1679 }
1680 } finally {
1681 this.lock.readLock().unlock();
1682 }
1683 if (delSfs == null || delSfs.isEmpty()) return;
1684
1685 Collection<StoreFile> newFiles = new ArrayList<StoreFile>();
1686 writeCompactionWalRecord(delSfs, newFiles);
1687 replaceStoreFiles(delSfs, newFiles);
1688 completeCompaction(delSfs);
1689 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1690 + this + " of " + this.getRegionInfo().getRegionNameAsString()
1691 + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
1692 }
1693
1694 @Override
1695 public void cancelRequestedCompaction(CompactionContext compaction) {
1696 finishCompactionRequest(compaction.getRequest());
1697 }
1698
1699 private void finishCompactionRequest(CompactionRequest cr) {
1700 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1701 if (cr.isOffPeak()) {
1702 offPeakCompactionTracker.set(false);
1703 cr.setOffPeak(false);
1704 }
1705 synchronized (filesCompacting) {
1706 filesCompacting.removeAll(cr.getFiles());
1707 }
1708 }
1709
1710
1711
1712
1713
1714
1715
1716 private void validateStoreFile(Path path)
1717 throws IOException {
1718 StoreFile storeFile = null;
1719 try {
1720 storeFile = createStoreFileAndReader(path);
1721 } catch (IOException e) {
1722 LOG.error("Failed to open store file : " + path
1723 + ", keeping it in tmp location", e);
1724 throw e;
1725 } finally {
1726 if (storeFile != null) {
1727 storeFile.closeReader(false);
1728 }
1729 }
1730 }
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746 @VisibleForTesting
1747 protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1748 throws IOException {
1749 completeCompaction(compactedFiles, true);
1750 }
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767 @VisibleForTesting
1768 protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
1769 throws IOException {
1770 try {
1771
1772
1773
1774
1775 notifyChangedReadersObservers();
1776
1777
1778
1779 LOG.debug("Removing store files after compaction...");
1780 for (StoreFile compactedFile : compactedFiles) {
1781 compactedFile.closeReader(true);
1782 }
1783 if (removeFiles) {
1784 this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1785 }
1786 } catch (IOException e) {
1787 e = RemoteExceptionHandler.checkIOException(e);
1788 LOG.error("Failed removing compacted files in " + this +
1789 ". Files we were trying to remove are " + compactedFiles.toString() +
1790 "; some of them may have been already removed", e);
1791 }
1792
1793
1794 this.storeSize = 0L;
1795 this.totalUncompressedBytes = 0L;
1796 for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1797 StoreFile.Reader r = hsf.getReader();
1798 if (r == null) {
1799 LOG.warn("StoreFile " + hsf + " has a null Reader");
1800 continue;
1801 }
1802 this.storeSize += r.length();
1803 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1804 }
1805 }
1806
1807
1808
1809
1810
1811 int versionsToReturn(final int wantedVersions) {
1812 if (wantedVersions <= 0) {
1813 throw new IllegalArgumentException("Number of versions must be > 0");
1814 }
1815
1816 int maxVersions = this.family.getMaxVersions();
1817 return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1818 }
1819
1820
1821
1822
1823
1824
1825 static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
1826
1827
1828 if (cell.getTagsLength() > 0) {
1829
1830
1831
1832 Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
1833 cell.getTagsLength());
1834 while (i.hasNext()) {
1835 Tag t = i.next();
1836 if (TagType.TTL_TAG_TYPE == t.getType()) {
1837
1838
1839 long ts = cell.getTimestamp();
1840 assert t.getTagLength() == Bytes.SIZEOF_LONG;
1841 long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
1842 if (ts + ttl < now) {
1843 return true;
1844 }
1845
1846
1847 break;
1848 }
1849 }
1850 }
1851 return false;
1852 }
1853
1854 @Override
1855 public Cell getRowKeyAtOrBefore(final byte[] row) throws IOException {
1856
1857
1858
1859
1860
1861
1862 long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
1863
1864 KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1865
1866 GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1867 this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
1868 this.lock.readLock().lock();
1869 try {
1870
1871 this.memstore.getRowKeyAtOrBefore(state);
1872
1873
1874 Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
1875 .getCandidateFilesForRowKeyBefore(state.getTargetKey());
1876 while (sfIterator.hasNext()) {
1877 StoreFile sf = sfIterator.next();
1878 sfIterator.remove();
1879 boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
1880 Cell candidate = state.getCandidate();
1881
1882 if (candidate != null && CellUtil.matchingRow(candidate, row)) {
1883 return candidate;
1884 }
1885 if (haveNewCandidate) {
1886 sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
1887 sfIterator, state.getTargetKey(), candidate);
1888 }
1889 }
1890 return state.getCandidate();
1891 } finally {
1892 this.lock.readLock().unlock();
1893 }
1894 }
1895
1896
1897
1898
1899
1900
1901
1902
1903 private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1904 final GetClosestRowBeforeTracker state)
1905 throws IOException {
1906 StoreFile.Reader r = f.getReader();
1907 if (r == null) {
1908 LOG.warn("StoreFile " + f + " has a null Reader");
1909 return false;
1910 }
1911 if (r.getEntries() == 0) {
1912 LOG.warn("StoreFile " + f + " is a empty store file");
1913 return false;
1914 }
1915
1916 byte [] fk = r.getFirstKey();
1917 if (fk == null) return false;
1918 KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1919 byte [] lk = r.getLastKey();
1920 KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1921 KeyValue firstOnRow = state.getTargetKey();
1922 if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1923
1924
1925 if (!state.isTargetTable(lastKV)) return false;
1926
1927
1928 firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1929 }
1930
1931 HFileScanner scanner = r.getScanner(true, true, false);
1932
1933 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1934
1935
1936 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1937
1938 while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1939 firstOnRow.getKeyLength())) {
1940 Cell kv = scanner.getKeyValue();
1941 if (!state.isTargetTable(kv)) break;
1942 if (!state.isBetterCandidate(kv)) break;
1943
1944 firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1945
1946 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1947
1948 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1949 }
1950 return false;
1951 }
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961 private boolean seekToScanner(final HFileScanner scanner,
1962 final KeyValue firstOnRow,
1963 final KeyValue firstKV)
1964 throws IOException {
1965 KeyValue kv = firstOnRow;
1966
1967 if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1968 int result = scanner.seekTo(kv);
1969 return result != -1;
1970 }
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982 private boolean walkForwardInSingleRow(final HFileScanner scanner,
1983 final KeyValue firstOnRow,
1984 final GetClosestRowBeforeTracker state)
1985 throws IOException {
1986 boolean foundCandidate = false;
1987 do {
1988 Cell kv = scanner.getKeyValue();
1989
1990 if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1991
1992 if (state.isTooFar(kv, firstOnRow)) break;
1993 if (state.isExpired(kv)) {
1994 continue;
1995 }
1996
1997 if (state.handle(kv)) {
1998 foundCandidate = true;
1999 break;
2000 }
2001 } while(scanner.next());
2002 return foundCandidate;
2003 }
2004
2005 @Override
2006 public boolean canSplit() {
2007 this.lock.readLock().lock();
2008 try {
2009
2010 boolean result = !hasReferences();
2011 if (!result && LOG.isDebugEnabled()) {
2012 LOG.debug("Cannot split region due to reference files being there");
2013 }
2014 return result;
2015 } finally {
2016 this.lock.readLock().unlock();
2017 }
2018 }
2019
2020 @Override
2021 public byte[] getSplitPoint() {
2022 this.lock.readLock().lock();
2023 try {
2024
2025 assert !this.getRegionInfo().isMetaRegion();
2026
2027 if (hasReferences()) {
2028 return null;
2029 }
2030 return this.storeEngine.getStoreFileManager().getSplitPoint();
2031 } catch(IOException e) {
2032 LOG.warn("Failed getting store size for " + this, e);
2033 } finally {
2034 this.lock.readLock().unlock();
2035 }
2036 return null;
2037 }
2038
2039 @Override
2040 public long getLastCompactSize() {
2041 return this.lastCompactSize;
2042 }
2043
2044 @Override
2045 public long getSize() {
2046 return storeSize;
2047 }
2048
2049 @Override
2050 public void triggerMajorCompaction() {
2051 this.forceMajor = true;
2052 }
2053
2054
2055
2056
2057
2058
2059 @Override
2060 public KeyValueScanner getScanner(Scan scan,
2061 final NavigableSet<byte []> targetCols, long readPt) throws IOException {
2062 lock.readLock().lock();
2063 try {
2064 KeyValueScanner scanner = null;
2065 if (this.getCoprocessorHost() != null) {
2066 scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
2067 }
2068 if (scanner == null) {
2069 scanner = scan.isReversed() ? new ReversedStoreScanner(this,
2070 getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
2071 getScanInfo(), scan, targetCols, readPt);
2072 }
2073 return scanner;
2074 } finally {
2075 lock.readLock().unlock();
2076 }
2077 }
2078
2079 @Override
2080 public String toString() {
2081 return this.getColumnFamilyName();
2082 }
2083
2084 @Override
2085 public int getStorefilesCount() {
2086 return this.storeEngine.getStoreFileManager().getStorefileCount();
2087 }
2088
2089 @Override
2090 public long getStoreSizeUncompressed() {
2091 return this.totalUncompressedBytes;
2092 }
2093
2094 @Override
2095 public long getStorefilesSize() {
2096 long size = 0;
2097 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2098 StoreFile.Reader r = s.getReader();
2099 if (r == null) {
2100 LOG.warn("StoreFile " + s + " has a null Reader");
2101 continue;
2102 }
2103 size += r.length();
2104 }
2105 return size;
2106 }
2107
2108 @Override
2109 public long getStorefilesIndexSize() {
2110 long size = 0;
2111 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2112 StoreFile.Reader r = s.getReader();
2113 if (r == null) {
2114 LOG.warn("StoreFile " + s + " has a null Reader");
2115 continue;
2116 }
2117 size += r.indexSize();
2118 }
2119 return size;
2120 }
2121
2122 @Override
2123 public long getTotalStaticIndexSize() {
2124 long size = 0;
2125 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2126 StoreFile.Reader r = s.getReader();
2127 if (r == null) {
2128 continue;
2129 }
2130 size += r.getUncompressedDataIndexSize();
2131 }
2132 return size;
2133 }
2134
2135 @Override
2136 public long getTotalStaticBloomSize() {
2137 long size = 0;
2138 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2139 StoreFile.Reader r = s.getReader();
2140 if (r == null) {
2141 continue;
2142 }
2143 size += r.getTotalBloomSize();
2144 }
2145 return size;
2146 }
2147
2148 @Override
2149 public long getMemStoreSize() {
2150 return this.memstore.size();
2151 }
2152
2153 @Override
2154 public int getCompactPriority() {
2155 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
2156 if (priority == PRIORITY_USER) {
2157 LOG.warn("Compaction priority is USER despite there being no user compaction");
2158 }
2159 return priority;
2160 }
2161
2162 @Override
2163 public boolean throttleCompaction(long compactionSize) {
2164 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2165 }
2166
2167 public HRegion getHRegion() {
2168 return this.region;
2169 }
2170
2171 @Override
2172 public RegionCoprocessorHost getCoprocessorHost() {
2173 return this.region.getCoprocessorHost();
2174 }
2175
2176 @Override
2177 public HRegionInfo getRegionInfo() {
2178 return this.fs.getRegionInfo();
2179 }
2180
2181 @Override
2182 public boolean areWritesEnabled() {
2183 return this.region.areWritesEnabled();
2184 }
2185
2186 @Override
2187 public long getSmallestReadPoint() {
2188 return this.region.getSmallestReadPoint();
2189 }
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204 public long updateColumnValue(byte [] row, byte [] f,
2205 byte [] qualifier, long newValue)
2206 throws IOException {
2207
2208 this.lock.readLock().lock();
2209 try {
2210 long now = EnvironmentEdgeManager.currentTime();
2211
2212 return this.memstore.updateColumnValue(row,
2213 f,
2214 qualifier,
2215 newValue,
2216 now);
2217
2218 } finally {
2219 this.lock.readLock().unlock();
2220 }
2221 }
2222
2223 @Override
2224 public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2225 this.lock.readLock().lock();
2226 try {
2227 return this.memstore.upsert(cells, readpoint);
2228 } finally {
2229 this.lock.readLock().unlock();
2230 }
2231 }
2232
2233 @Override
2234 public StoreFlushContext createFlushContext(long cacheFlushId) {
2235 return new StoreFlusherImpl(cacheFlushId);
2236 }
2237
2238 private class StoreFlusherImpl implements StoreFlushContext {
2239
2240 private long cacheFlushSeqNum;
2241 private MemStoreSnapshot snapshot;
2242 private List<Path> tempFiles;
2243 private List<Path> committedFiles;
2244 private long cacheFlushCount;
2245 private long cacheFlushSize;
2246
2247 private StoreFlusherImpl(long cacheFlushSeqNum) {
2248 this.cacheFlushSeqNum = cacheFlushSeqNum;
2249 }
2250
2251
2252
2253
2254
2255 @Override
2256 public void prepare() {
2257 this.snapshot = memstore.snapshot();
2258 this.cacheFlushCount = snapshot.getCellsCount();
2259 this.cacheFlushSize = snapshot.getSize();
2260 committedFiles = new ArrayList<Path>(1);
2261 }
2262
2263 @Override
2264 public void flushCache(MonitoredTask status) throws IOException {
2265 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
2266 }
2267
2268 @Override
2269 public boolean commit(MonitoredTask status) throws IOException {
2270 if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2271 return false;
2272 }
2273 List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2274 for (Path storeFilePath : tempFiles) {
2275 try {
2276 storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
2277 } catch (IOException ex) {
2278 LOG.error("Failed to commit store file " + storeFilePath, ex);
2279
2280 for (StoreFile sf : storeFiles) {
2281 Path pathToDelete = sf.getPath();
2282 try {
2283 sf.deleteReader();
2284 } catch (IOException deleteEx) {
2285 LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
2286 Runtime.getRuntime().halt(1);
2287 }
2288 }
2289 throw new IOException("Failed to commit the flush", ex);
2290 }
2291 }
2292
2293 for (StoreFile sf : storeFiles) {
2294 if (HStore.this.getCoprocessorHost() != null) {
2295 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2296 }
2297 committedFiles.add(sf.getPath());
2298 }
2299
2300 HStore.this.flushedCellsCount += cacheFlushCount;
2301 HStore.this.flushedCellsSize += cacheFlushSize;
2302
2303
2304 return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2305 }
2306
2307 @Override
2308 public List<Path> getCommittedFiles() {
2309 return committedFiles;
2310 }
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320 @Override
2321 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2322 throws IOException {
2323 List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2324 for (String file : fileNames) {
2325
2326 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2327 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2328 storeFiles.add(storeFile);
2329 HStore.this.storeSize += storeFile.getReader().length();
2330 HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2331 if (LOG.isInfoEnabled()) {
2332 LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2333 " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2334 ", sequenceid=" + + storeFile.getReader().getSequenceID() +
2335 ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2336 }
2337 }
2338
2339 long snapshotId = -1;
2340 if (dropMemstoreSnapshot && snapshot != null) {
2341 snapshotId = snapshot.getId();
2342 }
2343 HStore.this.updateStorefiles(storeFiles, snapshotId);
2344 }
2345
2346
2347
2348
2349
2350 @Override
2351 public void abort() throws IOException {
2352 if (snapshot == null) {
2353 return;
2354 }
2355 HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2356 }
2357 }
2358
2359 @Override
2360 public boolean needsCompaction() {
2361 return this.storeEngine.needsCompaction(this.filesCompacting);
2362 }
2363
2364 @Override
2365 public CacheConfig getCacheConfig() {
2366 return this.cacheConf;
2367 }
2368
2369 public static final long FIXED_OVERHEAD =
2370 ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
2371 + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2372
2373 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2374 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2375 + ClassSize.CONCURRENT_SKIPLISTMAP
2376 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2377 + ScanInfo.FIXED_OVERHEAD);
2378
2379 @Override
2380 public long heapSize() {
2381 return DEEP_OVERHEAD + this.memstore.heapSize();
2382 }
2383
2384 @Override
2385 public KeyValue.KVComparator getComparator() {
2386 return comparator;
2387 }
2388
2389 @Override
2390 public ScanInfo getScanInfo() {
2391 return scanInfo;
2392 }
2393
2394
2395
2396
2397
2398 void setScanInfo(ScanInfo scanInfo) {
2399 this.scanInfo = scanInfo;
2400 }
2401
2402 @Override
2403 public boolean hasTooManyStoreFiles() {
2404 return getStorefilesCount() > this.blockingFileCount;
2405 }
2406
2407 @Override
2408 public long getFlushedCellsCount() {
2409 return flushedCellsCount;
2410 }
2411
2412 @Override
2413 public long getFlushedCellsSize() {
2414 return flushedCellsSize;
2415 }
2416
2417 @Override
2418 public long getCompactedCellsCount() {
2419 return compactedCellsCount;
2420 }
2421
2422 @Override
2423 public long getCompactedCellsSize() {
2424 return compactedCellsSize;
2425 }
2426
2427 @Override
2428 public long getMajorCompactedCellsCount() {
2429 return majorCompactedCellsCount;
2430 }
2431
2432 @Override
2433 public long getMajorCompactedCellsSize() {
2434 return majorCompactedCellsSize;
2435 }
2436
2437
2438
2439
2440
2441 @VisibleForTesting
2442 public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2443 return this.storeEngine;
2444 }
2445
2446 protected OffPeakHours getOffPeakHours() {
2447 return this.offPeakHours;
2448 }
2449
2450
2451
2452
2453 @Override
2454 public void onConfigurationChange(Configuration conf) {
2455 this.conf = new CompoundConfiguration()
2456 .add(conf)
2457 .addWritableMap(family.getValues());
2458 this.storeEngine.compactionPolicy.setConf(conf);
2459 this.offPeakHours = OffPeakHours.getInstance(conf);
2460 }
2461
2462
2463
2464
2465 @Override
2466 public void registerChildren(ConfigurationManager manager) {
2467
2468 }
2469
2470
2471
2472
2473 @Override
2474 public void deregisterChildren(ConfigurationManager manager) {
2475
2476 }
2477
2478 @Override
2479 public double getCompactionPressure() {
2480 return storeEngine.getStoreFileManager().getCompactionPressure();
2481 }
2482 }