1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.NavigableSet;
29 import java.util.SortedSet;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.CompletionService;
32 import java.util.concurrent.CopyOnWriteArraySet;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorCompletionService;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.concurrent.locks.ReentrantReadWriteLock;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.classification.InterfaceAudience;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.hbase.Cell;
48 import org.apache.hadoop.hbase.CompoundConfiguration;
49 import org.apache.hadoop.hbase.HColumnDescriptor;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.KeyValue;
53 import org.apache.hadoop.hbase.RemoteExceptionHandler;
54 import org.apache.hadoop.hbase.client.Scan;
55 import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
56 import org.apache.hadoop.hbase.exceptions.WrongRegionException;
57 import org.apache.hadoop.hbase.io.compress.Compression;
58 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
59 import org.apache.hadoop.hbase.io.hfile.HFile;
60 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
61 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
62 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
63 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
64 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
65 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
67 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
68 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
69 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
70 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
71 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
72 import org.apache.hadoop.hbase.util.Bytes;
73 import org.apache.hadoop.hbase.util.ChecksumType;
74 import org.apache.hadoop.hbase.util.ClassSize;
75 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
76 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77 import org.apache.hadoop.util.StringUtils;
78
79 import com.google.common.annotations.VisibleForTesting;
80 import com.google.common.base.Preconditions;
81 import com.google.common.collect.ImmutableCollection;
82 import com.google.common.collect.ImmutableList;
83 import com.google.common.collect.Lists;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 @InterfaceAudience.Private
109 public class HStore implements Store {
110 public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
111 "hbase.server.compactchecker.interval.multiplier";
112 public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
113 public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
114 public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
115
116 static final Log LOG = LogFactory.getLog(HStore.class);
117
118 protected final MemStore memstore;
119
120 private final HRegion region;
121 private final HColumnDescriptor family;
122 private final HRegionFileSystem fs;
123 private final Configuration conf;
124 private final CacheConfig cacheConf;
125 private long lastCompactSize = 0;
126 volatile boolean forceMajor = false;
127
128 static int closeCheckInterval = 0;
129 private volatile long storeSize = 0L;
130 private volatile long totalUncompressedBytes = 0L;
131
132
133
134
135
136
137
138
139
140
141 final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
142 private final boolean verifyBulkLoads;
143
144 private ScanInfo scanInfo;
145
146 final List<StoreFile> filesCompacting = Lists.newArrayList();
147
148
149 private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
150 new CopyOnWriteArraySet<ChangedReadersObserver>();
151
152 private final int blocksize;
153 private HFileDataBlockEncoder dataBlockEncoder;
154
155
156 private ChecksumType checksumType;
157 private int bytesPerChecksum;
158
159
160 private final KeyValue.KVComparator comparator;
161
162 final StoreEngine<?, ?, ?, ?> storeEngine;
163
164 private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
165 private final OffPeakHours offPeakHours;
166
167 private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
168 private int flushRetriesNumber;
169 private int pauseTime;
170
171 private long blockingFileCount;
172 private int compactionCheckMultiplier;
173
174
175
176
177
178
179
180
181
182 protected HStore(final HRegion region, final HColumnDescriptor family,
183 final Configuration confParam) throws IOException {
184
185 HRegionInfo info = region.getRegionInfo();
186 this.fs = region.getRegionFileSystem();
187
188
189 fs.createStoreDir(family.getNameAsString());
190 this.region = region;
191 this.family = family;
192
193
194
195 this.conf = new CompoundConfiguration()
196 .add(confParam)
197 .addStringMap(region.getTableDesc().getConfiguration())
198 .addStringMap(family.getConfiguration())
199 .addWritableMap(family.getValues());
200 this.blocksize = family.getBlocksize();
201
202 this.dataBlockEncoder =
203 new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
204 family.getDataBlockEncoding());
205
206 this.comparator = info.getComparator();
207
208 long timeToPurgeDeletes =
209 Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
210 LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
211 "ms in store " + this);
212
213 long ttl = determineTTLFromFamily(family);
214
215
216 scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
217 this.memstore = new MemStore(conf, this.comparator);
218 this.offPeakHours = OffPeakHours.getInstance(conf);
219
220
221 this.cacheConf = new CacheConfig(conf, family);
222
223 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
224
225 this.blockingFileCount =
226 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
227 this.compactionCheckMultiplier = conf.getInt(
228 COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
229 if (this.compactionCheckMultiplier <= 0) {
230 LOG.error("Compaction check period multiplier must be positive, setting default: "
231 + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
232 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
233 }
234
235 if (HStore.closeCheckInterval == 0) {
236 HStore.closeCheckInterval = conf.getInt(
237 "hbase.hstore.close.check.interval", 10*1000*1000
238 }
239
240 this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
241 this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
242
243
244 this.checksumType = getChecksumType(conf);
245
246 this.bytesPerChecksum = getBytesPerChecksum(conf);
247 flushRetriesNumber = conf.getInt(
248 "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
249 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
250 if (flushRetriesNumber <= 0) {
251 throw new IllegalArgumentException(
252 "hbase.hstore.flush.retries.number must be > 0, not "
253 + flushRetriesNumber);
254 }
255 }
256
257
258
259
260
261 private static long determineTTLFromFamily(final HColumnDescriptor family) {
262
263 long ttl = family.getTimeToLive();
264 if (ttl == HConstants.FOREVER) {
265
266 ttl = Long.MAX_VALUE;
267 } else if (ttl == -1) {
268 ttl = Long.MAX_VALUE;
269 } else {
270
271 ttl *= 1000;
272 }
273 return ttl;
274 }
275
276 public String getColumnFamilyName() {
277 return this.family.getNameAsString();
278 }
279
280 @Override
281 public String getTableName() {
282 return this.getRegionInfo().getTableNameAsString();
283 }
284
285 @Override
286 public FileSystem getFileSystem() {
287 return this.fs.getFileSystem();
288 }
289
290 public HRegionFileSystem getRegionFileSystem() {
291 return this.fs;
292 }
293
294
295 @Override
296 public long getStoreFileTtl() {
297
298 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
299 }
300
301 @Override
302 public long getMemstoreFlushSize() {
303 return this.region.memstoreFlushSize;
304 }
305
306 @Override
307 public long getCompactionCheckMultiplier() {
308 return this.compactionCheckMultiplier;
309 }
310
311 public long getBlockingFileCount() {
312 return blockingFileCount;
313 }
314
315
316
317
318
319
320
321 public static int getBytesPerChecksum(Configuration conf) {
322 return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
323 HFile.DEFAULT_BYTES_PER_CHECKSUM);
324 }
325
326
327
328
329
330
331 public static ChecksumType getChecksumType(Configuration conf) {
332 String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
333 if (checksumName == null) {
334 return HFile.DEFAULT_CHECKSUM_TYPE;
335 } else {
336 return ChecksumType.nameToType(checksumName);
337 }
338 }
339
340
341
342
343 public static int getCloseCheckInterval() {
344 return closeCheckInterval;
345 }
346
347 public HColumnDescriptor getFamily() {
348 return this.family;
349 }
350
351
352
353
354 long getMaxSequenceId(boolean includeBulkFiles) {
355 return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
356 }
357
358 @Override
359 public long getMaxMemstoreTS() {
360 return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
361 }
362
363
364
365
366
367
368
369 @Deprecated
370 public static Path getStoreHomedir(final Path tabledir,
371 final HRegionInfo hri, final byte[] family) {
372 return getStoreHomedir(tabledir, hri.getEncodedName(), family);
373 }
374
375
376
377
378
379
380
381 @Deprecated
382 public static Path getStoreHomedir(final Path tabledir,
383 final String encodedName, final byte[] family) {
384 return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
385 }
386
387 @Override
388 public HFileDataBlockEncoder getDataBlockEncoder() {
389 return dataBlockEncoder;
390 }
391
392
393
394
395
396 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
397 this.dataBlockEncoder = blockEncoder;
398 }
399
400
401
402
403
404
405 private List<StoreFile> loadStoreFiles() throws IOException {
406 Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
407 if (files == null || files.size() == 0) {
408 return new ArrayList<StoreFile>();
409 }
410
411
412 ThreadPoolExecutor storeFileOpenerThreadPool =
413 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
414 this.getColumnFamilyName());
415 CompletionService<StoreFile> completionService =
416 new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
417
418 int totalValidStoreFile = 0;
419 for (final StoreFileInfo storeFileInfo: files) {
420
421 completionService.submit(new Callable<StoreFile>() {
422 public StoreFile call() throws IOException {
423 StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
424 return storeFile;
425 }
426 });
427 totalValidStoreFile++;
428 }
429
430 ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
431 IOException ioe = null;
432 try {
433 for (int i = 0; i < totalValidStoreFile; i++) {
434 try {
435 Future<StoreFile> future = completionService.take();
436 StoreFile storeFile = future.get();
437 long length = storeFile.getReader().length();
438 this.storeSize += length;
439 this.totalUncompressedBytes +=
440 storeFile.getReader().getTotalUncompressedBytes();
441 if (LOG.isDebugEnabled()) {
442 LOG.debug("loaded " + storeFile.toStringDetailed());
443 }
444 results.add(storeFile);
445 } catch (InterruptedException e) {
446 if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
447 } catch (ExecutionException e) {
448 if (ioe == null) ioe = new IOException(e.getCause());
449 }
450 }
451 } finally {
452 storeFileOpenerThreadPool.shutdownNow();
453 }
454 if (ioe != null) {
455
456 try {
457 for (StoreFile file : results) {
458 if (file != null) file.closeReader(true);
459 }
460 } catch (IOException e) { }
461 throw ioe;
462 }
463
464 return results;
465 }
466
467 private StoreFile createStoreFileAndReader(final Path p) throws IOException {
468 return createStoreFileAndReader(p, this.dataBlockEncoder);
469 }
470
471 private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException {
472 StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf,
473 this.family.getBloomFilterType(), encoder);
474 storeFile.createReader();
475 return storeFile;
476 }
477
478 @Override
479 public long add(final KeyValue kv) {
480 lock.readLock().lock();
481 try {
482 return this.memstore.add(kv);
483 } finally {
484 lock.readLock().unlock();
485 }
486 }
487
488 @Override
489 public long timeOfOldestEdit() {
490 return memstore.timeOfOldestEdit();
491 }
492
493
494
495
496
497
498
499 protected long delete(final KeyValue kv) {
500 lock.readLock().lock();
501 try {
502 return this.memstore.delete(kv);
503 } finally {
504 lock.readLock().unlock();
505 }
506 }
507
508 @Override
509 public void rollback(final KeyValue kv) {
510 lock.readLock().lock();
511 try {
512 this.memstore.rollback(kv);
513 } finally {
514 lock.readLock().unlock();
515 }
516 }
517
518
519
520
521 @Override
522 public Collection<StoreFile> getStorefiles() {
523 return this.storeEngine.getStoreFileManager().getStorefiles();
524 }
525
526 @Override
527 public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
528 HFile.Reader reader = null;
529 try {
530 LOG.info("Validating hfile at " + srcPath + " for inclusion in "
531 + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
532 reader = HFile.createReader(srcPath.getFileSystem(conf),
533 srcPath, cacheConf);
534 reader.loadFileInfo();
535
536 byte[] firstKey = reader.getFirstRowKey();
537 Preconditions.checkState(firstKey != null, "First key can not be null");
538 byte[] lk = reader.getLastKey();
539 Preconditions.checkState(lk != null, "Last key can not be null");
540 byte[] lastKey = KeyValue.createKeyValueFromKey(lk).getRow();
541
542 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
543 " last=" + Bytes.toStringBinary(lastKey));
544 LOG.debug("Region bounds: first=" +
545 Bytes.toStringBinary(getRegionInfo().getStartKey()) +
546 " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
547
548 if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
549 throw new WrongRegionException(
550 "Bulk load file " + srcPath.toString() + " does not fit inside region "
551 + this.getRegionInfo().getRegionNameAsString());
552 }
553
554 if (verifyBulkLoads) {
555 KeyValue prevKV = null;
556 HFileScanner scanner = reader.getScanner(false, false, false);
557 scanner.seekTo();
558 do {
559 KeyValue kv = scanner.getKeyValue();
560 if (prevKV != null) {
561 if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(),
562 prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(),
563 kv.getRowLength()) > 0) {
564 throw new InvalidHFileException("Previous row is greater than"
565 + " current row: path=" + srcPath + " previous="
566 + Bytes.toStringBinary(prevKV.getKey()) + " current="
567 + Bytes.toStringBinary(kv.getKey()));
568 }
569 if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(),
570 prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(),
571 kv.getFamilyLength()) != 0) {
572 throw new InvalidHFileException("Previous key had different"
573 + " family compared to current key: path=" + srcPath
574 + " previous=" + Bytes.toStringBinary(prevKV.getFamily())
575 + " current=" + Bytes.toStringBinary(kv.getFamily()));
576 }
577 }
578 prevKV = kv;
579 } while (scanner.next());
580 }
581 } finally {
582 if (reader != null) reader.close();
583 }
584 }
585
586 @Override
587 public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
588 Path srcPath = new Path(srcPathStr);
589 Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
590
591 StoreFile sf = createStoreFileAndReader(dstPath);
592
593 StoreFile.Reader r = sf.getReader();
594 this.storeSize += r.length();
595 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
596
597 LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
598 "' as " + dstPath + " - updating store file list.");
599
600
601 this.lock.writeLock().lock();
602 try {
603 this.storeEngine.getStoreFileManager().insertNewFile(sf);
604 } finally {
605
606
607
608
609
610 this.lock.writeLock().unlock();
611 }
612 notifyChangedReadersObservers();
613 LOG.info("Successfully loaded store file " + srcPath
614 + " into store " + this + " (new location: " + dstPath + ")");
615 }
616
617 @Override
618 public ImmutableCollection<StoreFile> close() throws IOException {
619 this.lock.writeLock().lock();
620 try {
621
622 ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
623
624 if (!result.isEmpty()) {
625
626 ThreadPoolExecutor storeFileCloserThreadPool = this.region
627 .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
628 + this.getColumnFamilyName());
629
630
631 CompletionService<Void> completionService =
632 new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
633 for (final StoreFile f : result) {
634 completionService.submit(new Callable<Void>() {
635 public Void call() throws IOException {
636 f.closeReader(true);
637 return null;
638 }
639 });
640 }
641
642 IOException ioe = null;
643 try {
644 for (int i = 0; i < result.size(); i++) {
645 try {
646 Future<Void> future = completionService.take();
647 future.get();
648 } catch (InterruptedException e) {
649 if (ioe == null) {
650 ioe = new InterruptedIOException();
651 ioe.initCause(e);
652 }
653 } catch (ExecutionException e) {
654 if (ioe == null) ioe = new IOException(e.getCause());
655 }
656 }
657 } finally {
658 storeFileCloserThreadPool.shutdownNow();
659 }
660 if (ioe != null) throw ioe;
661 }
662 LOG.info("Closed " + this);
663 return result;
664 } finally {
665 this.lock.writeLock().unlock();
666 }
667 }
668
669
670
671
672
673
674 void snapshot() {
675 this.memstore.snapshot();
676 }
677
678
679
680
681
682
683
684
685
686
687
688
689 protected List<Path> flushCache(final long logCacheFlushId,
690 SortedSet<KeyValue> snapshot,
691 TimeRangeTracker snapshotTimeRangeTracker,
692 AtomicLong flushedSize,
693 MonitoredTask status) throws IOException {
694
695
696
697
698
699 StoreFlusher flusher = storeEngine.getStoreFlusher();
700 IOException lastException = null;
701 for (int i = 0; i < flushRetriesNumber; i++) {
702 try {
703 List<Path> pathNames = flusher.flushSnapshot(
704 snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
705 Path lastPathName = null;
706 try {
707 for (Path pathName : pathNames) {
708 lastPathName = pathName;
709 validateStoreFile(pathName);
710 }
711 return pathNames;
712 } catch (Exception e) {
713 LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
714 if (e instanceof IOException) {
715 lastException = (IOException) e;
716 } else {
717 lastException = new IOException(e);
718 }
719 }
720 } catch (IOException e) {
721 LOG.warn("Failed flushing store file, retring num=" + i, e);
722 lastException = e;
723 }
724 if (lastException != null) {
725 try {
726 Thread.sleep(pauseTime);
727 } catch (InterruptedException e) {
728 IOException iie = new InterruptedIOException();
729 iie.initCause(e);
730 throw iie;
731 }
732 }
733 }
734 throw lastException;
735 }
736
737
738
739
740
741
742
743 private StoreFile commitFile(final Path path,
744 final long logCacheFlushId,
745 TimeRangeTracker snapshotTimeRangeTracker,
746 AtomicLong flushedSize,
747 MonitoredTask status)
748 throws IOException {
749
750 Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
751
752 status.setStatus("Flushing " + this + ": reopening flushed file");
753 StoreFile sf = createStoreFileAndReader(dstPath);
754
755 StoreFile.Reader r = sf.getReader();
756 this.storeSize += r.length();
757 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
758
759 if (LOG.isInfoEnabled()) {
760 LOG.info("Added " + sf + ", entries=" + r.getEntries() +
761 ", sequenceid=" + logCacheFlushId +
762 ", filesize=" + StringUtils.humanReadableInt(r.length()));
763 }
764 return sf;
765 }
766
767
768
769
770
771 private StoreFile.Writer createWriterInTmp(long maxKeyCount)
772 throws IOException {
773 return createWriterInTmp(maxKeyCount, this.family.getCompression(), false, true);
774 }
775
776
777
778
779
780
781
782 public StoreFile.Writer createWriterInTmp(long maxKeyCount,
783 Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
784 throws IOException {
785 final CacheConfig writerCacheConf;
786 if (isCompaction) {
787
788 writerCacheConf = new CacheConfig(cacheConf);
789 writerCacheConf.setCacheDataOnWrite(false);
790 } else {
791 writerCacheConf = cacheConf;
792 }
793 StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
794 this.getFileSystem(), blocksize)
795 .withFilePath(fs.createTempName())
796 .withDataBlockEncoder(dataBlockEncoder)
797 .withComparator(comparator)
798 .withBloomType(family.getBloomFilterType())
799 .withMaxKeyCount(maxKeyCount)
800 .withChecksumType(checksumType)
801 .withBytesPerChecksum(bytesPerChecksum)
802 .withCompression(compression)
803 .includeMVCCReadpoint(includeMVCCReadpoint)
804 .build();
805 return w;
806 }
807
808
809
810
811
812
813
814
815 private boolean updateStorefiles(
816 final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
817 this.lock.writeLock().lock();
818 try {
819 for (StoreFile sf : sfs) {
820 this.storeEngine.getStoreFileManager().insertNewFile(sf);
821 }
822 this.memstore.clearSnapshot(set);
823 } finally {
824
825
826
827
828
829 this.lock.writeLock().unlock();
830 }
831
832
833 notifyChangedReadersObservers();
834
835 return needsCompaction();
836 }
837
838
839
840
841
842 private void notifyChangedReadersObservers() throws IOException {
843 for (ChangedReadersObserver o: this.changedReaderObservers) {
844 o.updateReaders();
845 }
846 }
847
848
849
850
851
852
853 @Override
854 public List<KeyValueScanner> getScanners(boolean cacheBlocks,
855 boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
856 byte[] stopRow) throws IOException {
857 Collection<StoreFile> storeFilesToScan;
858 List<KeyValueScanner> memStoreScanners;
859 this.lock.readLock().lock();
860 try {
861 storeFilesToScan =
862 this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
863 memStoreScanners = this.memstore.getScanners();
864 } finally {
865 this.lock.readLock().unlock();
866 }
867
868
869
870
871
872
873 List<StoreFileScanner> sfScanners = StoreFileScanner
874 .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, isGet, isCompaction, matcher);
875 List<KeyValueScanner> scanners =
876 new ArrayList<KeyValueScanner>(sfScanners.size()+1);
877 scanners.addAll(sfScanners);
878
879 scanners.addAll(memStoreScanners);
880 return scanners;
881 }
882
883 @Override
884 public void addChangedReaderObserver(ChangedReadersObserver o) {
885 this.changedReaderObservers.add(o);
886 }
887
888 @Override
889 public void deleteChangedReaderObserver(ChangedReadersObserver o) {
890
891 this.changedReaderObservers.remove(o);
892 }
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941 public List<StoreFile> compact(CompactionContext compaction) throws IOException {
942 assert compaction != null && compaction.hasSelection();
943 CompactionRequest cr = compaction.getRequest();
944 Collection<StoreFile> filesToCompact = cr.getFiles();
945 assert !filesToCompact.isEmpty();
946 synchronized (filesCompacting) {
947
948
949 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
950 }
951
952
953 LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
954 + this + " of " + this.getRegionInfo().getRegionNameAsString()
955 + " into tmpdir=" + fs.getTempDir() + ", totalSize="
956 + StringUtils.humanReadableInt(cr.getSize()));
957
958 long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
959 List<StoreFile> sfs = null;
960 try {
961
962 List<Path> newFiles = compaction.compact();
963
964 if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
965 LOG.warn("hbase.hstore.compaction.complete is set to false");
966 sfs = new ArrayList<StoreFile>();
967 for (Path newFile : newFiles) {
968
969 StoreFile sf = createStoreFileAndReader(newFile);
970 sfs.add(sf);
971 }
972 return sfs;
973 }
974
975 sfs = moveCompatedFilesIntoPlace(cr, newFiles);
976 writeCompactionWalRecord(filesToCompact, sfs);
977 replaceStoreFiles(filesToCompact, sfs);
978
979 completeCompaction(filesToCompact);
980 } finally {
981 finishCompactionRequest(cr);
982 }
983 logCompactionEndMessage(cr, sfs, compactionStartTime);
984 return sfs;
985 }
986
987 private List<StoreFile> moveCompatedFilesIntoPlace(
988 CompactionRequest cr, List<Path> newFiles) throws IOException {
989 List<StoreFile> sfs = new ArrayList<StoreFile>();
990 for (Path newFile : newFiles) {
991 assert newFile != null;
992 StoreFile sf = moveFileIntoPlace(newFile);
993 if (this.getCoprocessorHost() != null) {
994 this.getCoprocessorHost().postCompact(this, sf, cr);
995 }
996 assert sf != null;
997 sfs.add(sf);
998 }
999 return sfs;
1000 }
1001
1002
1003 StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1004 validateStoreFile(newFile);
1005
1006 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1007 return createStoreFileAndReader(destPath);
1008 }
1009
1010
1011
1012
1013
1014
1015 private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1016 Collection<StoreFile> newFiles) throws IOException {
1017 if (region.getLog() == null) return;
1018 List<Path> inputPaths = new ArrayList<Path>();
1019 for (StoreFile f : filesCompacted) {
1020 inputPaths.add(f.getPath());
1021 }
1022 List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1023 for (StoreFile f : newFiles) {
1024 outputPaths.add(f.getPath());
1025 }
1026 HRegionInfo info = this.region.getRegionInfo();
1027 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1028 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1029 HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
1030 this.region.getRegionInfo(), compactionDescriptor);
1031 }
1032
1033 private void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1034 final Collection<StoreFile> result) throws IOException {
1035 this.lock.writeLock().lock();
1036 try {
1037 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1038 filesCompacting.removeAll(compactedFiles);
1039 } finally {
1040 this.lock.writeLock().unlock();
1041 }
1042 }
1043
1044
1045
1046
1047
1048
1049
1050 private void logCompactionEndMessage(
1051 CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1052 long now = EnvironmentEdgeManager.currentTimeMillis();
1053 StringBuilder message = new StringBuilder(
1054 "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
1055 + cr.getFiles().size() + " file(s) in " + this + " of "
1056 + this.getRegionInfo().getRegionNameAsString()
1057 + " into ");
1058 if (sfs.isEmpty()) {
1059 message.append("none, ");
1060 } else {
1061 for (StoreFile sf: sfs) {
1062 message.append(sf.getPath().getName());
1063 message.append("(size=");
1064 message.append(StringUtils.humanReadableInt(sf.getReader().length()));
1065 message.append("), ");
1066 }
1067 }
1068 message.append("total size for store is ")
1069 .append(StringUtils.humanReadableInt(storeSize))
1070 .append(". This selection was in queue for ")
1071 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1072 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1073 .append(" to execute.");
1074 LOG.info(message.toString());
1075 }
1076
1077
1078
1079
1080
1081
1082
1083 public void completeCompactionMarker(CompactionDescriptor compaction)
1084 throws IOException {
1085 LOG.debug("Completing compaction from the WAL marker");
1086 List<String> compactionInputs = compaction.getCompactionInputList();
1087 List<String> compactionOutputs = compaction.getCompactionOutputList();
1088
1089 List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1090 for (String compactionOutput : compactionOutputs) {
1091
1092 boolean found = false;
1093 Path outputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionOutput);
1094 outputPath = outputPath.makeQualified(fs.getFileSystem());
1095 for (StoreFile sf : this.getStorefiles()) {
1096 if (sf.getPath().makeQualified(sf.getPath().getFileSystem(conf)).equals(outputPath)) {
1097 found = true;
1098 break;
1099 }
1100 }
1101 if (!found) {
1102 if (getFileSystem().exists(outputPath)) {
1103 outputStoreFiles.add(createStoreFileAndReader(outputPath));
1104 }
1105 }
1106 }
1107
1108 List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
1109 for (String compactionInput : compactionInputs) {
1110 Path inputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionInput);
1111 inputPath = inputPath.makeQualified(fs.getFileSystem());
1112 inputPaths.add(inputPath);
1113 }
1114
1115
1116 List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1117 for (StoreFile sf : this.getStorefiles()) {
1118 if (inputPaths.contains(sf.getPath().makeQualified(fs.getFileSystem()))) {
1119 inputStoreFiles.add(sf);
1120 }
1121 }
1122
1123 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1124 this.completeCompaction(inputStoreFiles);
1125 }
1126
1127
1128
1129
1130
1131
1132
1133
1134 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1135 List<StoreFile> filesToCompact;
1136 boolean isMajor;
1137
1138 this.lock.readLock().lock();
1139 try {
1140 synchronized (filesCompacting) {
1141 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1142 if (!filesCompacting.isEmpty()) {
1143
1144
1145 StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1146 int idx = filesToCompact.indexOf(last);
1147 Preconditions.checkArgument(idx != -1);
1148 filesToCompact.subList(0, idx + 1).clear();
1149 }
1150 int count = filesToCompact.size();
1151 if (N > count) {
1152 throw new RuntimeException("Not enough files");
1153 }
1154
1155 filesToCompact = filesToCompact.subList(count - N, count);
1156 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1157 filesCompacting.addAll(filesToCompact);
1158 Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1159 }
1160 } finally {
1161 this.lock.readLock().unlock();
1162 }
1163
1164 try {
1165
1166 List<Path> newFiles =
1167 this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
1168 for (Path newFile: newFiles) {
1169
1170 StoreFile sf = moveFileIntoPlace(newFile);
1171 if (this.getCoprocessorHost() != null) {
1172 this.getCoprocessorHost().postCompact(this, sf, null);
1173 }
1174 replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1175 completeCompaction(filesToCompact);
1176 }
1177 } finally {
1178 synchronized (filesCompacting) {
1179 filesCompacting.removeAll(filesToCompact);
1180 }
1181 }
1182 }
1183
1184 @Override
1185 public boolean hasReferences() {
1186 return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1187 }
1188
1189 @Override
1190 public CompactionProgress getCompactionProgress() {
1191 return this.storeEngine.getCompactor().getProgress();
1192 }
1193
1194 @Override
1195 public boolean isMajorCompaction() throws IOException {
1196 for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1197
1198 if (sf.getReader() == null) {
1199 LOG.debug("StoreFile " + sf + " has null Reader");
1200 return false;
1201 }
1202 }
1203 return storeEngine.getCompactionPolicy().isMajorCompaction(
1204 this.storeEngine.getStoreFileManager().getStorefiles());
1205 }
1206
1207 @Override
1208 public CompactionContext requestCompaction() throws IOException {
1209 return requestCompaction(Store.NO_PRIORITY, null);
1210 }
1211
1212 @Override
1213 public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1214 throws IOException {
1215
1216 if (!this.areWritesEnabled()) {
1217 return null;
1218 }
1219
1220 CompactionContext compaction = storeEngine.createCompaction();
1221 this.lock.readLock().lock();
1222 try {
1223 synchronized (filesCompacting) {
1224
1225 if (this.getCoprocessorHost() != null) {
1226 List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1227 boolean override = this.getCoprocessorHost().preCompactSelection(
1228 this, candidatesForCoproc, baseRequest);
1229 if (override) {
1230
1231 compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1232 }
1233 }
1234
1235
1236 if (!compaction.hasSelection()) {
1237 boolean isUserCompaction = priority == Store.PRIORITY_USER;
1238 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1239 offPeakCompactionTracker.compareAndSet(false, true);
1240 try {
1241 compaction.select(this.filesCompacting, isUserCompaction,
1242 mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1243 } catch (IOException e) {
1244 if (mayUseOffPeak) {
1245 offPeakCompactionTracker.set(false);
1246 }
1247 throw e;
1248 }
1249 assert compaction.hasSelection();
1250 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1251
1252 offPeakCompactionTracker.set(false);
1253 }
1254 }
1255 if (this.getCoprocessorHost() != null) {
1256 this.getCoprocessorHost().postCompactSelection(
1257 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1258 }
1259
1260
1261 if (baseRequest != null) {
1262
1263
1264 compaction.forceSelect(
1265 baseRequest.combineWith(compaction.getRequest()));
1266 }
1267
1268
1269 final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
1270 if (selectedFiles.isEmpty()) {
1271 return null;
1272 }
1273
1274
1275 if (!Collections.disjoint(filesCompacting, selectedFiles)) {
1276 Preconditions.checkArgument(false, "%s overlaps with %s",
1277 selectedFiles, filesCompacting);
1278 }
1279 filesCompacting.addAll(selectedFiles);
1280 Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1281
1282
1283 boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
1284 this.forceMajor = this.forceMajor && !isMajor;
1285
1286
1287
1288 compaction.getRequest().setPriority(
1289 (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1290 compaction.getRequest().setIsMajor(isMajor);
1291 compaction.getRequest().setDescription(
1292 getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1293 }
1294 } finally {
1295 this.lock.readLock().unlock();
1296 }
1297
1298 LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
1299 + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
1300 this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
1301 return compaction;
1302 }
1303
1304 public void cancelRequestedCompaction(CompactionContext compaction) {
1305 finishCompactionRequest(compaction.getRequest());
1306 }
1307
1308 private void finishCompactionRequest(CompactionRequest cr) {
1309 this.region.reportCompactionRequestEnd(cr.isMajor());
1310 if (cr.isOffPeak()) {
1311 offPeakCompactionTracker.set(false);
1312 cr.setOffPeak(false);
1313 }
1314 synchronized (filesCompacting) {
1315 filesCompacting.removeAll(cr.getFiles());
1316 }
1317 }
1318
1319
1320
1321
1322
1323
1324
1325 private void validateStoreFile(Path path)
1326 throws IOException {
1327 StoreFile storeFile = null;
1328 try {
1329 createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE);
1330 } catch (IOException e) {
1331 LOG.error("Failed to open store file : " + path
1332 + ", keeping it in tmp location", e);
1333 throw e;
1334 } finally {
1335 if (storeFile != null) {
1336 storeFile.closeReader(false);
1337 }
1338 }
1339 }
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356 @VisibleForTesting
1357 protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1358 throws IOException {
1359 try {
1360
1361
1362
1363
1364 notifyChangedReadersObservers();
1365
1366
1367
1368 LOG.debug("Removing store files after compaction...");
1369 for (StoreFile compactedFile : compactedFiles) {
1370 compactedFile.closeReader(true);
1371 }
1372 this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1373 } catch (IOException e) {
1374 e = RemoteExceptionHandler.checkIOException(e);
1375 LOG.error("Failed removing compacted files in " + this +
1376 ". Files we were trying to remove are " + compactedFiles.toString() +
1377 "; some of them may have been already removed", e);
1378 }
1379
1380
1381 this.storeSize = 0L;
1382 this.totalUncompressedBytes = 0L;
1383 for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1384 StoreFile.Reader r = hsf.getReader();
1385 if (r == null) {
1386 LOG.warn("StoreFile " + hsf + " has a null Reader");
1387 continue;
1388 }
1389 this.storeSize += r.length();
1390 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1391 }
1392 }
1393
1394
1395
1396
1397
1398 int versionsToReturn(final int wantedVersions) {
1399 if (wantedVersions <= 0) {
1400 throw new IllegalArgumentException("Number of versions must be > 0");
1401 }
1402
1403 int maxVersions = this.family.getMaxVersions();
1404 return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1405 }
1406
1407 static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
1408 return key.getTimestamp() < oldestTimestamp;
1409 }
1410
1411 @Override
1412 public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
1413
1414
1415
1416
1417
1418
1419 long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
1420
1421 KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1422
1423 GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1424 this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
1425 this.lock.readLock().lock();
1426 try {
1427
1428 this.memstore.getRowKeyAtOrBefore(state);
1429
1430
1431 Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
1432 .getCandidateFilesForRowKeyBefore(state.getTargetKey());
1433 while (sfIterator.hasNext()) {
1434 StoreFile sf = sfIterator.next();
1435 sfIterator.remove();
1436 boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
1437 if (haveNewCandidate) {
1438
1439 sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
1440 sfIterator, state.getTargetKey(), state.getCandidate());
1441 }
1442 }
1443 return state.getCandidate();
1444 } finally {
1445 this.lock.readLock().unlock();
1446 }
1447 }
1448
1449
1450
1451
1452
1453
1454
1455
1456 private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1457 final GetClosestRowBeforeTracker state)
1458 throws IOException {
1459 StoreFile.Reader r = f.getReader();
1460 if (r == null) {
1461 LOG.warn("StoreFile " + f + " has a null Reader");
1462 return false;
1463 }
1464 if (r.getEntries() == 0) {
1465 LOG.warn("StoreFile " + f + " is a empty store file");
1466 return false;
1467 }
1468
1469 byte [] fk = r.getFirstKey();
1470 if (fk == null) return false;
1471 KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1472 byte [] lk = r.getLastKey();
1473 KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1474 KeyValue firstOnRow = state.getTargetKey();
1475 if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1476
1477
1478 if (!state.isTargetTable(lastKV)) return false;
1479
1480
1481 firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1482 }
1483
1484 HFileScanner scanner = r.getScanner(true, true, false);
1485
1486 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1487
1488
1489 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1490
1491 while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1492 firstOnRow.getKeyLength())) {
1493 KeyValue kv = scanner.getKeyValue();
1494 if (!state.isTargetTable(kv)) break;
1495 if (!state.isBetterCandidate(kv)) break;
1496
1497 firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1498
1499 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1500
1501 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1502 }
1503 return false;
1504 }
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514 private boolean seekToScanner(final HFileScanner scanner,
1515 final KeyValue firstOnRow,
1516 final KeyValue firstKV)
1517 throws IOException {
1518 KeyValue kv = firstOnRow;
1519
1520 if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1521 int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
1522 kv.getKeyLength());
1523 return result >= 0;
1524 }
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536 private boolean walkForwardInSingleRow(final HFileScanner scanner,
1537 final KeyValue firstOnRow,
1538 final GetClosestRowBeforeTracker state)
1539 throws IOException {
1540 boolean foundCandidate = false;
1541 do {
1542 KeyValue kv = scanner.getKeyValue();
1543
1544 if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1545
1546 if (state.isTooFar(kv, firstOnRow)) break;
1547 if (state.isExpired(kv)) {
1548 continue;
1549 }
1550
1551 if (state.handle(kv)) {
1552 foundCandidate = true;
1553 break;
1554 }
1555 } while(scanner.next());
1556 return foundCandidate;
1557 }
1558
1559 public boolean canSplit() {
1560 this.lock.readLock().lock();
1561 try {
1562
1563 boolean result = !hasReferences();
1564 if (!result && LOG.isDebugEnabled()) {
1565 LOG.debug("Cannot split region due to reference files being there");
1566 }
1567 return result;
1568 } finally {
1569 this.lock.readLock().unlock();
1570 }
1571 }
1572
1573 @Override
1574 public byte[] getSplitPoint() {
1575 this.lock.readLock().lock();
1576 try {
1577
1578 assert !this.getRegionInfo().isMetaRegion();
1579
1580 if (hasReferences()) {
1581 assert false : "getSplitPoint() called on a region that can't split!";
1582 return null;
1583 }
1584 return this.storeEngine.getStoreFileManager().getSplitPoint();
1585 } catch(IOException e) {
1586 LOG.warn("Failed getting store size for " + this, e);
1587 } finally {
1588 this.lock.readLock().unlock();
1589 }
1590 return null;
1591 }
1592
1593 @Override
1594 public long getLastCompactSize() {
1595 return this.lastCompactSize;
1596 }
1597
1598 @Override
1599 public long getSize() {
1600 return storeSize;
1601 }
1602
1603 public void triggerMajorCompaction() {
1604 this.forceMajor = true;
1605 }
1606
1607 boolean getForceMajorCompaction() {
1608 return this.forceMajor;
1609 }
1610
1611
1612
1613
1614
1615 @Override
1616 public KeyValueScanner getScanner(Scan scan,
1617 final NavigableSet<byte []> targetCols) throws IOException {
1618 lock.readLock().lock();
1619 try {
1620 KeyValueScanner scanner = null;
1621 if (this.getCoprocessorHost() != null) {
1622 scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
1623 }
1624 if (scanner == null) {
1625 scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
1626 }
1627 return scanner;
1628 } finally {
1629 lock.readLock().unlock();
1630 }
1631 }
1632
1633 @Override
1634 public String toString() {
1635 return this.getColumnFamilyName();
1636 }
1637
1638 @Override
1639
1640 public int getStorefilesCount() {
1641 return this.storeEngine.getStoreFileManager().getStorefileCount();
1642 }
1643
1644 @Override
1645 public long getStoreSizeUncompressed() {
1646 return this.totalUncompressedBytes;
1647 }
1648
1649 @Override
1650 public long getStorefilesSize() {
1651 long size = 0;
1652 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1653 StoreFile.Reader r = s.getReader();
1654 if (r == null) {
1655 LOG.warn("StoreFile " + s + " has a null Reader");
1656 continue;
1657 }
1658 size += r.length();
1659 }
1660 return size;
1661 }
1662
1663 @Override
1664 public long getStorefilesIndexSize() {
1665 long size = 0;
1666 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1667 StoreFile.Reader r = s.getReader();
1668 if (r == null) {
1669 LOG.warn("StoreFile " + s + " has a null Reader");
1670 continue;
1671 }
1672 size += r.indexSize();
1673 }
1674 return size;
1675 }
1676
1677 @Override
1678 public long getTotalStaticIndexSize() {
1679 long size = 0;
1680 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1681 size += s.getReader().getUncompressedDataIndexSize();
1682 }
1683 return size;
1684 }
1685
1686 @Override
1687 public long getTotalStaticBloomSize() {
1688 long size = 0;
1689 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1690 StoreFile.Reader r = s.getReader();
1691 size += r.getTotalBloomSize();
1692 }
1693 return size;
1694 }
1695
1696 @Override
1697 public long getMemStoreSize() {
1698 return this.memstore.heapSize();
1699 }
1700
1701 @Override
1702 public int getCompactPriority() {
1703 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1704 if (priority == PRIORITY_USER) {
1705 LOG.warn("Compaction priority is USER despite there being no user compaction");
1706 }
1707 return priority;
1708 }
1709
1710 @Override
1711 public boolean throttleCompaction(long compactionSize) {
1712 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1713 }
1714
1715 public HRegion getHRegion() {
1716 return this.region;
1717 }
1718
1719 @Override
1720 public RegionCoprocessorHost getCoprocessorHost() {
1721 return this.region.getCoprocessorHost();
1722 }
1723
1724 @Override
1725 public HRegionInfo getRegionInfo() {
1726 return this.fs.getRegionInfo();
1727 }
1728
1729 @Override
1730 public boolean areWritesEnabled() {
1731 return this.region.areWritesEnabled();
1732 }
1733
1734 @Override
1735 public long getSmallestReadPoint() {
1736 return this.region.getSmallestReadPoint();
1737 }
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752 public long updateColumnValue(byte [] row, byte [] f,
1753 byte [] qualifier, long newValue)
1754 throws IOException {
1755
1756 this.lock.readLock().lock();
1757 try {
1758 long now = EnvironmentEdgeManager.currentTimeMillis();
1759
1760 return this.memstore.updateColumnValue(row,
1761 f,
1762 qualifier,
1763 newValue,
1764 now);
1765
1766 } finally {
1767 this.lock.readLock().unlock();
1768 }
1769 }
1770
1771 @Override
1772 public long upsert(Iterable<? extends Cell> cells, long readpoint) throws IOException {
1773 this.lock.readLock().lock();
1774 try {
1775 return this.memstore.upsert(cells, readpoint);
1776 } finally {
1777 this.lock.readLock().unlock();
1778 }
1779 }
1780
1781 public StoreFlushContext createFlushContext(long cacheFlushId) {
1782 return new StoreFlusherImpl(cacheFlushId);
1783 }
1784
1785 private class StoreFlusherImpl implements StoreFlushContext {
1786
1787 private long cacheFlushSeqNum;
1788 private SortedSet<KeyValue> snapshot;
1789 private List<Path> tempFiles;
1790 private TimeRangeTracker snapshotTimeRangeTracker;
1791 private final AtomicLong flushedSize = new AtomicLong();
1792
1793 private StoreFlusherImpl(long cacheFlushSeqNum) {
1794 this.cacheFlushSeqNum = cacheFlushSeqNum;
1795 }
1796
1797 @Override
1798 public void prepare() {
1799 memstore.snapshot();
1800 this.snapshot = memstore.getSnapshot();
1801 this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
1802 }
1803
1804 @Override
1805 public void flushCache(MonitoredTask status) throws IOException {
1806 tempFiles = HStore.this.flushCache(
1807 cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
1808 }
1809
1810 @Override
1811 public boolean commit(MonitoredTask status) throws IOException {
1812 if (this.tempFiles == null || this.tempFiles.isEmpty()) {
1813 return false;
1814 }
1815 List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
1816 for (Path storeFilePath : tempFiles) {
1817 try {
1818 storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
1819 snapshotTimeRangeTracker, flushedSize, status));
1820 } catch (IOException ex) {
1821 LOG.error("Failed to commit store file " + storeFilePath, ex);
1822
1823 for (StoreFile sf : storeFiles) {
1824 Path pathToDelete = sf.getPath();
1825 try {
1826 sf.deleteReader();
1827 } catch (IOException deleteEx) {
1828 LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
1829 Runtime.getRuntime().halt(1);
1830 }
1831 }
1832 throw new IOException("Failed to commit the flush", ex);
1833 }
1834 }
1835
1836 if (HStore.this.getCoprocessorHost() != null) {
1837 for (StoreFile sf : storeFiles) {
1838 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
1839 }
1840 }
1841
1842 return HStore.this.updateStorefiles(storeFiles, snapshot);
1843 }
1844 }
1845
1846 @Override
1847 public boolean needsCompaction() {
1848 return storeEngine.getCompactionPolicy().needsCompaction(
1849 this.storeEngine.getStoreFileManager().getStorefiles(), filesCompacting);
1850 }
1851
1852 @Override
1853 public CacheConfig getCacheConfig() {
1854 return this.cacheConf;
1855 }
1856
1857 public static final long FIXED_OVERHEAD =
1858 ClassSize.align(ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
1859 + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
1860
1861 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
1862 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
1863 + ClassSize.CONCURRENT_SKIPLISTMAP
1864 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
1865 + ScanInfo.FIXED_OVERHEAD);
1866
1867 @Override
1868 public long heapSize() {
1869 return DEEP_OVERHEAD + this.memstore.heapSize();
1870 }
1871
1872 public KeyValue.KVComparator getComparator() {
1873 return comparator;
1874 }
1875
1876 @Override
1877 public ScanInfo getScanInfo() {
1878 return scanInfo;
1879 }
1880
1881
1882
1883
1884
1885 void setScanInfo(ScanInfo scanInfo) {
1886 this.scanInfo = scanInfo;
1887 }
1888
1889 @Override
1890 public boolean hasTooManyStoreFiles() {
1891 return getStorefilesCount() > this.blockingFileCount;
1892 }
1893 }