1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.NavigableSet;
29  import java.util.SortedSet;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.CompletionService;
32  import java.util.concurrent.CopyOnWriteArraySet;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.ExecutorCompletionService;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicLong;
39  import java.util.concurrent.locks.ReentrantReadWriteLock;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.hadoop.classification.InterfaceAudience;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.Cell;
48  import org.apache.hadoop.hbase.CompoundConfiguration;
49  import org.apache.hadoop.hbase.HColumnDescriptor;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.RemoteExceptionHandler;
54  import org.apache.hadoop.hbase.client.Scan;
55  import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
56  import org.apache.hadoop.hbase.exceptions.WrongRegionException;
57  import org.apache.hadoop.hbase.io.compress.Compression;
58  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
59  import org.apache.hadoop.hbase.io.hfile.HFile;
60  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
61  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
62  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
63  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
64  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
67  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
68  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
69  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
70  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
71  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
72  import org.apache.hadoop.hbase.util.Bytes;
73  import org.apache.hadoop.hbase.util.ChecksumType;
74  import org.apache.hadoop.hbase.util.ClassSize;
75  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.util.StringUtils;
78  
79  import com.google.common.annotations.VisibleForTesting;
80  import com.google.common.base.Preconditions;
81  import com.google.common.collect.ImmutableCollection;
82  import com.google.common.collect.ImmutableList;
83  import com.google.common.collect.Lists;
84  
85  /**
 * A Store holds a column family in a Region.  It has a memstore and a set of zero
 * or more StoreFiles, which stretch backwards over time.
88   *
89   * <p>There's no reason to consider append-logging at this level; all logging
90   * and locking is handled at the HRegion level.  Store just provides
91   * services to manage sets of StoreFiles.  One of the most important of those
92   * services is compaction services where files are aggregated once they pass
93   * a configurable threshold.
94   *
95   * <p>The only thing having to do with logs that Store needs to deal with is
96   * the reconstructionLog.  This is a segment of an HRegion's log that might
97   * NOT be present upon startup.  If the param is NULL, there's nothing to do.
98   * If the param is non-NULL, we need to process the log to reconstruct
99   * a TreeMap that might not have been written to disk before the process
100  * died.
101  *
102  * <p>It's assumed that after this constructor returns, the reconstructionLog
103  * file will be deleted (by whoever has instantiated the Store).
104  *
105  * <p>Locking and transactions are handled at a higher level.  This API should
106  * not be called directly but by an HRegion manager.
107  */
108 @InterfaceAudience.Private
109 public class HStore implements Store {
  /** Configuration key the constructor reads to initialise the compaction check multiplier. */
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  /** Configuration key the constructor reads to initialise the blocking store file count. */
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  /** Fallback multiplier, also used when the configured multiplier is not positive. */
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  /** Fallback for {@link #BLOCKING_STOREFILES_KEY}. */
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;

  static final Log LOG = LogFactory.getLog(HStore.class);

  // In-memory store; built in the constructor from the layered configuration and comparator.
  protected final MemStore memstore;
  // Region this store was constructed for; supplies the region file system, the
  // open/close thread pool and the memstore flush size.
  private final HRegion region;
  private final HColumnDescriptor family;
  // Region-level file system helper used to create the store directory, list store
  // files and commit or bulk load new files.
  private final HRegionFileSystem fs;
  // Layered configuration: global config, then table, family and family-metadata overrides.
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  /* how many bytes to write between status checks */
  // Static: read from "hbase.hstore.close.check.interval" by the first constructor run
  // that finds it still 0.
  static int closeCheckInterval = 0;
  // Running totals, incremented whenever a store file is loaded, bulk loaded or committed.
  private volatile long storeSize = 0L;
  private volatile long totalUncompressedBytes = 0L;

  /**
   * RWLock for store operations.
   * Locked in shared mode when the list of component stores is looked at:
   *   - all reads/writes to table data
   *   - checking for split
   * Locked in exclusive mode when the list of component stores is modified:
   *   - closing
   *   - completing a compaction
   */
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // From "hbase.hstore.bulkload.verify"; enables the per-KeyValue scan in
  // assertBulkLoadHFileOk.
  private final boolean verifyBulkLoads;

  private ScanInfo scanInfo;

  final List<StoreFile> filesCompacting = Lists.newArrayList();

  // Observers told when the set of readers changes. The set is copy-on-write and
  // notifyChangedReadersObservers() iterates it without taking any additional lock.
  private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
    new CopyOnWriteArraySet<ChangedReadersObserver>();

  // Block size taken from the column family descriptor; passed to new store file writers.
  private final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  /** Checksum configuration */
  private ChecksumType checksumType;
  private int bytesPerChecksum;

  // Comparing KeyValues
  private final KeyValue.KVComparator comparator;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private final OffPeakHours offPeakHours;

  // Flush retry settings: number of attempts made by flushCache and the pause
  // (passed to Thread.sleep) between attempts.
  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;
173 
174   /**
175    * Constructor
176    * @param region
177    * @param family HColumnDescriptor for this column
178    * @param confParam configuration object
179    * failed.  Can be null.
180    * @throws IOException
181    */
182   protected HStore(final HRegion region, final HColumnDescriptor family,
183       final Configuration confParam) throws IOException {
184 
185     HRegionInfo info = region.getRegionInfo();
186     this.fs = region.getRegionFileSystem();
187 
188     // Assemble the store's home directory and Ensure it exists.
189     fs.createStoreDir(family.getNameAsString());
190     this.region = region;
191     this.family = family;
192     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
193     // CompoundConfiguration will look for keys in reverse order of addition, so we'd
194     // add global config first, then table and cf overrides, then cf metadata.
195     this.conf = new CompoundConfiguration()
196       .add(confParam)
197       .addStringMap(region.getTableDesc().getConfiguration())
198       .addStringMap(family.getConfiguration())
199       .addWritableMap(family.getValues());
200     this.blocksize = family.getBlocksize();
201 
202     this.dataBlockEncoder =
203         new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
204             family.getDataBlockEncoding());
205 
206     this.comparator = info.getComparator();
207     // used by ScanQueryMatcher
208     long timeToPurgeDeletes =
209         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
210     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
211         "ms in store " + this);
212     // Get TTL
213     long ttl = determineTTLFromFamily(family);
214     // Why not just pass a HColumnDescriptor in here altogether?  Even if have
215     // to clone it?
216     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
217     this.memstore = new MemStore(conf, this.comparator);
218     this.offPeakHours = OffPeakHours.getInstance(conf);
219 
220     // Setting up cache configuration for this family
221     this.cacheConf = new CacheConfig(conf, family);
222 
223     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
224 
225     this.blockingFileCount =
226         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
227     this.compactionCheckMultiplier = conf.getInt(
228         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
229     if (this.compactionCheckMultiplier <= 0) {
230       LOG.error("Compaction check period multiplier must be positive, setting default: "
231           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
232       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
233     }
234 
235     if (HStore.closeCheckInterval == 0) {
236       HStore.closeCheckInterval = conf.getInt(
237           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
238     }
239 
240     this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
241     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
242 
243     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
244     this.checksumType = getChecksumType(conf);
245     // initilize bytes per checksum
246     this.bytesPerChecksum = getBytesPerChecksum(conf);
247     flushRetriesNumber = conf.getInt(
248         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
249     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
250     if (flushRetriesNumber <= 0) {
251       throw new IllegalArgumentException(
252           "hbase.hstore.flush.retries.number must be > 0, not "
253               + flushRetriesNumber);
254     }
255   }
256 
257   /**
258    * @param family
259    * @return TTL in seconds of the specified family
260    */
261   private static long determineTTLFromFamily(final HColumnDescriptor family) {
262     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
263     long ttl = family.getTimeToLive();
264     if (ttl == HConstants.FOREVER) {
265       // Default is unlimited ttl.
266       ttl = Long.MAX_VALUE;
267     } else if (ttl == -1) {
268       ttl = Long.MAX_VALUE;
269     } else {
270       // Second -> ms adjust for user data
271       ttl *= 1000;
272     }
273     return ttl;
274   }
275 
276   public String getColumnFamilyName() {
277     return this.family.getNameAsString();
278   }
279 
280   @Override
281   public String getTableName() {
282     return this.getRegionInfo().getTableNameAsString();
283   }
284 
285   @Override
286   public FileSystem getFileSystem() {
287     return this.fs.getFileSystem();
288   }
289 
290   public HRegionFileSystem getRegionFileSystem() {
291     return this.fs;
292   }
293 
294   /* Implementation of StoreConfigInformation */
295   @Override
296   public long getStoreFileTtl() {
297     // TTL only applies if there's no MIN_VERSIONs setting on the column.
298     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
299   }
300 
301   @Override
302   public long getMemstoreFlushSize() {
303     return this.region.memstoreFlushSize;
304   }
305 
306   @Override
307   public long getCompactionCheckMultiplier() {
308     return this.compactionCheckMultiplier;
309   }
310 
311   public long getBlockingFileCount() {
312     return blockingFileCount;
313   }
314   /* End implementation of StoreConfigInformation */
315 
316   /**
317    * Returns the configured bytesPerChecksum value.
318    * @param conf The configuration
319    * @return The bytesPerChecksum that is set in the configuration
320    */
321   public static int getBytesPerChecksum(Configuration conf) {
322     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
323                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
324   }
325 
326   /**
327    * Returns the configured checksum algorithm.
328    * @param conf The configuration
329    * @return The checksum algorithm that is set in the configuration
330    */
331   public static ChecksumType getChecksumType(Configuration conf) {
332     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
333     if (checksumName == null) {
334       return HFile.DEFAULT_CHECKSUM_TYPE;
335     } else {
336       return ChecksumType.nameToType(checksumName);
337     }
338   }
339 
340   /**
341    * @return how many bytes to write between status checks
342    */
343   public static int getCloseCheckInterval() {
344     return closeCheckInterval;
345   }
346 
347   public HColumnDescriptor getFamily() {
348     return this.family;
349   }
350 
351   /**
352    * @return The maximum sequence id in all store files. Used for log replay.
353    */
354   long getMaxSequenceId(boolean includeBulkFiles) {
355     return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
356   }
357 
358   @Override
359   public long getMaxMemstoreTS() {
360     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
361   }
362 
363   /**
364    * @param tabledir {@link Path} to where the table is being stored
365    * @param hri {@link HRegionInfo} for the region.
366    * @param family {@link HColumnDescriptor} describing the column family
367    * @return Path to family/Store home directory.
368    */
369   @Deprecated
370   public static Path getStoreHomedir(final Path tabledir,
371       final HRegionInfo hri, final byte[] family) {
372     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
373   }
374 
375   /**
376    * @param tabledir {@link Path} to where the table is being stored
377    * @param encodedName Encoded region name.
378    * @param family {@link HColumnDescriptor} describing the column family
379    * @return Path to family/Store home directory.
380    */
381   @Deprecated
382   public static Path getStoreHomedir(final Path tabledir,
383       final String encodedName, final byte[] family) {
384     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
385   }
386 
387   @Override
388   public HFileDataBlockEncoder getDataBlockEncoder() {
389     return dataBlockEncoder;
390   }
391 
392   /**
393    * Should be used only in tests.
394    * @param blockEncoder the block delta encoder to use
395    */
396   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
397     this.dataBlockEncoder = blockEncoder;
398   }
399 
400   /**
401    * Creates an unsorted list of StoreFile loaded in parallel
402    * from the given directory.
403    * @throws IOException
404    */
405   private List<StoreFile> loadStoreFiles() throws IOException {
406     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
407     if (files == null || files.size() == 0) {
408       return new ArrayList<StoreFile>();
409     }
410 
411     // initialize the thread pool for opening store files in parallel..
412     ThreadPoolExecutor storeFileOpenerThreadPool =
413       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
414           this.getColumnFamilyName());
415     CompletionService<StoreFile> completionService =
416       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
417 
418     int totalValidStoreFile = 0;
419     for (final StoreFileInfo storeFileInfo: files) {
420       // open each store file in parallel
421       completionService.submit(new Callable<StoreFile>() {
422         public StoreFile call() throws IOException {
423           StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
424           return storeFile;
425         }
426       });
427       totalValidStoreFile++;
428     }
429 
430     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
431     IOException ioe = null;
432     try {
433       for (int i = 0; i < totalValidStoreFile; i++) {
434         try {
435           Future<StoreFile> future = completionService.take();
436           StoreFile storeFile = future.get();
437           long length = storeFile.getReader().length();
438           this.storeSize += length;
439           this.totalUncompressedBytes +=
440               storeFile.getReader().getTotalUncompressedBytes();
441           if (LOG.isDebugEnabled()) {
442             LOG.debug("loaded " + storeFile.toStringDetailed());
443           }
444           results.add(storeFile);
445         } catch (InterruptedException e) {
446           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
447         } catch (ExecutionException e) {
448           if (ioe == null) ioe = new IOException(e.getCause());
449         }
450       }
451     } finally {
452       storeFileOpenerThreadPool.shutdownNow();
453     }
454     if (ioe != null) {
455       // close StoreFile readers
456       try {
457         for (StoreFile file : results) {
458           if (file != null) file.closeReader(true);
459         }
460       } catch (IOException e) { }
461       throw ioe;
462     }
463 
464     return results;
465   }
466 
467   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
468     return createStoreFileAndReader(p, this.dataBlockEncoder);
469   }
470 
471   private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException {
472     StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf,
473         this.family.getBloomFilterType(), encoder);
474     storeFile.createReader();
475     return storeFile;
476   }
477 
478   @Override
479   public long add(final KeyValue kv) {
480     lock.readLock().lock();
481     try {
482       return this.memstore.add(kv);
483     } finally {
484       lock.readLock().unlock();
485     }
486   }
487 
488   @Override
489   public long timeOfOldestEdit() {
490     return memstore.timeOfOldestEdit();
491   }
492 
493   /**
494    * Adds a value to the memstore
495    *
496    * @param kv
497    * @return memstore size delta
498    */
499   protected long delete(final KeyValue kv) {
500     lock.readLock().lock();
501     try {
502       return this.memstore.delete(kv);
503     } finally {
504       lock.readLock().unlock();
505     }
506   }
507 
508   @Override
509   public void rollback(final KeyValue kv) {
510     lock.readLock().lock();
511     try {
512       this.memstore.rollback(kv);
513     } finally {
514       lock.readLock().unlock();
515     }
516   }
517 
518   /**
519    * @return All store files.
520    */
521   @Override
522   public Collection<StoreFile> getStorefiles() {
523     return this.storeEngine.getStoreFileManager().getStorefiles();
524   }
525 
526   @Override
527   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
528     HFile.Reader reader  = null;
529     try {
530       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
531           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
532       reader = HFile.createReader(srcPath.getFileSystem(conf),
533           srcPath, cacheConf);
534       reader.loadFileInfo();
535 
536       byte[] firstKey = reader.getFirstRowKey();
537       Preconditions.checkState(firstKey != null, "First key can not be null");
538       byte[] lk = reader.getLastKey();
539       Preconditions.checkState(lk != null, "Last key can not be null");
540       byte[] lastKey =  KeyValue.createKeyValueFromKey(lk).getRow();
541 
542       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
543           " last=" + Bytes.toStringBinary(lastKey));
544       LOG.debug("Region bounds: first=" +
545           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
546           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
547 
548       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
549         throw new WrongRegionException(
550             "Bulk load file " + srcPath.toString() + " does not fit inside region "
551             + this.getRegionInfo().getRegionNameAsString());
552       }
553 
554       if (verifyBulkLoads) {
555         KeyValue prevKV = null;
556         HFileScanner scanner = reader.getScanner(false, false, false);
557         scanner.seekTo();
558         do {
559           KeyValue kv = scanner.getKeyValue();
560           if (prevKV != null) {
561             if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(),
562                 prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(),
563                 kv.getRowLength()) > 0) {
564               throw new InvalidHFileException("Previous row is greater than"
565                   + " current row: path=" + srcPath + " previous="
566                   + Bytes.toStringBinary(prevKV.getKey()) + " current="
567                   + Bytes.toStringBinary(kv.getKey()));
568             }
569             if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(),
570                 prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(),
571                 kv.getFamilyLength()) != 0) {
572               throw new InvalidHFileException("Previous key had different"
573                   + " family compared to current key: path=" + srcPath
574                   + " previous=" + Bytes.toStringBinary(prevKV.getFamily())
575                   + " current=" + Bytes.toStringBinary(kv.getFamily()));
576             }
577           }
578           prevKV = kv;
579         } while (scanner.next());
580       }
581     } finally {
582       if (reader != null) reader.close();
583     }
584   }
585 
586   @Override
587   public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
588     Path srcPath = new Path(srcPathStr);
589     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
590 
591     StoreFile sf = createStoreFileAndReader(dstPath);
592 
593     StoreFile.Reader r = sf.getReader();
594     this.storeSize += r.length();
595     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
596 
597     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
598         "' as " + dstPath + " - updating store file list.");
599 
600     // Append the new storefile into the list
601     this.lock.writeLock().lock();
602     try {
603       this.storeEngine.getStoreFileManager().insertNewFile(sf);
604     } finally {
605       // We need the lock, as long as we are updating the storeFiles
606       // or changing the memstore. Let us release it before calling
607       // notifyChangeReadersObservers. See HBASE-4485 for a possible
608       // deadlock scenario that could have happened if continue to hold
609       // the lock.
610       this.lock.writeLock().unlock();
611     }
612     notifyChangedReadersObservers();
613     LOG.info("Successfully loaded store file " + srcPath
614         + " into store " + this + " (new location: " + dstPath + ")");
615   }
616 
617   @Override
618   public ImmutableCollection<StoreFile> close() throws IOException {
619     this.lock.writeLock().lock();
620     try {
621       // Clear so metrics doesn't find them.
622       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
623 
624       if (!result.isEmpty()) {
625         // initialize the thread pool for closing store files in parallel.
626         ThreadPoolExecutor storeFileCloserThreadPool = this.region
627             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
628                 + this.getColumnFamilyName());
629 
630         // close each store file in parallel
631         CompletionService<Void> completionService =
632           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
633         for (final StoreFile f : result) {
634           completionService.submit(new Callable<Void>() {
635             public Void call() throws IOException {
636               f.closeReader(true);
637               return null;
638             }
639           });
640         }
641 
642         IOException ioe = null;
643         try {
644           for (int i = 0; i < result.size(); i++) {
645             try {
646               Future<Void> future = completionService.take();
647               future.get();
648             } catch (InterruptedException e) {
649               if (ioe == null) {
650                 ioe = new InterruptedIOException();
651                 ioe.initCause(e);
652               }
653             } catch (ExecutionException e) {
654               if (ioe == null) ioe = new IOException(e.getCause());
655             }
656           }
657         } finally {
658           storeFileCloserThreadPool.shutdownNow();
659         }
660         if (ioe != null) throw ioe;
661       }
662       LOG.info("Closed " + this);
663       return result;
664     } finally {
665       this.lock.writeLock().unlock();
666     }
667   }
668 
669   /**
670    * Snapshot this stores memstore. Call before running
671    * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)}
672    *  so it has some work to do.
673    */
674   void snapshot() {
675     this.memstore.snapshot();
676   }
677 
678   /**
679    * Write out current snapshot.  Presumes {@link #snapshot()} has been called
680    * previously.
681    * @param logCacheFlushId flush sequence number
682    * @param snapshot
683    * @param snapshotTimeRangeTracker
684    * @param flushedSize The number of bytes flushed
685    * @param status
686    * @return The path name of the tmp file to which the store was flushed
687    * @throws IOException
688    */
689   protected List<Path> flushCache(final long logCacheFlushId,
690       SortedSet<KeyValue> snapshot,
691       TimeRangeTracker snapshotTimeRangeTracker,
692       AtomicLong flushedSize,
693       MonitoredTask status) throws IOException {
694     // If an exception happens flushing, we let it out without clearing
695     // the memstore snapshot.  The old snapshot will be returned when we say
696     // 'snapshot', the next time flush comes around.
697     // Retry after catching exception when flushing, otherwise server will abort
698     // itself
699     StoreFlusher flusher = storeEngine.getStoreFlusher();
700     IOException lastException = null;
701     for (int i = 0; i < flushRetriesNumber; i++) {
702       try {
703         List<Path> pathNames = flusher.flushSnapshot(
704             snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
705         Path lastPathName = null;
706         try {
707           for (Path pathName : pathNames) {
708             lastPathName = pathName;
709             validateStoreFile(pathName);
710           }
711           return pathNames;
712         } catch (Exception e) {
713           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
714           if (e instanceof IOException) {
715             lastException = (IOException) e;
716           } else {
717             lastException = new IOException(e);
718           }
719         }
720       } catch (IOException e) {
721         LOG.warn("Failed flushing store file, retring num=" + i, e);
722         lastException = e;
723       }
724       if (lastException != null) {
725         try {
726           Thread.sleep(pauseTime);
727         } catch (InterruptedException e) {
728           IOException iie = new InterruptedIOException();
729           iie.initCause(e);
730           throw iie;
731         }
732       }
733     }
734     throw lastException;
735   }
736 
737   /*
738    * @param path The pathname of the tmp file into which the store was flushed
739    * @param logCacheFlushId
740    * @return StoreFile created.
741    * @throws IOException
742    */
743   private StoreFile commitFile(final Path path,
744       final long logCacheFlushId,
745       TimeRangeTracker snapshotTimeRangeTracker,
746       AtomicLong flushedSize,
747       MonitoredTask status)
748       throws IOException {
749     // Write-out finished successfully, move into the right spot
750     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
751 
752     status.setStatus("Flushing " + this + ": reopening flushed file");
753     StoreFile sf = createStoreFileAndReader(dstPath);
754 
755     StoreFile.Reader r = sf.getReader();
756     this.storeSize += r.length();
757     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
758 
759     if (LOG.isInfoEnabled()) {
760       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
761         ", sequenceid=" + logCacheFlushId +
762         ", filesize=" + StringUtils.humanReadableInt(r.length()));
763     }
764     return sf;
765   }
766 
767   /*
768    * @param maxKeyCount
769    * @return Writer for a new StoreFile in the tmp dir.
770    */
771   private StoreFile.Writer createWriterInTmp(long maxKeyCount)
772   throws IOException {
773     return createWriterInTmp(maxKeyCount, this.family.getCompression(), false, true);
774   }
775 
776   /*
777    * @param maxKeyCount
778    * @param compression Compression algorithm to use
779    * @param isCompaction whether we are creating a new file in a compaction
780    * @return Writer for a new StoreFile in the tmp dir.
781    */
782   public StoreFile.Writer createWriterInTmp(long maxKeyCount,
783     Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
784   throws IOException {
785     final CacheConfig writerCacheConf;
786     if (isCompaction) {
787       // Don't cache data on write on compactions.
788       writerCacheConf = new CacheConfig(cacheConf);
789       writerCacheConf.setCacheDataOnWrite(false);
790     } else {
791       writerCacheConf = cacheConf;
792     }
793     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
794         this.getFileSystem(), blocksize)
795             .withFilePath(fs.createTempName())
796             .withDataBlockEncoder(dataBlockEncoder)
797             .withComparator(comparator)
798             .withBloomType(family.getBloomFilterType())
799             .withMaxKeyCount(maxKeyCount)
800             .withChecksumType(checksumType)
801             .withBytesPerChecksum(bytesPerChecksum)
802             .withCompression(compression)
803             .includeMVCCReadpoint(includeMVCCReadpoint)
804             .build();
805     return w;
806   }
807 
808   /*
809    * Change storeFiles adding into place the Reader produced by this new flush.
810    * @param sfs Store files
811    * @param set That was used to make the passed file.
812    * @throws IOException
813    * @return Whether compaction is required.
814    */
815   private boolean updateStorefiles(
816       final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
817     this.lock.writeLock().lock();
818     try {
819       for (StoreFile sf : sfs) {
820         this.storeEngine.getStoreFileManager().insertNewFile(sf);
821       }
822       this.memstore.clearSnapshot(set);
823     } finally {
824       // We need the lock, as long as we are updating the storeFiles
825       // or changing the memstore. Let us release it before calling
826       // notifyChangeReadersObservers. See HBASE-4485 for a possible
827       // deadlock scenario that could have happened if continue to hold
828       // the lock.
829       this.lock.writeLock().unlock();
830     }
831 
832     // Tell listeners of the change in readers.
833     notifyChangedReadersObservers();
834 
835     return needsCompaction();
836   }
837 
838   /*
839    * Notify all observers that set of Readers has changed.
840    * @throws IOException
841    */
842   private void notifyChangedReadersObservers() throws IOException {
843     for (ChangedReadersObserver o: this.changedReaderObservers) {
844       o.updateReaders();
845     }
846   }
847 
848   /**
849    * Get all scanners with no filtering based on TTL (that happens further down
850    * the line).
851    * @return all scanners for this store
852    */
853   @Override
854   public List<KeyValueScanner> getScanners(boolean cacheBlocks,
855       boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
856       byte[] stopRow) throws IOException {
857     Collection<StoreFile> storeFilesToScan;
858     List<KeyValueScanner> memStoreScanners;
859     this.lock.readLock().lock();
860     try {
861       storeFilesToScan =
862           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
863       memStoreScanners = this.memstore.getScanners();
864     } finally {
865       this.lock.readLock().unlock();
866     }
867 
868     // First the store file scanners
869 
870     // TODO this used to get the store files in descending order,
871     // but now we get them in ascending order, which I think is
872     // actually more correct, since memstore get put at the end.
873     List<StoreFileScanner> sfScanners = StoreFileScanner
874       .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, isGet, isCompaction, matcher);
875     List<KeyValueScanner> scanners =
876       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
877     scanners.addAll(sfScanners);
878     // Then the memstore scanners
879     scanners.addAll(memStoreScanners);
880     return scanners;
881   }
882 
883   @Override
884   public void addChangedReaderObserver(ChangedReadersObserver o) {
885     this.changedReaderObservers.add(o);
886   }
887 
888   @Override
889   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
890     // We don't check if observer present; it may not be (legitimately)
891     this.changedReaderObservers.remove(o);
892   }
893 
894   //////////////////////////////////////////////////////////////////////////////
895   // Compaction
896   //////////////////////////////////////////////////////////////////////////////
897 
898   /**
899    * Compact the StoreFiles.  This method may take some time, so the calling
900    * thread must be able to block for long periods.
901    *
902    * <p>During this time, the Store can work as usual, getting values from
903    * StoreFiles and writing new StoreFiles from the memstore.
904    *
905    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
906    * completely written-out to disk.
907    *
908    * <p>The compactLock prevents multiple simultaneous compactions.
909    * The structureLock prevents us from interfering with other write operations.
910    *
911    * <p>We don't want to hold the structureLock for the whole time, as a compact()
912    * can be lengthy and we want to allow cache-flushes during this period.
913    *
914    * <p> Compaction event should be idempotent, since there is no IO Fencing for
915    * the region directory in hdfs. A region server might still try to complete the
916    * compaction after it lost the region. That is why the following events are carefully
917    * ordered for a compaction:
918    *  1. Compaction writes new files under region/.tmp directory (compaction output)
919    *  2. Compaction atomically moves the temporary file under region directory
920    *  3. Compaction appends a WAL edit containing the compaction input and output files.
921    *  Forces sync on WAL.
922    *  4. Compaction deletes the input files from the region directory.
923    *
924    * Failure conditions are handled like this:
925    *  - If RS fails before 2, compaction wont complete. Even if RS lives on and finishes
926    *  the compaction later, it will only write the new data file to the region directory.
927    *  Since we already have this data, this will be idempotent but we will have a redundant
928    *  copy of the data.
929    *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
930    *  RS that failed won't be able to finish snyc() for WAL because of lease recovery in WAL.
931    *  - If RS fails after 3, the region region server who opens the region will pick up the
932    *  the compaction marker from the WAL and replay it by removing the compaction input files.
933    *  Failed RS can also attempt to delete those files, but the operation will be idempotent
934    *
935    * See HBASE-2231 for details.
936    *
937    * @param compaction compaction details obtained from requestCompaction()
938    * @throws IOException
939    * @return Storefile we compacted into or null if we failed or opted out early.
940    */
941   public List<StoreFile> compact(CompactionContext compaction) throws IOException {
942     assert compaction != null && compaction.hasSelection();
943     CompactionRequest cr = compaction.getRequest();
944     Collection<StoreFile> filesToCompact = cr.getFiles();
945     assert !filesToCompact.isEmpty();
946     synchronized (filesCompacting) {
947       // sanity check: we're compacting files that this store knows about
948       // TODO: change this to LOG.error() after more debugging
949       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
950     }
951 
952     // Ready to go. Have list of files to compact.
953     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
954         + this + " of " + this.getRegionInfo().getRegionNameAsString()
955         + " into tmpdir=" + fs.getTempDir() + ", totalSize="
956         + StringUtils.humanReadableInt(cr.getSize()));
957 
958     long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
959     List<StoreFile> sfs = null;
960     try {
961       // Commence the compaction.
962       List<Path> newFiles = compaction.compact();
963       // TODO: get rid of this!
964       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
965         LOG.warn("hbase.hstore.compaction.complete is set to false");
966         sfs = new ArrayList<StoreFile>();
967         for (Path newFile : newFiles) {
968           // Create storefile around what we wrote with a reader on it.
969           StoreFile sf = createStoreFileAndReader(newFile);
970           sfs.add(sf);
971         }
972         return sfs;
973       }
974       // Do the steps necessary to complete the compaction.
975       sfs = moveCompatedFilesIntoPlace(cr, newFiles);
976       writeCompactionWalRecord(filesToCompact, sfs);
977       replaceStoreFiles(filesToCompact, sfs);
978       // At this point the store will use new files for all new scanners.
979       completeCompaction(filesToCompact); // Archive old files & update store size.
980     } finally {
981       finishCompactionRequest(cr);
982     }
983     logCompactionEndMessage(cr, sfs, compactionStartTime);
984     return sfs;
985   }
986 
987   private List<StoreFile> moveCompatedFilesIntoPlace(
988       CompactionRequest cr, List<Path> newFiles) throws IOException {
989     List<StoreFile> sfs = new ArrayList<StoreFile>();
990     for (Path newFile : newFiles) {
991       assert newFile != null;
992       StoreFile sf = moveFileIntoPlace(newFile);
993       if (this.getCoprocessorHost() != null) {
994         this.getCoprocessorHost().postCompact(this, sf, cr);
995       }
996       assert sf != null;
997       sfs.add(sf);
998     }
999     return sfs;
1000   }
1001 
1002   // Package-visible for tests
1003   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1004     validateStoreFile(newFile);
1005     // Move the file into the right spot
1006     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1007     return createStoreFileAndReader(destPath);
1008   }
1009 
1010   /**
1011    * Writes the compaction WAL record.
1012    * @param filesCompacted Files compacted (input).
1013    * @param newFiles Files from compaction.
1014    */
1015   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1016       Collection<StoreFile> newFiles) throws IOException {
1017     if (region.getLog() == null) return;
1018     List<Path> inputPaths = new ArrayList<Path>();
1019     for (StoreFile f : filesCompacted) {
1020       inputPaths.add(f.getPath());
1021     }
1022     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1023     for (StoreFile f : newFiles) {
1024       outputPaths.add(f.getPath());
1025     }
1026     HRegionInfo info = this.region.getRegionInfo();
1027     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1028         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1029     HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
1030         this.region.getRegionInfo(), compactionDescriptor);
1031   }
1032 
1033   private void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1034       final Collection<StoreFile> result) throws IOException {
1035     this.lock.writeLock().lock();
1036     try {
1037       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1038       filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
1039     } finally {
1040       this.lock.writeLock().unlock();
1041     }
1042   }
1043 
1044   /**
1045    * Log a very elaborate compaction completion message.
1046    * @param cr Request.
1047    * @param sfs Resulting files.
1048    * @param compactionStartTime Start time.
1049    */
1050   private void logCompactionEndMessage(
1051       CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1052     long now = EnvironmentEdgeManager.currentTimeMillis();
1053     StringBuilder message = new StringBuilder(
1054       "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
1055       + cr.getFiles().size() + " file(s) in " + this + " of "
1056       + this.getRegionInfo().getRegionNameAsString()
1057       + " into ");
1058     if (sfs.isEmpty()) {
1059       message.append("none, ");
1060     } else {
1061       for (StoreFile sf: sfs) {
1062         message.append(sf.getPath().getName());
1063         message.append("(size=");
1064         message.append(StringUtils.humanReadableInt(sf.getReader().length()));
1065         message.append("), ");
1066       }
1067     }
1068     message.append("total size for store is ")
1069       .append(StringUtils.humanReadableInt(storeSize))
1070       .append(". This selection was in queue for ")
1071       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1072       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1073       .append(" to execute.");
1074     LOG.info(message.toString());
1075   }
1076 
1077   /**
1078    * Call to complete a compaction. Its for the case where we find in the WAL a compaction
1079    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
1080    * See HBASE-2331.
1081    * @param compaction
1082    */
1083   public void completeCompactionMarker(CompactionDescriptor compaction)
1084       throws IOException {
1085     LOG.debug("Completing compaction from the WAL marker");
1086     List<String> compactionInputs = compaction.getCompactionInputList();
1087     List<String> compactionOutputs = compaction.getCompactionOutputList();
1088 
1089     List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1090     for (String compactionOutput : compactionOutputs) {
1091       //we should have this store file already
1092       boolean found = false;
1093       Path outputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionOutput);
1094       outputPath = outputPath.makeQualified(fs.getFileSystem());
1095       for (StoreFile sf : this.getStorefiles()) {
1096         if (sf.getPath().makeQualified(sf.getPath().getFileSystem(conf)).equals(outputPath)) {
1097           found = true;
1098           break;
1099         }
1100       }
1101       if (!found) {
1102         if (getFileSystem().exists(outputPath)) {
1103           outputStoreFiles.add(createStoreFileAndReader(outputPath));
1104         }
1105       }
1106     }
1107 
1108     List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
1109     for (String compactionInput : compactionInputs) {
1110       Path inputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionInput);
1111       inputPath = inputPath.makeQualified(fs.getFileSystem());
1112       inputPaths.add(inputPath);
1113     }
1114 
1115     //some of the input files might already be deleted
1116     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1117     for (StoreFile sf : this.getStorefiles()) {
1118       if (inputPaths.contains(sf.getPath().makeQualified(fs.getFileSystem()))) {
1119         inputStoreFiles.add(sf);
1120       }
1121     }
1122 
1123     this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1124     this.completeCompaction(inputStoreFiles);
1125   }
1126 
1127   /**
1128    * This method tries to compact N recent files for testing.
1129    * Note that because compacting "recent" files only makes sense for some policies,
1130    * e.g. the default one, it assumes default policy is used. It doesn't use policy,
1131    * but instead makes a compaction candidate list by itself.
1132    * @param N Number of files.
1133    */
1134   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1135     List<StoreFile> filesToCompact;
1136     boolean isMajor;
1137 
1138     this.lock.readLock().lock();
1139     try {
1140       synchronized (filesCompacting) {
1141         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1142         if (!filesCompacting.isEmpty()) {
1143           // exclude all files older than the newest file we're currently
1144           // compacting. this allows us to preserve contiguity (HBASE-2856)
1145           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1146           int idx = filesToCompact.indexOf(last);
1147           Preconditions.checkArgument(idx != -1);
1148           filesToCompact.subList(0, idx + 1).clear();
1149         }
1150         int count = filesToCompact.size();
1151         if (N > count) {
1152           throw new RuntimeException("Not enough files");
1153         }
1154 
1155         filesToCompact = filesToCompact.subList(count - N, count);
1156         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1157         filesCompacting.addAll(filesToCompact);
1158         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1159       }
1160     } finally {
1161       this.lock.readLock().unlock();
1162     }
1163 
1164     try {
1165       // Ready to go. Have list of files to compact.
1166       List<Path> newFiles =
1167           this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
1168       for (Path newFile: newFiles) {
1169         // Move the compaction into place.
1170         StoreFile sf = moveFileIntoPlace(newFile);
1171         if (this.getCoprocessorHost() != null) {
1172           this.getCoprocessorHost().postCompact(this, sf, null);
1173         }
1174         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1175         completeCompaction(filesToCompact);
1176       }
1177     } finally {
1178       synchronized (filesCompacting) {
1179         filesCompacting.removeAll(filesToCompact);
1180       }
1181     }
1182   }
1183 
1184   @Override
1185   public boolean hasReferences() {
1186     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1187   }
1188 
1189   @Override
1190   public CompactionProgress getCompactionProgress() {
1191     return this.storeEngine.getCompactor().getProgress();
1192   }
1193 
1194   @Override
1195   public boolean isMajorCompaction() throws IOException {
1196     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1197       // TODO: what are these reader checks all over the place?
1198       if (sf.getReader() == null) {
1199         LOG.debug("StoreFile " + sf + " has null Reader");
1200         return false;
1201       }
1202     }
1203     return storeEngine.getCompactionPolicy().isMajorCompaction(
1204         this.storeEngine.getStoreFileManager().getStorefiles());
1205   }
1206 
1207   @Override
1208   public CompactionContext requestCompaction() throws IOException {
1209     return requestCompaction(Store.NO_PRIORITY, null);
1210   }
1211 
  /**
   * Selects files for a compaction and marks them as being compacted.
   *
   * <p>Selection runs under the store's read lock and while synchronized on
   * <code>filesCompacting</code>. A coprocessor may override the selection; otherwise the
   * compaction context selects the files itself. When <code>baseRequest</code> is given,
   * the final request is <code>baseRequest.combineWith(...)</code> of the selected one.
   * The selected files are added to <code>filesCompacting</code> (kept sorted by
   * sequence id) and the request gets its priority, major flag and description set.
   * @param priority priority for the request; Store.NO_PRIORITY means use
   *   getCompactPriority() instead
   * @param baseRequest optional custom request to combine the selection with; may be null
   * @return the compaction context holding the selection, or null if writes are
   *   disabled or no files ended up selected
   * @throws IOException
   */
  @Override
  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
      throws IOException {
    // don't even select for compaction if writes are disabled
    if (!this.areWritesEnabled()) {
      return null;
    }

    CompactionContext compaction = storeEngine.createCompaction();
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override = this.getCoprocessorHost().preCompactSelection(
              this, candidatesForCoproc, baseRequest);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          // The compareAndSet only succeeds while the tracker is false, so it is
          // attempted only during off-peak hours.
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            // Selection failed: give the off-peak flag back before rethrowing.
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          // The hook receives an immutable copy of the selected files.
          this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
        }

        // Selected files; see if we have a compaction with some custom base request.
        if (baseRequest != null) {
          // Update the request with what the system thinks the request should be;
          // its up to the request if it wants to listen.
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }

        // Finally, we have the resulting files list. Check if we have any files at all.
        final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
        if (selectedFiles.isEmpty()) {
          return null;
        }

        // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
        if (!Collections.disjoint(filesCompacting, selectedFiles)) {
          Preconditions.checkArgument(false, "%s overlaps with %s",
              selectedFiles, filesCompacting);
        }
        filesCompacting.addAll(selectedFiles);
        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);

        // If we're enqueuing a major, clear the force flag.
        // "Major" here means the selection covers every store file.
        boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
        this.forceMajor = this.forceMajor && !isMajor;

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        compaction.getRequest().setPriority(
            (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        compaction.getRequest().setIsMajor(isMajor);
        compaction.getRequest().setDescription(
            getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
        + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
    // Paired with reportCompactionRequestEnd in finishCompactionRequest.
    this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
    return compaction;
  }
1303 
1304   public void cancelRequestedCompaction(CompactionContext compaction) {
1305     finishCompactionRequest(compaction.getRequest());
1306   }
1307 
1308   private void finishCompactionRequest(CompactionRequest cr) {
1309     this.region.reportCompactionRequestEnd(cr.isMajor());
1310     if (cr.isOffPeak()) {
1311       offPeakCompactionTracker.set(false);
1312       cr.setOffPeak(false);
1313     }
1314     synchronized (filesCompacting) {
1315       filesCompacting.removeAll(cr.getFiles());
1316     }
1317   }
1318 
1319   /**
1320    * Validates a store file by opening and closing it. In HFileV2 this should
1321    * not be an expensive operation.
1322    *
1323    * @param path the path to the store file
1324    */
1325   private void validateStoreFile(Path path)
1326       throws IOException {
1327     StoreFile storeFile = null;
1328     try {
1329       createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE);
1330     } catch (IOException e) {
1331       LOG.error("Failed to open store file : " + path
1332           + ", keeping it in tmp location", e);
1333       throw e;
1334     } finally {
1335       if (storeFile != null) {
1336         storeFile.closeReader(false);
1337       }
1338     }
1339   }
1340 
1341   /*
1342    * <p>It works by processing a compaction that's been written to disk.
1343    *
1344    * <p>It is usually invoked at the end of a compaction, but might also be
1345    * invoked at HStore startup, if the prior execution died midway through.
1346    *
1347    * <p>Moving the compacted TreeMap into place means:
1348    * <pre>
1349    * 1) Unload all replaced StoreFile, close and collect list to delete.
1350    * 2) Compute new store size
1351    * </pre>
1352    *
1353    * @param compactedFiles list of files that were compacted
1354    * @param newFile StoreFile that is the result of the compaction
1355    */
1356   @VisibleForTesting
1357   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1358       throws IOException {
1359     try {
1360       // Do not delete old store files until we have sent out notification of
1361       // change in case old files are still being accessed by outstanding scanners.
1362       // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
1363       // scenario that could have happened if continue to hold the lock.
1364       notifyChangedReadersObservers();
1365       // At this point the store will use new files for all scanners.
1366 
1367       // let the archive util decide if we should archive or delete the files
1368       LOG.debug("Removing store files after compaction...");
1369       for (StoreFile compactedFile : compactedFiles) {
1370         compactedFile.closeReader(true);
1371       }
1372       this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1373     } catch (IOException e) {
1374       e = RemoteExceptionHandler.checkIOException(e);
1375       LOG.error("Failed removing compacted files in " + this +
1376         ". Files we were trying to remove are " + compactedFiles.toString() +
1377         "; some of them may have been already removed", e);
1378     }
1379 
1380     // 4. Compute new store size
1381     this.storeSize = 0L;
1382     this.totalUncompressedBytes = 0L;
1383     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1384       StoreFile.Reader r = hsf.getReader();
1385       if (r == null) {
1386         LOG.warn("StoreFile " + hsf + " has a null Reader");
1387         continue;
1388       }
1389       this.storeSize += r.length();
1390       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1391     }
1392   }
1393 
1394   /*
1395    * @param wantedVersions How many versions were asked for.
1396    * @return wantedVersions or this families' {@link HConstants#VERSIONS}.
1397    */
1398   int versionsToReturn(final int wantedVersions) {
1399     if (wantedVersions <= 0) {
1400       throw new IllegalArgumentException("Number of versions must be > 0");
1401     }
1402     // Make sure we do not return more than maximum versions for this store.
1403     int maxVersions = this.family.getMaxVersions();
1404     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1405   }
1406 
1407   static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
1408     return key.getTimestamp() < oldestTimestamp;
1409   }
1410 
  /**
   * Looks for the row at or before <code>row</code>, first in the memstore and then in
   * the store files the store file manager offers as candidates. The whole search
   * runs under the store's read lock.
   * @param row row to search for
   * @return the candidate the tracker holds once the search is done
   *   (<code>state.getCandidate()</code>)
   * @throws IOException
   */
  @Override
  public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
    // If minVersions is set, we will not ignore expired KVs.
    // As we're only looking for the latest matches, that should be OK.
    // With minVersions > 0 we guarantee that any KV that has any version
    // at all (expired or not) has at least one version that will not expire.
    // Note that this method used to take a KeyValue as arguments. KeyValue
    // can be back-dated, a row key cannot.
    long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();

    // Search key: the given row with the latest possible timestamp.
    KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);

    GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
      this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
    this.lock.readLock().lock();
    try {
      // First go to the memstore.  Pick up deletes and candidates.
      this.memstore.getRowKeyAtOrBefore(state);
      // Check if match, if we got a candidate on the asked for 'kv' row.
      // Process each relevant store file. Run through from newest to oldest.
      Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
          .getCandidateFilesForRowKeyBefore(state.getTargetKey());
      while (sfIterator.hasNext()) {
        StoreFile sf = sfIterator.next();
        sfIterator.remove(); // Remove sf from iterator.
        boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
        if (haveNewCandidate) {
          // A new candidate lets the manager revise the remaining files; the loop
          // continues with the iterator it hands back.
          // TODO: we may have an optimization here which stops the search if we find exact match.
          sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
              sfIterator, state.getTargetKey(), state.getCandidate());
        }
      }
      return state.getCandidate();
    } finally {
      this.lock.readLock().unlock();
    }
  }
