/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.security.Key;
import java.security.KeyException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

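/**
 * A Store holds a column family in a Region. It has a memstore and a set of
 * zero or more StoreFiles, which stretch backwards over time.
 *
 * <p>There's no reason to consider append-logging at this level; all logging
 * and locking is handled at the HRegion level. Store just provides services to
 * manage sets of StoreFiles. One of the most important of those services is
 * compaction, where files are aggregated once they pass a configurable
 * threshold.
 *
 * <p>Locking and transactions are handled at a higher level. This API should
 * not be called directly but by an HRegion manager.
 */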
@InterfaceAudience.Private
public class HStore implements Store {
  private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;

  private static final Log LOG = LogFactory.getLog(HStore.class);

  protected final MemStore memstore;

  private final HRegion region;
  private final HColumnDescriptor family;
  private final HRegionFileSystem fs;
  private Configuration conf;
  private final CacheConfig cacheConf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;

  // How many bytes to write between status checks when closing store files.
  static int closeCheckInterval = 0;
  private volatile long storeSize = 0L;
  private volatile long totalUncompressedBytes = 0L;

  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final boolean verifyBulkLoads;

  private ScanInfo scanInfo;

  // All access must be synchronized on the list itself.
  private final List<StoreFile> filesCompacting = Lists.newArrayList();

  private final Set<ChangedReadersObserver> changedReaderObservers =
      Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  // Checksum configuration for newly written store files.
  private ChecksumType checksumType;
  private int bytesPerChecksum;

  // Comparing KeyValues.
  private final KeyValue.KVComparator comparator;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private Encryption.Context cryptoContext = Encryption.Context.NONE;

  private volatile long flushedCellsCount = 0;
  private volatile long compactedCellsCount = 0;
  private volatile long majorCompactedCellsCount = 0;
  private volatile long flushedCellsSize = 0;
  private volatile long compactedCellsSize = 0;
  private volatile long majorCompactedCellsSize = 0;

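  /**
   * Constructor.
   * @param region the region this store belongs to
   * @param family HColumnDescriptor for this column family
   * @param confParam base configuration; table and family settings are layered
   *   on top of it via a CompoundConfiguration, so do not cache this object
   * @throws IOException if the store directory cannot be created
   */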
  protected HStore(final HRegion region, final HColumnDescriptor family,
      final Configuration confParam) throws IOException {

    HRegionInfo info = region.getRegionInfo();
    this.fs = region.getRegionFileSystem();

    // Assemble the store's home directory and ensure it exists.
    fs.createStoreDir(family.getNameAsString());
    this.region = region;
    this.family = family;

    // Merge table- and family-level overrides on top of the supplied
    // configuration.
    this.conf = new CompoundConfiguration()
        .add(confParam)
        .addStringMap(region.getTableDesc().getConfiguration())
        .addStringMap(family.getConfiguration())
        .addWritableMap(family.getValues());
    this.blocksize = family.getBlocksize();

    this.dataBlockEncoder =
        new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    this.comparator = info.getComparator();

    long timeToPurgeDeletes =
        Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
        "ms in store " + this);

    long ttl = determineTTLFromFamily(family);

    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
    String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
    this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
        Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
    this.offPeakHours = OffPeakHours.getInstance(conf);

    // Setting up cache configuration for this family.
    this.cacheConf = new CacheConfig(conf, family);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount =
        conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(
        COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: "
          + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    if (HStore.closeCheckInterval == 0) {
      HStore.closeCheckInterval = conf.getInt(
          "hbase.hstore.close.check.interval", 10 * 1000 * 1000 /* 10 MB */);
    }

    this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
    this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());

    this.checksumType = getChecksumType(conf);
    this.bytesPerChecksum = getBytesPerChecksum(conf);
    flushRetriesNumber = conf.getInt(
        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
          "hbase.hstore.flush.retries.number must be > 0, not "
              + flushRetriesNumber);
    }

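    // Crypto context for new store files. If the family has an encryption type
    // configured, unwrap the family's key material (falling back to the
    // alternate master key if the current master key fails), or create a
    // random data key if the family supplies none.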
    String cipherName = family.getEncryptionType();
    if (cipherName != null) {
      Cipher cipher;
      Key key;
      byte[] keyBytes = family.getEncryptionKey();
      if (keyBytes != null) {
        // Family provides specific key material.
        String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
            User.getCurrent().getShortName());
        try {
          // First try the master key.
          key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
        } catch (KeyException e) {
          // If the current master key fails to unwrap, try the alternate, if
          // one is configured.
          if (LOG.isDebugEnabled()) {
            LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
          }
          String alternateKeyName =
              conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
          if (alternateKeyName != null) {
            try {
              key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
            } catch (KeyException ex) {
              throw new IOException(ex);
            }
          } else {
            throw new IOException(e);
          }
        }
        // Use the algorithm the key wants.
        cipher = Encryption.getCipher(conf, key.getAlgorithm());
        if (cipher == null) {
          throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
        }
        // Fail if misconfigured: the family's configured cipher must match
        // the algorithm the key carries.
        if (!cipher.getName().equalsIgnoreCase(cipherName)) {
          throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
              "' configured with type '" + cipherName +
              "' but key specifies algorithm '" + cipher.getName() + "'");
        }
      } else {
        // Family does not provide key material, create a random key.
        cipher = Encryption.getCipher(conf, cipherName);
        if (cipher == null) {
          throw new RuntimeException("Cipher '" + cipherName + "' is not available");
        }
        key = cipher.getRandomKey();
      }
      cryptoContext = Encryption.newContext(conf);
      cryptoContext.setCipher(cipher);
      cryptoContext.setKey(key);
    }

    LOG.info("Store=" + getColumnFamilyName() +
        ", memstore type=" + this.memstore.getClass().getSimpleName() +
        ", verifyBulkLoads=" + verifyBulkLoads +
        ", encoding=" + family.getDataBlockEncoding() +
        ", compression=" + family.getCompressionType());
  }
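  /**
   * @param family the column family descriptor
   * @return the family's TTL in milliseconds; Long.MAX_VALUE if the TTL is
   *   FOREVER or unset, otherwise the TTL in seconds converted to ms
   */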
  private static long determineTTLFromFamily(final HColumnDescriptor family) {
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      ttl *= 1000;
    }
    return ttl;
  }

  @Override
  public String getColumnFamilyName() {
    return this.family.getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return this.fs.getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return this.fs;
  }

  @Override
  public long getStoreFileTtl() {
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemstoreFlushSize() {
    return this.region.memstoreFlushSize;
  }

  @Override
  public long getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public long getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }

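  /**
   * Returns the configured bytesPerChecksum value.
   * @param conf The configuration
   * @return The bytesPerChecksum that is set in the configuration
   */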
  public static int getBytesPerChecksum(Configuration conf) {
    return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
        HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }

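  /**
   * Returns the configured checksum algorithm.
   * @param conf The configuration
   * @return The checksum algorithm that is set in the configuration
   */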
  public static ChecksumType getChecksumType(Configuration conf) {
    String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
    if (checksumName == null) {
      return ChecksumType.getDefaultChecksumType();
    } else {
      return ChecksumType.nameToType(checksumName);
    }
  }

  public static int getCloseCheckInterval() {
    return closeCheckInterval;
  }

  @Override
  public HColumnDescriptor getFamily() {
    return this.family;
  }

  @Override
  public long getMaxSequenceId() {
    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public long getMaxMemstoreTS() {
    return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
  }

  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final HRegionInfo hri, final byte[] family) {
    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
  }

  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte[] family) {
    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
  }

  @Override
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

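  /**
   * Creates an unsorted list of StoreFile loaded in parallel from the store
   * directory.
   * @throws IOException
   */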
  private List<StoreFile> loadStoreFiles() throws IOException {
    Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
    return openStoreFiles(files);
  }

  private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
    if (files == null || files.size() == 0) {
      return new ArrayList<StoreFile>();
    }
    // Initialize the thread pool for opening store files in parallel.
    ThreadPoolExecutor storeFileOpenerThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
            this.getColumnFamilyName());
    CompletionService<StoreFile> completionService =
        new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (final StoreFileInfo storeFileInfo : files) {
      // Open each store file in parallel.
      completionService.submit(new Callable<StoreFile>() {
        @Override
        public StoreFile call() throws IOException {
          StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
          return storeFile;
        }
      });
      totalValidStoreFile++;
    }

    ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          Future<StoreFile> future = completionService.take();
          StoreFile storeFile = future.get();
          long length = storeFile.getReader().length();
          this.storeSize += length;
          this.totalUncompressedBytes +=
              storeFile.getReader().getTotalUncompressedBytes();
          if (LOG.isDebugEnabled()) {
            LOG.debug("loaded " + storeFile.toStringDetailed());
          }
          results.add(storeFile);
        } catch (InterruptedException e) {
          if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          if (ioe == null) ioe = new IOException(e.getCause());
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // Close the StoreFile readers we managed to open before rethrowing.
      boolean evictOnClose =
          cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
      for (StoreFile file : results) {
        try {
          if (file != null) file.closeReader(evictOnClose);
        } catch (IOException e) {
          LOG.warn(e.getMessage());
        }
      }
      throw ioe;
    }

    return results;
  }

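  /**
   * Checks the underlying store files, opens the files that have not been
   * opened, and removes the store file readers for store files no longer
   * available. Mainly used by secondary region replicas to keep up to date
   * with the primary region files.
   * @throws IOException
   */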
  @Override
  public void refreshStoreFiles() throws IOException {
    Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
    refreshStoreFilesInternal(newFiles);
  }

  @Override
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

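  /**
   * Replaces the store files that the store has with the given files. Mainly
   * used by secondary region replicas to keep up to date with the primary
   * region files.
   * @throws IOException
   */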
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    StoreFileManager sfm = storeEngine.getStoreFileManager();
    Collection<StoreFile> currentFiles = sfm.getStorefiles();
    if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);

    if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);

    HashMap<StoreFileInfo, StoreFile> currentFilesSet =
        new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
    for (StoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);

    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
        + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);

    Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // Try to open the files.
    List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);

    // Propagate the file changes to the underlying store file manager.
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles);

    // Advance the memstore read point to be at least the new store files'
    // seq ids so that readers pick them up. This assumes that the store is
    // not getting any writes (otherwise in-flight transactions might be
    // made visible).
    if (!toBeAddedFiles.isEmpty()) {
      region.getMVCC().advanceTo(this.getMaxSequenceId());
    }

    // Notify scanners, close file readers, and recompute store size.
    completeCompaction(toBeRemovedStoreFiles, false);
  }

  private StoreFile createStoreFileAndReader(final Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
    return createStoreFileAndReader(info);
  }

  private StoreFile createStoreFileAndReader(final StoreFileInfo info)
      throws IOException {
    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
    StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
        this.family.getBloomFilterType());
    StoreFile.Reader r = storeFile.createReader();
    r.setReplicaStoreFile(isPrimaryReplicaStore());
    return storeFile;
  }

  @Override
  public long add(final Cell cell) {
    lock.readLock().lock();
    try {
      return this.memstore.add(cell);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

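  /**
   * Removes a KeyValue from the memstore. The KeyValue is removed only if its
   * key and memstoreTS match the key and memstoreTS of a KeyValue in the
   * memstore.
   * @param kv the KeyValue to remove
   * @return memstore size delta
   */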
  protected long delete(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.delete(kv);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public void rollback(final Cell cell) {
    lock.readLock().lock();
    try {
      this.memstore.rollback(cell);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public Collection<StoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStorefiles();
  }

  @Override
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
      reader = HFile.createReader(srcPath.getFileSystem(conf),
          srcPath, cacheConf, conf);
      reader.loadFileInfo();

      byte[] firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey != null, "First key can not be null");
      byte[] lk = reader.getLastKey();
      Preconditions.checkState(lk != null, "Last key can not be null");
      byte[] lastKey = KeyValue.createKeyValueFromKey(lk).getRow();

      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
          " last=" + Bytes.toStringBinary(lastKey));
      LOG.debug("Region bounds: first=" +
          Bytes.toStringBinary(getRegionInfo().getStartKey()) +
          " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));

      if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
        throw new WrongRegionException(
            "Bulk load file " + srcPath.toString() + " does not fit inside region "
            + this.getRegionInfo().getRegionNameAsString());
      }

      if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
        LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
            reader.length() + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getKeyValue();
          if (prevCell != null) {
            if (CellComparator.compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                  + " current row: path=" + srcPath + " previous="
                  + CellUtil.getCellKeyAsString(prevCell) + " current="
                  + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                  + " family compared to current key: path=" + srcPath
                  + " previous="
                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                      prevCell.getFamilyLength())
                  + " current="
                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                      cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
            + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
            + " ms");
      }
    } finally {
      if (reader != null) reader.close();
    }
  }

  @Override
  public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);

    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
        + dstPath + " - updating store file list.");

    StoreFile sf = createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded store file " + srcPath + " into store " + this
        + " (new location: " + dstPath + ")");

    return dstPath;
  }

  @Override
  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    StoreFile sf = createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(StoreFile sf) throws IOException {
    StoreFile.Reader r = sf.getReader();
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();

    // Append the new storefile into the list.
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangedReadersObservers, otherwise a deadlock is possible.
      this.lock.writeLock().unlock();
    }
    notifyChangedReadersObservers();
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName() + "'");
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  @Override
  public ImmutableCollection<StoreFile> close() throws IOException {
    this.lock.writeLock().lock();
    try {
      // Clear so metrics doesn't find them.
      ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();

      if (!result.isEmpty()) {
        // Initialize the thread pool for closing store files in parallel.
        ThreadPoolExecutor storeFileCloserThreadPool = this.region
            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
                + this.getColumnFamilyName());

        // Close each store file in parallel.
        CompletionService<Void> completionService =
            new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
        for (final StoreFile f : result) {
          completionService.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              boolean evictOnClose =
                  cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              f.closeReader(evictOnClose);
              return null;
            }
          });
        }

        IOException ioe = null;
        try {
          for (int i = 0; i < result.size(); i++) {
            try {
              Future<Void> future = completionService.take();
              future.get();
            } catch (InterruptedException e) {
              if (ioe == null) {
                ioe = new InterruptedIOException();
                ioe.initCause(e);
              }
            } catch (ExecutionException e) {
              if (ioe == null) ioe = new IOException(e.getCause());
            }
          }
        } finally {
          storeFileCloserThreadPool.shutdownNow();
        }
        if (ioe != null) throw ioe;
      }
      LOG.info("Closed " + this);
      return result;
    } finally {
      this.lock.writeLock().unlock();
    }
  }

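  /**
   * Snapshot this store's memstore. Call before running
   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)}
   * so it has some work to do.
   */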
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      this.memstore.snapshot();
    } finally {
      this.lock.writeLock().unlock();
    }
  }

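  /**
   * Write out the current snapshot. Presumes {@link #snapshot()} has been
   * called previously.
   * @param logCacheFlushId flush sequence number
   * @param snapshot the memstore snapshot to flush
   * @param status the monitored task used for status updates
   * @return the path names of the tmp files to which the store was flushed
   * @throws IOException if the flush fails after all retries
   */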
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
      MonitoredTask status) throws IOException {
    // If an exception happens flushing, we let it out without clearing the
    // memstore snapshot. The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching exception when flushing, otherwise the server
    // will abort itself.
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file, retrying num=" + i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }

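  /**
   * Commits a flushed tmp store file by moving it into the store directory
   * and opening a reader on it.
   * @param path the path of the store file to commit
   * @param logCacheFlushId flush sequence number
   * @param status the monitored task
   * @return the StoreFile the flushed file became
   * @throws IOException
   */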
  private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
      throws IOException {
    // Write-out finished successfully, move into the right spot.
    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);

    status.setStatus("Flushing " + this + ": reopening flushed file");
    StoreFile sf = createStoreFileAndReader(dstPath);

    StoreFile.Reader r = sf.getReader();
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();

    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
          ", sequenceid=" + logCacheFlushId +
          ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
    }
    return sf;
  }

  @Override
  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint,
      boolean includesTag)
      throws IOException {
    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
        includesTag, false);
  }

  /**
   * @param maxKeyCount estimated maximum number of keys to be written
   * @param compression compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
   * @param includeMVCCReadpoint whether we should write out the mvcc readpoint
   * @param includesTag whether tags are to be written
   * @param shouldDropBehind should the writer drop caches behind writes
   * @return Writer for a new StoreFile in the tmp dir
   */
  @Override
  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
      boolean shouldDropBehind)
      throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {
      // Don't cache data on write on compactions.
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
        cryptoContext);
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())
            .withFilePath(fs.createTempName())
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .withShouldDropCacheBehind(shouldDropBehind)
            .build();
    return w;
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(includeMVCCReadpoint)
        .withIncludesTags(includesTag)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(checksumType)
        .withBytesPerCheckSum(bytesPerChecksum)
        .withBlockSize(blocksize)
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(cryptoContext)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
    return hFileContext;
  }

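  /**
   * Change storeFiles, adding into place the Readers produced by this new
   * flush.
   * @param sfs the store files produced by the flush
   * @param snapshotId id of the snapshot to clear; 0 or less means do not
   *   clear any snapshot
   * @return whether compaction is required
   * @throws IOException
   */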
  private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
      throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangedReadersObservers, otherwise a deadlock is possible.
      this.lock.writeLock().unlock();
    }

    // Tell listeners of the change in readers.
    notifyChangedReadersObservers();

    if (LOG.isTraceEnabled()) {
      long totalSize = 0;
      for (StoreFile sf : sfs) {
        totalSize += sf.getReader().length();
      }
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that the set of Readers has changed.
   * @throws IOException
   */
  private void notifyChangedReadersObservers() throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      o.updateReaders();
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further
   * down the line).
   * @return all scanners for this store
   */
  @Override
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
      byte[] stopRow, long readPt) throws IOException {
    Collection<StoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
      storeFilesToScan =
          this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
      memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
      this.lock.readLock().unlock();
    }

    // First the store file scanners.
    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
    List<KeyValueScanner> scanners =
        new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
    scanners.addAll(sfScanners);
    // Then the memstore scanners.
    if (memStoreScanners != null) {
      scanners.addAll(memStoreScanners);
    }
    return scanners;
  }

  @Override
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  @Override
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if observer present; it may not be (legitimately).
    this.changedReaderObservers.remove(o);
  }

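  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling
   * thread must be able to block for long periods.
   *
   * <p>During this time, the Store can work as usual, getting values from
   * StoreFiles and writing new StoreFiles from the memstore. Existing
   * StoreFiles are not destroyed until the new compacted StoreFile is
   * completely written out to disk.
   *
   * <p>Compaction should be idempotent, since there is no IO fencing for the
   * region directory in HDFS. The steps below are therefore carefully
   * ordered:
   * <ol>
   * <li>Compaction writes new files under region/.tmp directory (compaction
   * output).</li>
   * <li>Compaction atomically moves the temporary files under the region
   * directory.</li>
   * <li>Compaction appends a WAL edit containing the compaction input and
   * output files, forcing the WAL to disk.</li>
   * <li>Compaction deletes the input files from the region directory.</li>
   * </ol>
   * @param compaction compaction details obtained from requestCompaction()
   * @param throughputController used to throttle compaction I/O
   * @return the storefiles we compacted into
   * @throws IOException
   */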
  @Override
  public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController) throws IOException {
    return compact(compaction, throughputController, null);
  }

  @Override
  public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    List<StoreFile> sfs = null;
    CompactionRequest cr = compaction.getRequest();
    try {
      // Do all sanity checking in here if we have a valid CompactionRequest
      // because we need to clean up after it on the way out in a finally
      // block below.
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<StoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // Sanity check: we're compacting files that this store knows about.
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
          + this + " of " + this.getRegionInfo().getRegionNameAsString()
          + " into tmpdir=" + fs.getTempDir() + ", totalSize="
          + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      // Commence the compaction.
      List<Path> newFiles = compaction.compact(throughputController, user);

      // If the compaction is not allowed to complete, leave the new files in
      // the tmp dir: only create readers briefly to compute metadata.
      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        LOG.warn("hbase.hstore.compaction.complete is set to false");
        sfs = new ArrayList<StoreFile>(newFiles.size());
        final boolean evictOnClose =
            cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
        for (Path newFile : newFiles) {
          // Create storefile around what we wrote with a reader on it.
          StoreFile sf = createStoreFileAndReader(newFile);
          sf.closeReader(evictOnClose);
          sfs.add(sf);
        }
        return sfs;
      }

      sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
      writeCompactionWalRecord(filesToCompact, sfs);
      replaceStoreFiles(filesToCompact, sfs);
      if (cr.isMajor()) {
        majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
        majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
      } else {
        compactedCellsCount += getCompactionProgress().totalCompactingKVs;
        compactedCellsSize += getCompactionProgress().totalCompactedSize;
      }
      // At this point the store will use new files for all new scanners.
      completeCompaction(filesToCompact, true); // Archive old files & update store size.

      logCompactionEndMessage(cr, sfs, compactionStartTime);
      return sfs;
    } finally {
      finishCompactionRequest(cr);
    }
  }

  private List<StoreFile> moveCompactedFilesIntoPlace(
      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
    List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
    for (Path newFile : newFiles) {
      assert newFile != null;
      final StoreFile sf = moveFileIntoPlace(newFile);
      if (this.getCoprocessorHost() != null) {
        final Store thisStore = this;
        if (user == null) {
          getCoprocessorHost().postCompact(thisStore, sf, cr);
        } else {
          try {
            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
              @Override
              public Void run() throws Exception {
                getCoprocessorHost().postCompact(thisStore, sf, cr);
                return null;
              }
            });
          } catch (InterruptedException ie) {
            InterruptedIOException iioe = new InterruptedIOException();
            iioe.initCause(ie);
            throw iioe;
          }
        }
      }
      assert sf != null;
      sfs.add(sf);
    }
    return sfs;
  }

  // Package-private for tests.
  StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
    validateStoreFile(newFile);
    // Move the file into the right spot.
    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
    return createStoreFileAndReader(destPath);
  }

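  /**
   * Writes the compaction WAL record, so the compaction can be replayed (or
   * cleaned up) if the region server crashes before the input files are
   * removed.
   * @param filesCompacted the files compacted (input)
   * @param newFiles the files the compaction produced (output)
   */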
  private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
      Collection<StoreFile> newFiles) throws IOException {
    if (region.getWAL() == null) return;
    List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
    for (StoreFile f : filesCompacted) {
      inputPaths.add(f.getPath());
    }
    List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
    for (StoreFile f : newFiles) {
      outputPaths.add(f.getPath());
    }
    HRegionInfo info = this.region.getRegionInfo();
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
        family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
    WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
        this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
  }

  @VisibleForTesting
  void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
      final Collection<StoreFile> result) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
      synchronized (filesCompacting) {
        filesCompacting.removeAll(compactedFiles);
      }
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Log a very elaborate compaction completion message.
   * @param cr request for compaction
   * @param sfs resulting files
   * @param compactionStartTime start time
   */
  private void logCompactionEndMessage(
      CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
    long now = EnvironmentEdgeManager.currentTime();
    StringBuilder message = new StringBuilder(
        "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
        + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
    if (sfs.isEmpty()) {
      message.append("none, ");
    } else {
      for (StoreFile sf : sfs) {
        message.append(sf.getPath().getName());
        message.append("(size=");
        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
        message.append("), ");
      }
    }
    message.append("total size for store is ")
      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
      .append(". This selection was in queue for ")
      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
      .append(" to execute.");
    LOG.info(message.toString());
    if (LOG.isTraceEnabled()) {
      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
      long resultSize = 0;
      for (StoreFile sf : sfs) {
        resultSize += sf.getReader().length();
      }
      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
          + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
          + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
      LOG.trace(traceMessage);
    }
  }

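  /**
   * Call to complete a compaction. Its for the case where we find in the WAL
   * a compaction that was not finished, for example when recovering a WAL
   * after a region server crash.
   * @param compaction the descriptor read from the WAL marker
   * @param pickCompactionFiles whether to pick up the compaction output files
   *   that are not already present in the store
   * @param removeFiles whether to remove the compaction input files
   */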
  @Override
  public void replayCompactionMarker(CompactionDescriptor compaction,
      boolean pickCompactionFiles, boolean removeFiles)
      throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // The compaction marker is written after the compaction output has been
    // moved into the region/family directory. If we crash after the marker
    // is written, the input files may not have been removed yet; removing
    // any input file still present is enough to "complete" the compaction.
    // The marker references files by name, so convert the input paths to
    // bare file names before matching against the current store file set.
    String familyName = this.getColumnFamilyName();
    List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
    for (String compactionInput : compactionInputs) {
      Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
      inputFiles.add(inputPath.getName());
    }

    // Some of the input files might already be deleted.
    List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
    for (StoreFile sf : this.getStorefiles()) {
      if (inputFiles.contains(sf.getPath().getName())) {
        inputStoreFiles.add(sf);
      }
    }

    // Check whether we need to pick up the new files.
    List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());

    if (pickCompactionFiles) {
      for (StoreFile sf : this.getStorefiles()) {
        compactionOutputs.remove(sf.getPath().getName());
      }
      for (String compactionOutput : compactionOutputs) {
        StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
        StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
        outputStoreFiles.add(storeFile);
      }
    }

    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
      LOG.info("Replaying compaction marker, replacing input files: " +
          inputStoreFiles + " with output files : " + outputStoreFiles);
      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
      this.completeCompaction(inputStoreFiles, removeFiles);
    }
  }

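  /**
   * This method tries to compact the N most recent files for testing. Note
   * that because compacting "recent" files only makes sense for some
   * policies, e.g. the default one, it assumes the default policy is used;
   * it doesn't use the policy, but instead makes a compaction candidate list
   * by itself.
   * @param N number of files
   */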
  public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
    List<StoreFile> filesToCompact;
    boolean isMajor;

    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
        if (!filesCompacting.isEmpty()) {
          // Exclude all files older than the newest file we're currently
          // compacting. This allows us to preserve contiguity.
          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
          int idx = filesToCompact.indexOf(last);
          Preconditions.checkArgument(idx != -1);
          filesToCompact.subList(0, idx + 1).clear();
        }
        int count = filesToCompact.size();
        if (N > count) {
          throw new RuntimeException("Not enough files");
        }

        filesToCompact = filesToCompact.subList(count - N, count);
        isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
        filesCompacting.addAll(filesToCompact);
        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
      }
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // Ready to go. Have list of files to compact.
      List<Path> newFiles = ((DefaultCompactor) this.storeEngine.getCompactor())
          .compactForTesting(filesToCompact, isMajor);
      for (Path newFile : newFiles) {
        // Move the compaction into place.
        StoreFile sf = moveFileIntoPlace(newFile);
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompact(this, sf, null);
        }
        replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
        completeCompaction(filesToCompact, true);
      }
    } finally {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(filesToCompact);
      }
    }
  }

  @Override
  public boolean hasReferences() {
    return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
  }

  @Override
  public CompactionProgress getCompactionProgress() {
    return this.storeEngine.getCompactor().getProgress();
  }

  @Override
  public boolean isMajorCompaction() throws IOException {
    for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      if (sf.getReader() == null) {
        LOG.debug("StoreFile " + sf + " has null Reader");
        return false;
      }
    }
    return storeEngine.getCompactionPolicy().isMajorCompaction(
        this.storeEngine.getStoreFileManager().getStorefiles());
  }

  @Override
  public CompactionContext requestCompaction() throws IOException {
    return requestCompaction(Store.NO_PRIORITY, null);
  }

  @Override
  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
      throws IOException {
    return requestCompaction(priority, baseRequest, null);
  }

  @Override
  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
      User user) throws IOException {
    // Don't even select for compaction if writes are disabled.
    if (!this.areWritesEnabled()) {
      return null;
    }

    // Before we do compaction, try to get rid of unneeded files to simplify things.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequest request = null;
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        final Store thisStore = this;
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override = false;
          if (user == null) {
            override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
                baseRequest);
          } else {
            try {
              override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws Exception {
                  return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
                      baseRequest);
                }
              });
            } catch (InterruptedException ie) {
              InterruptedIOException iioe = new InterruptedIOException();
              iioe.initCause(ie);
              throw iioe;
            }
          }
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction,
                mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          if (user == null) {
            this.getCoprocessorHost().postCompactSelection(
                this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
          } else {
            try {
              user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  getCoprocessorHost().postCompactSelection(
                      thisStore, ImmutableList.copyOf(compaction.getRequest().getFiles()),
                      baseRequest);
                  return null;
                }
              });
            } catch (InterruptedException ie) {
              InterruptedIOException iioe = new InterruptedIOException();
              iioe.initCause(ie);
              throw iioe;
            }
          }
        }

        // Finally, we have the resulting files list. Check if we have any files at all.
        if (baseRequest != null) {
          // Update the request with what the system thinks the request should be;
          // its up to the request if it wants to listen.
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }

        request = compaction.getRequest();
        final Collection<StoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return null;
        }

        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
        + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    this.region.reportCompactionRequestStart(request.isMajor());
    return compaction;
  }

  private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
    if (filesToAdd == null) return;
    // Check that we do not try to compact the same StoreFile twice.
    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
    }
    filesCompacting.addAll(filesToAdd);
    Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
  }

  private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
    if (getFamily().getMinVersions() > 0) {
      LOG.debug("Skipping expired store file removal due to min version being " +
          getFamily().getMinVersions());
      return;
    }
    this.lock.readLock().lock();
    Collection<StoreFile> delSfs = null;
    try {
      synchronized (filesCompacting) {
        long cfTtl = getStoreFileTtl();
        if (cfTtl != Long.MAX_VALUE) {
          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
              EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
          addToCompactingFiles(delSfs);
        }
      }
    } finally {
      this.lock.readLock().unlock();
    }
    if (delSfs == null || delSfs.isEmpty()) return;

    Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
    writeCompactionWalRecord(delSfs, newFiles);
    replaceStoreFiles(delSfs, newFiles);
    completeCompaction(delSfs);
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString()
        + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
  }

  @Override
  public void cancelRequestedCompaction(CompactionContext compaction) {
    finishCompactionRequest(compaction.getRequest());
  }

  private void finishCompactionRequest(CompactionRequest cr) {
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
  }

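  /**
   * Validates a store file by opening and closing it. In HFileV2 this should
   * not be an expensive operation.
   * @param path the path to the store file
   */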
  private void validateStoreFile(Path path)
      throws IOException {
    StoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file : " + path
          + ", keeping it in tmp location", e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeReader(false);
      }
    }
  }

  @VisibleForTesting
  protected void completeCompaction(final Collection<StoreFile> compactedFiles)
      throws IOException {
    completeCompaction(compactedFiles, true);
  }

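  /**
   * Completes a compaction by processing the results that have already been
   * written to disk: notify readers of the change, close and (optionally)
   * remove the replaced store files, and recompute the store size. It is
   * usually invoked at the end of a compaction, but may also be invoked when
   * replaying a compaction marker from the WAL.
   * @param compactedFiles the files that were compacted
   * @param removeFiles whether to remove/archive the compacted files
   */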
  @VisibleForTesting
  protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
      throws IOException {
    try {
      // Do not delete old store files until we have sent out notification of
      // change in case old files are still being accessed by outstanding
      // scanners. Don't do this under the write lock; a deadlock is possible.
      notifyChangedReadersObservers();

      // At this point the store will use new files for all scanners.
      LOG.debug("Removing store files after compaction...");
      boolean evictOnClose =
          cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
      for (StoreFile compactedFile : compactedFiles) {
        compactedFile.closeReader(evictOnClose);
      }
      if (removeFiles) {
        this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
      }
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.error("Failed removing compacted files in " + this +
          ". Files we were trying to remove are " + compactedFiles.toString() +
          "; some of them may have been already removed", e);
    }

    // Recompute the store size.
    this.storeSize = 0L;
    this.totalUncompressedBytes = 0L;
    for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = hsf.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + hsf + " has a null Reader");
        continue;
      }
      this.storeSize += r.length();
      this.totalUncompressedBytes += r.getTotalUncompressedBytes();
    }
  }

  /*
   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's maximum number of versions.
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than maximum versions for this store.
    int maxVersions = this.family.getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

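  /**
   * @param cell the cell to check
   * @param oldestTimestamp the oldest timestamp still within the store's TTL
   * @param now the current time in milliseconds
   * @return true if the cell carries a per-cell TTL tag and that TTL has
   *   expired
   */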
  static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
    // Do not create an Iterator or Tag objects unless the cell actually has
    // tags.
    if (cell.getTagsLength() > 0) {
      // Look for a TTL tag first. Use it instead of the family setting if
      // found. TTL tags are per cell.
      Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
          cell.getTagsLength());
      while (i.hasNext()) {
        Tag t = i.next();
        if (TagType.TTL_TAG_TYPE == t.getType()) {
          // Unlike in schema, cell TTLs are stored in milliseconds, no need
          // to convert.
          long ts = cell.getTimestamp();
          assert t.getTagLength() == Bytes.SIZEOF_LONG;
          long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
          if (ts + ttl < now) {
            return true;
          }
          // Per cell TTLs cannot extend lifetime beyond family settings, so
          // fall through to check that.
          break;
        }
      }
    }
    return false;
  }

  @Override
  public Cell getRowKeyAtOrBefore(final byte[] row) throws IOException {
    // If minVersions is set, we will not ignore expired KVs. As we're only
    // looking for the latest matches, that should be OK. With minVersions > 0
    // we guarantee that any KV that has any version at all (expired or not)
    // has at least one version that will not expire.
    long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();

    KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);

    GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
        this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
    this.lock.readLock().lock();
    try {
      // First go to the memstore. Pick up deletes and candidates.
      this.memstore.getRowKeyAtOrBefore(state);
      // Process each relevant store file.
      Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
          .getCandidateFilesForRowKeyBefore(state.getTargetKey());
      while (sfIterator.hasNext()) {
        StoreFile sf = sfIterator.next();
        sfIterator.remove(); // Remove sf from iterator.
        boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
        Cell candidate = state.getCandidate();
        // We have an optimization here which stops the search if we find an
        // exact match.
        if (candidate != null && CellUtil.matchingRow(candidate, row)) {
          return candidate;
        }
        if (haveNewCandidate) {
          sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
              sfIterator, state.getTargetKey(), candidate);
        }
      }
      return state.getCandidate();
    } finally {
      this.lock.readLock().unlock();
    }
  }

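  /**
   * Check an individual store file for the row at or before a given row.
   * @param f the store file to check
   * @param state the tracker accumulating the best candidate so far
   * @return true if a new candidate was found in this store file
   * @throws IOException
   */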
  private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
      final GetClosestRowBeforeTracker state)
      throws IOException {
    StoreFile.Reader r = f.getReader();
    if (r == null) {
      LOG.warn("StoreFile " + f + " has a null Reader");
      return false;
    }
    if (r.getEntries() == 0) {
      LOG.warn("StoreFile " + f + " is an empty store file");
1930 return false;
1931 }
1932
1933 byte [] fk = r.getFirstKey();
1934 if (fk == null) return false;
1935 KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1936 byte [] lk = r.getLastKey();
1937 KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1938 KeyValue firstOnRow = state.getTargetKey();
1939 if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1940
1941
1942 if (!state.isTargetTable(lastKV)) return false;
1943
1944
1945 firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1946 }
1947
1948 HFileScanner scanner = r.getScanner(true, true, false);
1949
1950 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1951
1952
1953 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1954
1955 while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1956 firstOnRow.getKeyLength())) {
1957 Cell kv = scanner.getKeyValue();
1958 if (!state.isTargetTable(kv)) break;
1959 if (!state.isBetterCandidate(kv)) break;
1960
1961 firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1962
1963 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1964
1965 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1966 }
1967 return false;
1968 }
1969
1970
1971
1972
1973
1974
1975
1976
1977
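
  /*
   * Seek the file scanner to firstOnRow, or to the file's first key if it is
   * on the same row.
   * @return true if we successfully positioned the scanner
   */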
  private boolean seekToScanner(final HFileScanner scanner,
                                final KeyValue firstOnRow,
                                final KeyValue firstKV)
      throws IOException {
    KeyValue kv = firstOnRow;
    // If the file's first key is on the same row, seek to it instead.
    if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
    int result = scanner.seekTo(kv);
    return result != -1;
  }
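
  /*
   * When we come in here, we are probably at the kv just before we break into
   * the row that firstOnRow is on. We usually need to increment one time to
   * get onto the row we are interested in.
   * @return true if we found a candidate
   */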
  private boolean walkForwardInSingleRow(final HFileScanner scanner,
                                         final KeyValue firstOnRow,
                                         final GetClosestRowBeforeTracker state)
      throws IOException {
    boolean foundCandidate = false;
    do {
      Cell kv = scanner.getKeyValue();
      // If we are not yet in the row, skip.
      if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
      // Did we go beyond the target row? If so, break.
      if (state.isTooFar(kv, firstOnRow)) break;
      if (state.isExpired(kv)) {
        continue;
      }
      // If we added something, this row is a contender. Break.
      if (state.handle(kv)) {
        foundCandidate = true;
        break;
      }
    } while (scanner.next());
    return foundCandidate;
  }

  @Override
  public boolean canSplit() {
    this.lock.readLock().lock();
    try {
      // Not splittable if we find a reference store file present in the store.
      boolean result = !hasReferences();
      if (!result && LOG.isDebugEnabled()) {
        LOG.debug("Cannot split region due to reference files being there");
      }
      return result;
    } finally {
      this.lock.readLock().unlock();
    }
  }

  @Override
  public byte[] getSplitPoint() {
    this.lock.readLock().lock();
    try {
      // Should already be enforced by the split policy!
      assert !this.getRegionInfo().isMetaRegion();
      // Not splittable if we find a reference store file present in the store.
      if (hasReferences()) {
        return null;
      }
      return this.storeEngine.getStoreFileManager().getSplitPoint();
    } catch (IOException e) {
      LOG.warn("Failed getting split point for " + this, e);
    } finally {
      this.lock.readLock().unlock();
    }
    return null;
  }

  @Override
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }

  @Override
  public long getSize() {
    return storeSize;
  }

  @Override
  public void triggerMajorCompaction() {
    this.forceMajor = true;
  }
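
  /**
   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
   * compaction.
   * @param scan Scan to apply when scanning the stores
   * @param targetCols columns to scan
   * @return a scanner over the current key values
   * @throws IOException on failure
   */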
  @Override
  public KeyValueScanner getScanner(Scan scan,
      final NavigableSet<byte []> targetCols, long readPt) throws IOException {
    lock.readLock().lock();
    try {
      KeyValueScanner scanner = null;
      if (this.getCoprocessorHost() != null) {
        scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
      }
      if (scanner == null) {
        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
            getScanInfo(), scan, targetCols, readPt);
      }
      return scanner;
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public String toString() {
    return this.getColumnFamilyName();
  }

  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }

  @Override
  public long getStoreSizeUncompressed() {
    return this.totalUncompressedBytes;
  }

  @Override
  public long getStorefilesSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + s + " has a null Reader");
        continue;
      }
      size += r.length();
    }
    return size;
  }

  @Override
  public long getStorefilesIndexSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + s + " has a null Reader");
        continue;
      }
      size += r.indexSize();
    }
    return size;
  }

  @Override
  public long getTotalStaticIndexSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        continue;
      }
      size += r.getUncompressedDataIndexSize();
    }
    return size;
  }

  @Override
  public long getTotalStaticBloomSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        continue;
      }
      size += r.getTotalBloomSize();
    }
    return size;
  }

  @Override
  public long getMemStoreSize() {
    return this.memstore.size();
  }

  @Override
  public int getCompactPriority() {
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    if (priority == PRIORITY_USER) {
      LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
  }

  public HRegion getHRegion() {
    return this.region;
  }

  @Override
  public RegionCoprocessorHost getCoprocessorHost() {
    return this.region.getCoprocessorHost();
  }

  @Override
  public HRegionInfo getRegionInfo() {
    return this.fs.getRegionInfo();
  }

  @Override
  public boolean areWritesEnabled() {
    return this.region.areWritesEnabled();
  }

  @Override
  public long getSmallestReadPoint() {
    return this.region.getSmallestReadPoint();
  }
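
  /**
   * Updates the value for the given row/family/qualifier in the memstore.
   * @param row row to update
   * @param f family to update
   * @param qualifier qualifier to update
   * @param newValue the new value to set into memstore
   * @return memstore size delta
   */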
  public long updateColumnValue(byte [] row, byte [] f,
                                byte [] qualifier, long newValue)
      throws IOException {
    this.lock.readLock().lock();
    try {
      long now = EnvironmentEdgeManager.currentTime();

      return this.memstore.updateColumnValue(row,
        f,
        qualifier,
        newValue,
        now);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  @Override
  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
    this.lock.readLock().lock();
    try {
      return this.memstore.upsert(cells, readpoint);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  @Override
  public StoreFlushContext createFlushContext(long cacheFlushId) {
    return new StoreFlusherImpl(cacheFlushId);
  }

  private final class StoreFlusherImpl implements StoreFlushContext {

    private long cacheFlushSeqNum;
    private MemStoreSnapshot snapshot;
    private List<Path> tempFiles;
    private List<Path> committedFiles;
    private long cacheFlushCount;
    private long cacheFlushSize;

    private StoreFlusherImpl(long cacheFlushSeqNum) {
      this.cacheFlushSeqNum = cacheFlushSeqNum;
    }
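
    /**
     * This is not thread safe. The caller should have a lock on the region or the store.
     */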
    @Override
    public void prepare() {
      this.snapshot = memstore.snapshot();
      this.cacheFlushCount = snapshot.getCellsCount();
      this.cacheFlushSize = snapshot.getSize();
      committedFiles = new ArrayList<Path>(1);
    }

    @Override
    public void flushCache(MonitoredTask status) throws IOException {
      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
    }

    @Override
    public boolean commit(MonitoredTask status) throws IOException {
      if (this.tempFiles == null || this.tempFiles.isEmpty()) {
        return false;
      }
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
      for (Path storeFilePath : tempFiles) {
        try {
          storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
        } catch (IOException ex) {
          LOG.error("Failed to commit store file " + storeFilePath, ex);
          // Try to delete the files we have committed before.
          for (StoreFile sf : storeFiles) {
            Path pathToDelete = sf.getPath();
            try {
              sf.deleteReader();
            } catch (IOException deleteEx) {
              LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete,
                deleteEx);
              Runtime.getRuntime().halt(1);
            }
          }
          throw new IOException("Failed to commit the flush", ex);
        }
      }

      for (StoreFile sf : storeFiles) {
        if (HStore.this.getCoprocessorHost() != null) {
          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
        }
        committedFiles.add(sf.getPath());
      }

      HStore.this.flushedCellsCount += cacheFlushCount;
      HStore.this.flushedCellsSize += cacheFlushSize;

      // Add new files to the store files list and clear the snapshot.
      return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
    }

    @Override
    public List<Path> getCommittedFiles() {
      return committedFiles;
    }
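
    /**
     * Similar to commit, but called in secondary region replicas for replaying the flush cache
     * from the primary region. Adds the new files to the store, and drops the snapshot depending
     * on the dropMemstoreSnapshot argument.
     * @param fileNames names of the flushed files
     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
     */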
    @Override
    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
        throws IOException {
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
      for (String file : fileNames) {
        // Open the file as a store file.
        StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
        StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
        storeFiles.add(storeFile);
        HStore.this.storeSize += storeFile.getReader().length();
        HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
        if (LOG.isInfoEnabled()) {
          LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
            " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
            ", sequenceid=" + storeFile.getReader().getSequenceID() +
            ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
        }
      }

      long snapshotId = -1; // -1 means do not drop the snapshot
      if (dropMemstoreSnapshot && snapshot != null) {
        snapshotId = snapshot.getId();
      }
      HStore.this.updateStorefiles(storeFiles, snapshotId);
    }
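
    /**
     * Abort the snapshot preparation. Drops the snapshot if any.
     */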
    @Override
    public void abort() throws IOException {
      if (snapshot == null) {
        return;
      }
      HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
    }
  }

  @Override
  public boolean needsCompaction() {
    List<StoreFile> filesCompactingClone = null;
    synchronized (filesCompacting) {
      filesCompactingClone = Lists.newArrayList(filesCompacting);
    }
    return this.storeEngine.needsCompaction(filesCompactingClone);
  }

  @Override
  public CacheConfig getCacheConfig() {
    return this.cacheConf;
  }

  public static final long FIXED_OVERHEAD =
      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
          + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));

  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
      + ClassSize.CONCURRENT_SKIPLISTMAP
      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
      + ScanInfo.FIXED_OVERHEAD);

  @Override
  public long heapSize() {
    return DEEP_OVERHEAD + this.memstore.heapSize();
  }

  @Override
  public KeyValue.KVComparator getComparator() {
    return comparator;
  }

  @Override
  public ScanInfo getScanInfo() {
    return scanInfo;
  }
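
  /**
   * @param scanInfo new scan info to use for this store
   */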
  void setScanInfo(ScanInfo scanInfo) {
    this.scanInfo = scanInfo;
  }

  @Override
  public boolean hasTooManyStoreFiles() {
    return getStorefilesCount() > this.blockingFileCount;
  }

  @Override
  public long getFlushedCellsCount() {
    return flushedCellsCount;
  }

  @Override
  public long getFlushedCellsSize() {
    return flushedCellsSize;
  }

  @Override
  public long getCompactedCellsCount() {
    return compactedCellsCount;
  }

  @Override
  public long getCompactedCellsSize() {
    return compactedCellsSize;
  }

  @Override
  public long getMajorCompactedCellsCount() {
    return majorCompactedCellsCount;
  }

  @Override
  public long getMajorCompactedCellsSize() {
    return majorCompactedCellsSize;
  }
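
  /**
   * Returns the StoreEngine that is backing this concrete implementation of Store.
   * @return the {@link StoreEngine} object used internally inside this HStore instance
   */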
  @VisibleForTesting
  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
    return this.storeEngine;
  }

  protected OffPeakHours getOffPeakHours() {
    return this.offPeakHours;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = new CompoundConfiguration()
        .add(conf)
        .addWritableMap(family.getValues());
    this.storeEngine.compactionPolicy.setConf(conf);
    this.offPeakHours = OffPeakHours.getInstance(conf);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister.
  }

  @Override
  public double getCompactionPressure() {
    return storeEngine.getStoreFileManager().getCompactionPressure();
  }

  @Override
  public boolean isPrimaryReplicaStore() {
    return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
  }
}