1448 
1449   /*
1450    * Check an individual MapFile for the row at or before a given row.
1451    * @param f
1452    * @param state
1453    * @throws IOException
1454    * @return True iff the candidate has been updated in the state.
1455    */
1456   private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1457                                           final GetClosestRowBeforeTracker state)
1458       throws IOException {
1459     StoreFile.Reader r = f.getReader();
1460     if (r == null) {
1461       LOG.warn("StoreFile " + f + " has a null Reader");
1462       return false;
1463     }
1464     if (r.getEntries() == 0) {
1465       LOG.warn("StoreFile " + f + " is a empty store file");
1466       return false;
1467     }
1468     // TODO: Cache these keys rather than make each time?
1469     byte [] fk = r.getFirstKey();
1470     if (fk == null) return false;
1471     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1472     byte [] lk = r.getLastKey();
1473     KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1474     KeyValue firstOnRow = state.getTargetKey();
1475     if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1476       // If last key in file is not of the target table, no candidates in this
1477       // file.  Return.
1478       if (!state.isTargetTable(lastKV)) return false;
1479       // If the row we're looking for is past the end of file, set search key to
1480       // last key. TODO: Cache last and first key rather than make each time.
1481       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1482     }
1483     // Get a scanner that caches blocks and that uses pread.
1484     HFileScanner scanner = r.getScanner(true, true, false);
1485     // Seek scanner.  If can't seek it, return.
1486     if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1487     // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
1488     // Unlikely that there'll be an instance of actual first row in table.
1489     if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1490     // If here, need to start backing up.
1491     while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1492        firstOnRow.getKeyLength())) {
1493       KeyValue kv = scanner.getKeyValue();
1494       if (!state.isTargetTable(kv)) break;
1495       if (!state.isBetterCandidate(kv)) break;
1496       // Make new first on row.
1497       firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1498       // Seek scanner.  If can't seek it, break.
1499       if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1500       // If we find something, break;
1501       if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1502     }
1503     return false;
1504   }
1505 
1506   /*
1507    * Seek the file scanner to firstOnRow or first entry in file.
1508    * @param scanner
1509    * @param firstOnRow
1510    * @param firstKV
1511    * @return True if we successfully seeked scanner.
1512    * @throws IOException
1513    */
1514   private boolean seekToScanner(final HFileScanner scanner,
1515                                 final KeyValue firstOnRow,
1516                                 final KeyValue firstKV)
1517       throws IOException {
1518     KeyValue kv = firstOnRow;
1519     // If firstOnRow < firstKV, set to firstKV
1520     if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1521     int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
1522       kv.getKeyLength());
1523     return result >= 0;
1524   }
1525 
1526   /*
1527    * When we come in here, we are probably at the kv just before we break into
1528    * the row that firstOnRow is on.  Usually need to increment one time to get
1529    * on to the row we are interested in.
1530    * @param scanner
1531    * @param firstOnRow
1532    * @param state
1533    * @return True we found a candidate.
1534    * @throws IOException
1535    */
1536   private boolean walkForwardInSingleRow(final HFileScanner scanner,
1537                                          final KeyValue firstOnRow,
1538                                          final GetClosestRowBeforeTracker state)
1539       throws IOException {
1540     boolean foundCandidate = false;
1541     do {
1542       KeyValue kv = scanner.getKeyValue();
1543       // If we are not in the row, skip.
1544       if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1545       // Did we go beyond the target row? If so break.
1546       if (state.isTooFar(kv, firstOnRow)) break;
1547       if (state.isExpired(kv)) {
1548         continue;
1549       }
1550       // If we added something, this row is a contender. break.
1551       if (state.handle(kv)) {
1552         foundCandidate = true;
1553         break;
1554       }
1555     } while(scanner.next());
1556     return foundCandidate;
1557   }
1558 
1559   public boolean canSplit() {
1560     this.lock.readLock().lock();
1561     try {
1562       // Not split-able if we find a reference store file present in the store.
1563       boolean result = !hasReferences();
1564       if (!result && LOG.isDebugEnabled()) {
1565         LOG.debug("Cannot split region due to reference files being there");
1566       }
1567       return result;
1568     } finally {
1569       this.lock.readLock().unlock();
1570     }
1571   }
1572 
1573   @Override
1574   public byte[] getSplitPoint() {
1575     this.lock.readLock().lock();
1576     try {
1577       // Should already be enforced by the split policy!
1578       assert !this.getRegionInfo().isMetaRegion();
1579       // Not split-able if we find a reference store file present in the store.
1580       if (hasReferences()) {
1581         assert false : "getSplitPoint() called on a region that can't split!";
1582         return null;
1583       }
1584       return this.storeEngine.getStoreFileManager().getSplitPoint();
1585     } catch(IOException e) {
1586       LOG.warn("Failed getting store size for " + this, e);
1587     } finally {
1588       this.lock.readLock().unlock();
1589     }
1590     return null;
1591   }
1592 
1593   @Override
1594   public long getLastCompactSize() {
1595     return this.lastCompactSize;
1596   }
1597 
1598   @Override
1599   public long getSize() {
1600     return storeSize;
1601   }
1602 
1603   public void triggerMajorCompaction() {
1604     this.forceMajor = true;
1605   }
1606 
1607   boolean getForceMajorCompaction() {
1608     return this.forceMajor;
1609   }
1610 
1611   //////////////////////////////////////////////////////////////////////////////
1612   // File administration
1613   //////////////////////////////////////////////////////////////////////////////
1614 
1615   @Override
1616   public KeyValueScanner getScanner(Scan scan,
1617       final NavigableSet<byte []> targetCols) throws IOException {
1618     lock.readLock().lock();
1619     try {
1620       KeyValueScanner scanner = null;
1621       if (this.getCoprocessorHost() != null) {
1622         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
1623       }
1624       if (scanner == null) {
1625         scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
1626       }
1627       return scanner;
1628     } finally {
1629       lock.readLock().unlock();
1630     }
1631   }
1632 
1633   @Override
1634   public String toString() {
1635     return this.getColumnFamilyName();
1636   }
1637 
1638   @Override
1639   // TODO: why is there this and also getNumberOfStorefiles?! Remove one.
1640   public int getStorefilesCount() {
1641     return this.storeEngine.getStoreFileManager().getStorefileCount();
1642   }
1643 
1644   @Override
1645   public long getStoreSizeUncompressed() {
1646     return this.totalUncompressedBytes;
1647   }
1648 
1649   @Override
1650   public long getStorefilesSize() {
1651     long size = 0;
1652     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1653       StoreFile.Reader r = s.getReader();
1654       if (r == null) {
1655         LOG.warn("StoreFile " + s + " has a null Reader");
1656         continue;
1657       }
1658       size += r.length();
1659     }
1660     return size;
1661   }
1662 
1663   @Override
1664   public long getStorefilesIndexSize() {
1665     long size = 0;
1666     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1667       StoreFile.Reader r = s.getReader();
1668       if (r == null) {
1669         LOG.warn("StoreFile " + s + " has a null Reader");
1670         continue;
1671       }
1672       size += r.indexSize();
1673     }
1674     return size;
1675   }
1676 
1677   @Override
1678   public long getTotalStaticIndexSize() {
1679     long size = 0;
1680     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1681       size += s.getReader().getUncompressedDataIndexSize();
1682     }
1683     return size;
1684   }
1685 
1686   @Override
1687   public long getTotalStaticBloomSize() {
1688     long size = 0;
1689     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1690       StoreFile.Reader r = s.getReader();
1691       size += r.getTotalBloomSize();
1692     }
1693     return size;
1694   }
1695 
1696   @Override
1697   public long getMemStoreSize() {
1698     return this.memstore.heapSize();
1699   }
1700 
1701   @Override
1702   public int getCompactPriority() {
1703     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1704     if (priority == PRIORITY_USER) {
1705       LOG.warn("Compaction priority is USER despite there being no user compaction");
1706     }
1707     return priority;
1708   }
1709 
1710   @Override
1711   public boolean throttleCompaction(long compactionSize) {
1712     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1713   }
1714 
1715   public HRegion getHRegion() {
1716     return this.region;
1717   }
1718 
1719   @Override
1720   public RegionCoprocessorHost getCoprocessorHost() {
1721     return this.region.getCoprocessorHost();
1722   }
1723 
1724   @Override
1725   public HRegionInfo getRegionInfo() {
1726     return this.fs.getRegionInfo();
1727   }
1728 
1729   @Override
1730   public boolean areWritesEnabled() {
1731     return this.region.areWritesEnabled();
1732   }
1733 
1734   @Override
1735   public long getSmallestReadPoint() {
1736     return this.region.getSmallestReadPoint();
1737   }
1738 
1739   /**
1740    * Used in tests. TODO: Remove
1741    *
1742    * Updates the value for the given row/family/qualifier. This function will always be seen as
1743    * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
1744    * control necessary.
1745    * @param row row to update
1746    * @param f family to update
1747    * @param qualifier qualifier to update
1748    * @param newValue the new value to set into memstore
1749    * @return memstore size delta
1750    * @throws IOException
1751    */
1752   public long updateColumnValue(byte [] row, byte [] f,
1753                                 byte [] qualifier, long newValue)
1754       throws IOException {
1755 
1756     this.lock.readLock().lock();
1757     try {
1758       long now = EnvironmentEdgeManager.currentTimeMillis();
1759 
1760       return this.memstore.updateColumnValue(row,
1761           f,
1762           qualifier,
1763           newValue,
1764           now);
1765 
1766     } finally {
1767       this.lock.readLock().unlock();
1768     }
1769   }
1770 
1771   @Override
1772   public long upsert(Iterable<? extends Cell> cells, long readpoint) throws IOException {
1773     this.lock.readLock().lock();
1774     try {
1775       return this.memstore.upsert(cells, readpoint);
1776     } finally {
1777       this.lock.readLock().unlock();
1778     }
1779   }
1780 
1781   public StoreFlushContext createFlushContext(long cacheFlushId) {
1782     return new StoreFlusherImpl(cacheFlushId);
1783   }
1784 
1785   private class StoreFlusherImpl implements StoreFlushContext {
1786 
1787     private long cacheFlushSeqNum;
1788     private SortedSet<KeyValue> snapshot;
1789     private List<Path> tempFiles;
1790     private TimeRangeTracker snapshotTimeRangeTracker;
1791     private final AtomicLong flushedSize = new AtomicLong();
1792 
1793     private StoreFlusherImpl(long cacheFlushSeqNum) {
1794       this.cacheFlushSeqNum = cacheFlushSeqNum;
1795     }
1796 
1797     @Override
1798     public void prepare() {
1799       memstore.snapshot();
1800       this.snapshot = memstore.getSnapshot();
1801       this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
1802     }
1803 
1804     @Override
1805     public void flushCache(MonitoredTask status) throws IOException {
1806       tempFiles = HStore.this.flushCache(
1807         cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
1808     }
1809 
1810     @Override
1811     public boolean commit(MonitoredTask status) throws IOException {
1812       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
1813         return false;
1814       }
1815       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
1816       for (Path storeFilePath : tempFiles) {
1817         try {
1818           storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
1819               snapshotTimeRangeTracker, flushedSize, status));
1820         } catch (IOException ex) {
1821           LOG.error("Failed to commit store file " + storeFilePath, ex);
1822           // Try to delete the files we have committed before.
1823           for (StoreFile sf : storeFiles) {
1824             Path pathToDelete = sf.getPath();
1825             try {
1826               sf.deleteReader();
1827             } catch (IOException deleteEx) {
1828               LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
1829               Runtime.getRuntime().halt(1);
1830             }
1831           }
1832           throw new IOException("Failed to commit the flush", ex);
1833         }
1834       }
1835 
1836       if (HStore.this.getCoprocessorHost() != null) {
1837         for (StoreFile sf : storeFiles) {
1838           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
1839         }
1840       }
1841       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
1842       return HStore.this.updateStorefiles(storeFiles, snapshot);
1843     }
1844   }
1845 
1846   @Override
1847   public boolean needsCompaction() {
1848     return storeEngine.getCompactionPolicy().needsCompaction(
1849         this.storeEngine.getStoreFileManager().getStorefiles(), filesCompacting);
1850   }
1851 
1852   @Override
1853   public CacheConfig getCacheConfig() {
1854     return this.cacheConf;
1855   }
1856 
  // Aligned heap size estimate built from one object header plus fixed counts
  // of references, longs, ints and booleans.
  // NOTE(review): the counts (15 / 4 / 5 / 2) presumably mirror the fields
  // declared on this class; re-verify them whenever a field is added or removed.
  public static final long FIXED_OVERHEAD =
      ClassSize.align(ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
              + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
1860 
  // FIXED_OVERHEAD plus ClassSize estimates for further objects and the
  // ScanInfo fixed overhead, aligned.  heapSize() adds the memstore on top.
  // NOTE(review): which member each extra term accounts for is not visible
  // from here; confirm against the field declarations before changing it.
  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
      + ClassSize.CONCURRENT_SKIPLISTMAP
      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
      + ScanInfo.FIXED_OVERHEAD);
1866 
1867   @Override
1868   public long heapSize() {
1869     return DEEP_OVERHEAD + this.memstore.heapSize();
1870   }
1871 
1872   public KeyValue.KVComparator getComparator() {
1873     return comparator;
1874   }
1875 
1876   @Override
1877   public ScanInfo getScanInfo() {
1878     return scanInfo;
1879   }
1880 
1881   /**
1882    * Set scan info, used by test
1883    * @param scanInfo new scan info to use for test
1884    */
1885   void setScanInfo(ScanInfo scanInfo) {
1886     this.scanInfo = scanInfo;
1887   }
1888 
1889   @Override
1890   public boolean hasTooManyStoreFiles() {
1891     return getStorefilesCount() > this.blockingFileCount;
1892   }
1893 }