1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.List;
29  import java.util.NavigableSet;
30  import java.util.Random;
31  import java.util.Set;
32  import java.util.SortedSet;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.CompletionService;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorCompletionService;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.atomic.AtomicLong;
41  import java.util.concurrent.locks.ReentrantReadWriteLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.fs.FileStatus;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.FileUtil;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.HBaseFileSystem;
51  import org.apache.hadoop.hbase.HColumnDescriptor;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.KeyValue;
55  import org.apache.hadoop.hbase.KeyValue.KVComparator;
56  import org.apache.hadoop.hbase.RemoteExceptionHandler;
57  import org.apache.hadoop.hbase.backup.HFileArchiver;
58  import org.apache.hadoop.hbase.client.Scan;
59  import org.apache.hadoop.hbase.fs.HFileSystem;
60  import org.apache.hadoop.hbase.io.HFileLink;
61  import org.apache.hadoop.hbase.io.HeapSize;
62  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
63  import org.apache.hadoop.hbase.io.hfile.Compression;
64  import org.apache.hadoop.hbase.io.hfile.HFile;
65  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
66  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
67  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
68  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
69  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
70  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
71  import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
72  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
73  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
74  import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
75  import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
76  import org.apache.hadoop.hbase.util.*;
77  import org.apache.hadoop.util.StringUtils;
78  
79  import com.google.common.base.Preconditions;
80  import com.google.common.base.Predicate;
81  import com.google.common.collect.Collections2;
82  import com.google.common.collect.ImmutableList;
83  import com.google.common.collect.Lists;
84  
85  /**
86   * A Store holds a column family in a Region.  It is a memstore and a set of zero
87   * or more StoreFiles, which stretch backwards over time.
88   *
89   * <p>There's no reason to consider append-logging at this level; all logging
90   * and locking is handled at the HRegion level.  Store just provides
91   * services to manage sets of StoreFiles.  One of the most important of those
92   * services is compaction, where files are aggregated once their number passes
93   * a configurable threshold.
94   *
95   * <p>The only thing having to do with logs that Store needs to deal with is
96   * the reconstructionLog.  This is a segment of an HRegion's log that might
97   * NOT be present upon startup.  If the param is NULL, there's nothing to do.
98   * If the param is non-NULL, we need to process the log to reconstruct
99   * a TreeMap that might not have been written to disk before the process
100  * died.
101  *
102  * <p>It's assumed that after this constructor returns, the reconstructionLog
103  * file will be deleted (by whoever has instantiated the Store).
104  *
105  * <p>Locking and transactions are handled at a higher level.  This API should
106  * not be called directly but by an HRegion manager.
107  */
108 public class Store extends SchemaConfigured implements HeapSize {
109   static final Log LOG = LogFactory.getLog(Store.class);
110 
111   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
112   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
113 
114   protected final MemStore memstore;
115   // This store's directory in the filesystem.
116   private final Path homedir;
117   private final HRegion region;
118   private final HColumnDescriptor family;
119   final FileSystem fs;
120   final Configuration conf;
121   final CacheConfig cacheConf;
122   // ttl in milliseconds.
123   private long ttl;
124   private final int minFilesToCompact;
125   private final int maxFilesToCompact;
126   private final long minCompactSize;
127   private final long maxCompactSize;
128   private final float minStoreFileLocalitySkipCompact;
129   private long lastCompactSize = 0;
130   volatile boolean forceMajor = false;
131   /* how many bytes to write between status checks */
132   static int closeCheckInterval = 0;
133   private final int blockingStoreFileCount;
134   private volatile long storeSize = 0L;
135   private volatile long totalUncompressedBytes = 0L;
136   private final Object flushLock = new Object();
137   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
138   private final boolean verifyBulkLoads;
139   
140   private long blockingFileCount;
141 
142   /* The default priority for user-specified compaction requests.
143    * The user gets top priority unless we have blocking compactions (which get priority <= 0).
144    */
145   public static final int PRIORITY_USER = 1;
146   public static final int NO_PRIORITY = Integer.MIN_VALUE;
147 
148   // Not private, for testing.
149   /* package */ ScanInfo scanInfo;
150   /*
151    * List of store files inside this store. This is an immutable list that
152    * is atomically replaced when its contents change.
153    */
154   private volatile ImmutableList<StoreFile> storefiles = null;
155 
156   List<StoreFile> filesCompacting = Lists.newArrayList();
157 
158   // All access must be synchronized.
159   private final Set<ChangedReadersObserver> changedReaderObservers =
160       Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
161 
162   private final int blocksize;
163   private HFileDataBlockEncoder dataBlockEncoder;
164 
165   /** Checksum configuration */
166   private ChecksumType checksumType;
167   private int bytesPerChecksum;
168 
169   // Comparing KeyValues
170   final KeyValue.KVComparator comparator;
171 
172   private final Compactor compactor;
173 
174   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
175   private static int flush_retries_number;
176   private static int pauseTime;
177 
178   /**
179    * Constructor
180    * @param basedir qualified path under which the region directory lives;
181    * generally the table subdirectory
182    * @param region the HRegion this Store belongs to
183    * @param family HColumnDescriptor for this column family
184    * @param fs file system object
185    * @param confParam configuration object; values from the family descriptor
186    * are merged on top of it (see the CompoundConfiguration below)
187    * @throws IOException
188    */
189   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
190       FileSystem fs, Configuration confParam)
191   throws IOException {
192     super(new CompoundConfiguration().add(confParam).add(
193         family.getValues()), region.getTableDesc().getNameAsString(),
194         Bytes.toString(family.getName()));
195     HRegionInfo info = region.getRegionInfo();
196     this.fs = fs;
197     Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
198     this.homedir = createStoreHomeDir(this.fs, p);
199     this.region = region;
200     this.family = family;
201     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
202     this.conf = new CompoundConfiguration().add(confParam).add(family.getValues());
203     this.blocksize = family.getBlocksize();
204 
205     this.dataBlockEncoder =
206         new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
207             family.getDataBlockEncoding());
208 
209     this.comparator = info.getComparator();
210     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
211     this.ttl = family.getTimeToLive();
212     if (ttl == HConstants.FOREVER) {
213       // default is unlimited ttl.
214       ttl = Long.MAX_VALUE;
215     } else if (ttl == -1) {
216       ttl = Long.MAX_VALUE;
217     } else {
218       // second -> ms adjust for user data
219       this.ttl *= 1000;
220     }
221     // used by ScanQueryMatcher
222     long timeToPurgeDeletes =
223         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
224     LOG.info("time to purge deletes set to " + timeToPurgeDeletes +
225         "ms in store " + this);
226     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
227     this.memstore = new MemStore(conf, this.comparator);
228 
229     // By default, compact if storefile.count >= minFilesToCompact
230     this.minFilesToCompact = Math.max(2,
231       conf.getInt("hbase.hstore.compaction.min",
232         /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
233 
234     LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
235     
236     // Setting up cache configuration for this family
237     this.cacheConf = new CacheConfig(conf, family);
238     this.blockingStoreFileCount =
239       conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
240 
241     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
242     this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
243       this.region.memstoreFlushSize);
244     this.maxCompactSize
245       = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
246     this.minStoreFileLocalitySkipCompact
247       = conf.getFloat("hbase.hstore.min.locality.to.skip.major.compact", 0f);
248 
249     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
250     
251     this.blockingFileCount =
252                 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
253     
254     if (Store.closeCheckInterval == 0) {
255       Store.closeCheckInterval = conf.getInt(
256           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
257     }
258     this.storefiles = sortAndClone(loadStoreFiles());
259 
260     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
261     this.checksumType = getChecksumType(conf);
262     // initialize bytes per checksum
263     this.bytesPerChecksum = getBytesPerChecksum(conf);
264     // Create a compaction tool instance
265     this.compactor = new Compactor(this.conf);
266     if (Store.flush_retries_number == 0) {
267       Store.flush_retries_number = conf.getInt(
268           "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
269       Store.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
270           HConstants.DEFAULT_HBASE_SERVER_PAUSE);
271       if (Store.flush_retries_number <= 0) {
272         throw new IllegalArgumentException(
273             "hbase.hstore.flush.retries.number must be > 0, not "
274                 + Store.flush_retries_number);
275       }
276     }
277   }
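
  // Illustrative sketch (not part of the original source): the constructor above
  // reads its tuning knobs from a CompoundConfiguration in which values set on the
  // HColumnDescriptor (family.getValues()) are layered over the supplied
  // Configuration. A minimal, hypothetical example of overriding a few of them
  // programmatically, with arbitrary values chosen only for illustration:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.hstore.compaction.min", 4);   // minFilesToCompact
  //   conf.setInt("hbase.hstore.compaction.max", 12);  // maxFilesToCompact
  //   conf.setInt(Store.BLOCKING_STOREFILES_KEY, 10);  // blockingFileCount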
278 
279   /**
280    * @param family the column family descriptor
281    * @return the family's TTL, converted from seconds to milliseconds
282    */
283   long getTTL(final HColumnDescriptor family) {
284     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
285     long ttl = family.getTimeToLive();
286     if (ttl == HConstants.FOREVER) {
287       // Default is unlimited ttl.
288       ttl = Long.MAX_VALUE;
289     } else if (ttl == -1) {
290       ttl = Long.MAX_VALUE;
291     } else {
292       // Second -> ms adjust for user data
293       ttl *= 1000;
294     }
295     return ttl;
296   }
297 
298   /**
299    * Create this store's homedir
300    * @param fs
301    * @param homedir
302    * @return <code>homedir</code>
303    * @throws IOException
304    */
305   Path createStoreHomeDir(final FileSystem fs,
306       final Path homedir) throws IOException {
307     if (!fs.exists(homedir) && !HBaseFileSystem.makeDirOnFileSystem(fs, homedir)) {
308       throw new IOException("Failed to create: " + homedir.toString());
309     }
310     return homedir;
311   }
312 
313   FileSystem getFileSystem() {
314     return this.fs;
315   }
316 
317   /**
318    * Returns the configured bytesPerChecksum value.
319    * @param conf The configuration
320    * @return The bytesPerChecksum that is set in the configuration
321    */
322   public static int getBytesPerChecksum(Configuration conf) {
323     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
324                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
325   }
326 
327   /**
328    * Returns the configured checksum algorithm.
329    * @param conf The configuration
330    * @return The checksum algorithm that is set in the configuration
331    */
332   public static ChecksumType getChecksumType(Configuration conf) {
333     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
334     if (checksumName == null) {
335       return HFile.DEFAULT_CHECKSUM_TYPE;
336     } else {
337       return ChecksumType.nameToType(checksumName);
338     }
339   }
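
  // A minimal usage sketch (assumptions: the enum constant name "CRC32C" exists in
  // this build's ChecksumType, and the key behind HConstants.CHECKSUM_TYPE_NAME is
  // the one set in the site configuration):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.CHECKSUM_TYPE_NAME, "CRC32C");
  //   ChecksumType type = Store.getChecksumType(conf);
  //   int bytesPerChecksum = Store.getBytesPerChecksum(conf);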
340 
341   public HColumnDescriptor getFamily() {
342     return this.family;
343   }
344 
345   /**
346    * @return The maximum sequence id in all store files.
347    */
348   long getMaxSequenceId() {
349     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
350   }
351 
352   /**
353    * @return The maximum memstoreTS in all store files.
354    */
355   public long getMaxMemstoreTS() {
356     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
357   }
358 
359   /**
360    * @param tabledir
361    * @param encodedName Encoded region name.
362    * @param family
363    * @return Path to family/Store home directory.
364    */
365   public static Path getStoreHomedir(final Path tabledir,
366       final String encodedName, final byte [] family) {
367     return getStoreHomedir(tabledir, encodedName, Bytes.toString(family));
368   }
369 
370   public long getFlushableSize() {
371     return this.memstore.getFlushableSize();
372   }
373 
374 
375   /**
376    * @param tabledir
377    * @param encodedName Encoded region name.
378    * @param family
379    * @return Path to family/Store home directory.
380    */
381   public static Path getStoreHomedir(final Path tabledir,
382       final String encodedName, final String family) {
383     return new Path(tabledir, new Path(encodedName, new Path(family)));
384   }
385 
386   /**
387    * @param parentRegionDirectory directory for the parent region
388    * @param family family name of this store
389    * @return Path to the family/Store home directory
390    */
391   public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) {
392     return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
393   }
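
  // For orientation only: the helpers above compose the store directory as
  // <tabledir>/<encoded region name>/<family>. A hypothetical example (the table
  // name, encoded region name and family below are made up):
  //
  //   Path tableDir = new Path("/hbase/mytable");
  //   Path storeDir = Store.getStoreHomedir(tableDir, "4a6f8e01", Bytes.toBytes("cf"));
  //   // storeDir -> /hbase/mytable/4a6f8e01/cf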
394 
395   /**
396    * Return the directory in which this store stores its
397    * StoreFiles
398    */
399   Path getHomedir() {
400     return homedir;
401   }
402 
403   /**
404    * @return the data block encoder
405    */
406   public HFileDataBlockEncoder getDataBlockEncoder() {
407     return dataBlockEncoder;
408   }
409 
410   /**
411    * Should be used only in tests.
412    * @param blockEncoder the block delta encoder to use
413    */
414   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
415     this.dataBlockEncoder = blockEncoder;
416   }
417 
418   FileStatus [] getStoreFiles() throws IOException {
419     return FSUtils.listStatus(this.fs, this.homedir, null);
420   }
421 
422   /**
423    * Creates an unsorted list of StoreFiles loaded in parallel
424    * from the given directory.
425    * @throws IOException
426    */
427   private List<StoreFile> loadStoreFiles() throws IOException {
428     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
429     FileStatus files[] = getStoreFiles();
430 
431     if (files == null || files.length == 0) {
432       return results;
433     }
434     // initialize the thread pool for opening store files in parallel.
435     ThreadPoolExecutor storeFileOpenerThreadPool =
436       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
437           this.family.getNameAsString());
438     CompletionService<StoreFile> completionService =
439       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
440 
441     int totalValidStoreFile = 0;
442     for (int i = 0; i < files.length; i++) {
443       // Skip directories.
444       if (files[i].isDir()) {
445         continue;
446       }
447       final Path p = files[i].getPath();
448       // Check for empty hfile. Should never be the case but can happen
449       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
450       // NOTE: an HFileLink is just a name, so it appears as an empty file.
451       if (!HFileLink.isHFileLink(p) && this.fs.getFileStatus(p).getLen() <= 0) {
452         LOG.warn("Skipping " + p + " because it's empty. HBASE-646 DATA LOSS?");
453         continue;
454       }
455 
456       // open each store file in parallel
457       completionService.submit(new Callable<StoreFile>() {
458         public StoreFile call() throws IOException {
459           StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
460               family.getBloomFilterType(), dataBlockEncoder);
461           passSchemaMetricsTo(storeFile);
462           storeFile.createReader();
463           return storeFile;
464         }
465       });
466       totalValidStoreFile++;
467     }
468 
469     IOException ioe = null;
470     try {
471       for (int i = 0; i < totalValidStoreFile; i++) {
472         try {
473           Future<StoreFile> future = completionService.take();
474           StoreFile storeFile = future.get();
475           long length = storeFile.getReader().length();
476           this.storeSize += length;
477           this.totalUncompressedBytes +=
478               storeFile.getReader().getTotalUncompressedBytes();
479           if (LOG.isDebugEnabled()) {
480             LOG.debug("loaded " + storeFile.toStringDetailed());
481           }
482           results.add(storeFile);
483         } catch (InterruptedException e) {
484           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
485         } catch (ExecutionException e) {
486           if (ioe == null) ioe = new IOException(e.getCause());
487         } 
488       } 
489     } finally {
490       storeFileOpenerThreadPool.shutdownNow();
491     }
492     if (ioe != null) {
493       // close StoreFile readers
494       for (StoreFile file : results) {
495         try {
496           if (file != null) file.closeReader(true);
497         } catch (IOException e) { 
498           LOG.warn(e.getMessage());
499         }
500       }
501       throw ioe;
502     }
503 
504     return results;
505   }
506 
507   /**
508    * Adds a value to the memstore
509    *
510    * @param kv
511    * @return memstore size delta
512    */
513   protected long add(final KeyValue kv) {
514     lock.readLock().lock();
515     try {
516       return this.memstore.add(kv);
517     } finally {
518       lock.readLock().unlock();
519     }
520   }
521 
522   /**
523    * When was the oldest edit done in the memstore
524    */
525   public long timeOfOldestEdit() {
526     return memstore.timeOfOldestEdit();
527   }
528 
529   /**
530    * Adds a delete marker to the memstore
531    *
532    * @param kv
533    * @return memstore size delta
534    */
535   protected long delete(final KeyValue kv) {
536     lock.readLock().lock();
537     try {
538       return this.memstore.delete(kv);
539     } finally {
540       lock.readLock().unlock();
541     }
542   }
543 
544   /**
545    * Removes a kv from the memstore. The KeyValue is removed only
546    * if its key and memstoreTS match the key and memstoreTS of the
547    * kv parameter.
548    *
549    * @param kv
550    */
551   protected void rollback(final KeyValue kv) {
552     lock.readLock().lock();
553     try {
554       this.memstore.rollback(kv);
555     } finally {
556       lock.readLock().unlock();
557     }
558   }
559 
560   /**
561    * @return All store files.
562    */
563   public List<StoreFile> getStorefiles() {
564     return this.storefiles;
565   }
566 
567   /**
568    * This throws a WrongRegionException if the HFile does not fit in this
569    * region, or an InvalidHFileException if the HFile is not valid.
570    */
571   void assertBulkLoadHFileOk(Path srcPath) throws IOException {
572     HFile.Reader reader  = null;
573     try {
574       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
575           + "store " + this + " region " + this.region);
576       reader = HFile.createReader(srcPath.getFileSystem(conf),
577           srcPath, cacheConf);
578       reader.loadFileInfo();
579 
580       byte[] firstKey = reader.getFirstRowKey();
581       byte[] lk = reader.getLastKey();
582       byte[] lastKey =
583           (lk == null) ? null :
584               KeyValue.createKeyValueFromKey(lk).getRow();
585 
586       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
587           " last=" + Bytes.toStringBinary(lastKey));
588       LOG.debug("Region bounds: first=" +
589           Bytes.toStringBinary(region.getStartKey()) +
590           " last=" + Bytes.toStringBinary(region.getEndKey()));
591 
592       HRegionInfo hri = region.getRegionInfo();
593       if (!hri.containsRange(firstKey, lastKey)) {
594         throw new WrongRegionException(
595             "Bulk load file " + srcPath.toString() + " does not fit inside region "
596             + this.region);
597       }
598 
599       if (verifyBulkLoads) {
600         KeyValue prevKV = null;
601         HFileScanner scanner = reader.getScanner(false, false, false);
602         scanner.seekTo();
603         do {
604           KeyValue kv = scanner.getKeyValue();
605           if (prevKV != null) {
606             if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(),
607                 prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(),
608                 kv.getRowLength()) > 0) {
609               throw new InvalidHFileException("Previous row is greater than"
610                   + " current row: path=" + srcPath + " previous="
611                   + Bytes.toStringBinary(prevKV.getKey()) + " current="
612                   + Bytes.toStringBinary(kv.getKey()));
613             }
614             if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(),
615                 prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(),
616                 kv.getFamilyLength()) != 0) {
617               throw new InvalidHFileException("Previous key had different"
618                   + " family compared to current key: path=" + srcPath
619                   + " previous=" + Bytes.toStringBinary(prevKV.getFamily())
620                   + " current=" + Bytes.toStringBinary(kv.getFamily()));
621             }
622           }
623           prevKV = kv;
624         } while (scanner.next());
625       }
626     } finally {
627       if (reader != null) reader.close();
628     }
629   }
630 
631   /**
632    * This method should only be called from HRegion.  It is assumed that the
633    * ranges of values in the HFile fit within the store's assigned region.
634    * (assertBulkLoadHFileOk checks this)
635    */
636   public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
637     Path srcPath = new Path(srcPathStr);
638 
639     // Move the file if it's on another filesystem
640     FileSystem srcFs = srcPath.getFileSystem(conf);
641     FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
642     // We can't compare FileSystem instances, as
643     // equals() includes the UGI instance as part of the comparison
644     // and won't work when doing SecureBulkLoad.
645     // TODO deal with viewFS
646     if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
647       LOG.info("File " + srcPath + " on different filesystem than " +
648           "destination store - moving to this filesystem.");
649       Path tmpPath = getTmpPath();
650       FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
651       LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
652       srcPath = tmpPath;
653     }
654 
655     Path dstPath =
656         StoreFile.getRandomFilename(fs, homedir, (seqNum == -1) ? null : "_SeqId_" + seqNum + "_");
657     LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
658     StoreFile.rename(fs, srcPath, dstPath);
659 
660     StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
661         this.family.getBloomFilterType(), this.dataBlockEncoder);
662     passSchemaMetricsTo(sf);
663 
664     StoreFile.Reader r = sf.createReader();
665     this.storeSize += r.length();
666     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
667 
668     LOG.info("Moved hfile " + srcPath + " into store directory " +
669         homedir + " - updating store file list.");
670 
671     // Append the new storefile into the list
672     this.lock.writeLock().lock();
673     try {
674       ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
675       newFiles.add(sf);
676       this.storefiles = sortAndClone(newFiles);
677     } finally {
678       // We need the lock, as long as we are updating the storefiles
679       // or changing the memstore. Let us release it before calling
680       // notifyChangedReadersObservers. See HBASE-4485 for a possible
681       // deadlock scenario that could have happened if we continued to hold
682       // the lock.
683       this.lock.writeLock().unlock();
684     }
685     notifyChangedReadersObservers();
686     LOG.info("Successfully loaded store file " + srcPath
687         + " into store " + this + " (new location: " + dstPath + ")");
688   }
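
  // Hedged usage note: bulkLoadHFile() is normally reached via HRegion, driven by a
  // client-side bulk load tool. A rough client-side sketch, assuming the 0.94-era
  // LoadIncrementalHFiles tool, an existing table "t1" and a staging dir of HFiles:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  //   loader.doBulkLoad(new Path("/staging/hfiles"), new HTable(conf, "t1"));
  //
  // Each HFile then reaches this method only after assertBulkLoadHFileOk() has
  // confirmed that its key range fits inside this region.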
689 
690   /**
691    * Get a temporary path in this region. These temporary files
692    * will get cleaned up when the region is re-opened if they are
693    * still around.
694    */
695   private Path getTmpPath() throws IOException {
696     return StoreFile.getRandomFilename(
697         fs, region.getTmpDir());
698   }
699 
700   /**
701    * Close all the readers
702    *
703    * We don't need to worry about subsequent requests because the HRegion holds
704    * a write lock that will prevent any more reads or writes.
705    *
706    * @throws IOException
707    */
708   ImmutableList<StoreFile> close() throws IOException {
709     this.lock.writeLock().lock();
710     try {
711       ImmutableList<StoreFile> result = storefiles;
712 
713       // Clear so the metrics system doesn't find them.
714       storefiles = ImmutableList.of();
715 
716       if (!result.isEmpty()) {
717         // initialize the thread pool for closing store files in parallel.
718         ThreadPoolExecutor storeFileCloserThreadPool = this.region
719             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
720                 + this.family.getNameAsString());
721 
722         // close each store file in parallel
723         CompletionService<Void> completionService =
724           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
725         for (final StoreFile f : result) {
726           completionService.submit(new Callable<Void>() {
727             public Void call() throws IOException {
728               f.closeReader(true);
729               return null;
730             }
731           });
732         }
733 
734         IOException ioe = null;
735         try {
736           for (int i = 0; i < result.size(); i++) {
737             try {
738               Future<Void> future = completionService.take();
739               future.get();
740             } catch (InterruptedException e) {
741               if (ioe == null) {
742                 ioe = new InterruptedIOException();
743                 ioe.initCause(e);
744               }
745             } catch (ExecutionException e) {
746               if (ioe == null) ioe = new IOException(e.getCause());
747             }
748           }
749         } finally {
750           storeFileCloserThreadPool.shutdownNow();
751         }
752         if (ioe != null) throw ioe;
753       }
754       LOG.info("Closed " + this);
755       return result;
756     } finally {
757       this.lock.writeLock().unlock();
758     }
759   }
760 
761   /**
762    * Snapshot this store's memstore.  Call before running
763    * {@link #flushCache} so it has some work to do.
764    */
765   void snapshot() {
766     this.lock.writeLock().lock();
767     try {
768       this.memstore.snapshot();
769     } finally {
770       this.lock.writeLock().unlock();
771     }
772   }
773 
774   /**
775    * Write out current snapshot.  Presumes {@link #snapshot()} has been called
776    * previously.
777    * @param logCacheFlushId flush sequence number
778    * @param snapshot
779    * @param snapshotTimeRangeTracker
780    * @param flushedSize The number of bytes flushed
781    * @param status
782    * @return Path The path name of the tmp file to which the store was flushed
783    * @throws IOException
784    */
785   protected Path flushCache(final long logCacheFlushId,
786       SortedSet<KeyValue> snapshot,
787       TimeRangeTracker snapshotTimeRangeTracker,
788       AtomicLong flushedSize,
789       MonitoredTask status) throws IOException {
790     // If an exception happens while flushing, we let it out without clearing
791     // the memstore snapshot.  The old snapshot will be returned when we call
792     // 'snapshot' the next time a flush comes around.
793     // Retry after catching an exception when flushing; otherwise the server
794     // will abort itself.
795     IOException lastException = null;
796     for (int i = 0; i < Store.flush_retries_number; i++) {
797       try {
798         Path pathName = internalFlushCache(snapshot, logCacheFlushId,
799             snapshotTimeRangeTracker, flushedSize, status);
800         try {
801           // Path name is null if there is no entry to flush
802           if (pathName != null) {
803             validateStoreFile(pathName);
804           }
805           return pathName;
806         } catch (Exception e) {
807           LOG.warn("Failed validating store file " + pathName
808               + ", retrying num=" + i, e);
809           if (e instanceof IOException) {
810             lastException = (IOException) e;
811           } else {
812             lastException = new IOException(e);
813           }
814         }
815       } catch (IOException e) {
816         LOG.warn("Failed flushing store file, retrying num=" + i, e);
817         lastException = e;
818       }
819       if (lastException != null && i < (flush_retries_number - 1)) {
820         try {
821           Thread.sleep(pauseTime);
822         } catch (InterruptedException e) {
823           IOException iie = new InterruptedIOException();
824           iie.initCause(e);
825           throw iie;
826         }
827       }
828     }
829     throw lastException;
830   }
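
  // Worked example of the retry policy above, assuming a stock configuration:
  // flush_retries_number defaults to DEFAULT_FLUSH_RETRIES_NUMBER (10) and pauseTime
  // comes from HConstants.HBASE_SERVER_PAUSE. A flush that keeps failing is attempted
  // up to 10 times with a fixed (not exponential) sleep of pauseTime between attempts;
  // after the last attempt the most recent IOException is rethrown and the caller's
  // normal abort handling takes over.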
831 
832   /*
833    * @param cache
834    * @param logCacheFlushId
835    * @param snapshotTimeRangeTracker
836    * @param flushedSize The number of bytes flushed
837    * @return Path The path name of the tmp file to which the store was flushed
838    * @throws IOException
839    */
840   private Path internalFlushCache(final SortedSet<KeyValue> set,
841       final long logCacheFlushId,
842       TimeRangeTracker snapshotTimeRangeTracker,
843       AtomicLong flushedSize,
844       MonitoredTask status)
845       throws IOException {
846     StoreFile.Writer writer;
847     // Find the smallest read point across all the Scanners.
848     long smallestReadPoint = region.getSmallestReadPoint();
849     long flushed = 0;
850     Path pathName;
851     // Don't flush if there are no entries.
852     if (set.size() == 0) {
853       return null;
854     }
855     // Use a store scanner to find which rows to flush.
856     // Note that we need to retain deletes, hence
857     // treat this as a minor compaction.
858     InternalScanner scanner = null;
859     KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
860     if (getHRegion().getCoprocessorHost() != null) {
861       scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
862     }
863     if (scanner == null) {
864       Scan scan = new Scan();
865       scan.setMaxVersions(scanInfo.getMaxVersions());
866       scanner = new StoreScanner(this, scanInfo, scan,
867           Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
868           this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
869     }
870     if (getHRegion().getCoprocessorHost() != null) {
871       InternalScanner cpScanner =
872         getHRegion().getCoprocessorHost().preFlush(this, scanner);
873       // NULL scanner returned from coprocessor hooks means skip normal processing
874       if (cpScanner == null) {
875         return null;
876       }
877       scanner = cpScanner;
878     }
879     try {
880       int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
881       // TODO:  We can fail in the below block before we complete adding this
882       // flush to list of store files.  Add cleanup of anything put on filesystem
883       // if we fail.
884       synchronized (flushLock) {
885         status.setStatus("Flushing " + this + ": creating writer");
886         // A. Write the map out to the disk
887         writer = createWriterInTmp(set.size());
888         writer.setTimeRangeTracker(snapshotTimeRangeTracker);
889         pathName = writer.getPath();
890         try {
891           List<KeyValue> kvs = new ArrayList<KeyValue>();
892           boolean hasMore;
893           do {
894             hasMore = scanner.next(kvs, compactionKVMax);
895             if (!kvs.isEmpty()) {
896               for (KeyValue kv : kvs) {
897                 // If we know that this KV is going to be included always, then let us
898                 // set its memstoreTS to 0. This will help us save space when writing to disk.
899                 if (kv.getMemstoreTS() <= smallestReadPoint) {
900                   // let us not change the original KV. It could be in the memstore;
901                   // changing its memstoreTS could affect other threads/scanners.
902                   kv = kv.shallowCopy();
903                   kv.setMemstoreTS(0);
904                 }
905                 writer.append(kv);
906                 flushed += this.memstore.heapSizeChange(kv, true);
907               }
908               kvs.clear();
909             }
910           } while (hasMore);
911         } finally {
912           // Write out the log sequence number that corresponds to this output
913           // hfile.  The hfile is current up to and including logCacheFlushId.
914           status.setStatus("Flushing " + this + ": appending metadata");
915           writer.appendMetadata(logCacheFlushId, false);
916           status.setStatus("Flushing " + this + ": closing flushed file");
917           writer.close();
918         }
919       }
920     } finally {
921       flushedSize.set(flushed);
922       scanner.close();
923     }
924     if (LOG.isInfoEnabled()) {
925       LOG.info("Flushed " + this +
926                ", sequenceid=" + logCacheFlushId +
927                ", memsize=" + StringUtils.humanReadableInt(flushed) +
928                ", into tmp file " + pathName);
929     }
930     return pathName;
931   }
932 
933   /*
934    * @param path The pathname of the tmp file into which the store was flushed
935    * @param logCacheFlushId
936    * @return StoreFile created.
937    * @throws IOException
938    */
939   private StoreFile commitFile(final Path path,
940       final long logCacheFlushId,
941       TimeRangeTracker snapshotTimeRangeTracker,
942       AtomicLong flushedSize,
943       MonitoredTask status)
944       throws IOException {
945     // Write-out finished successfully, move into the right spot
946     String fileName = path.getName();
947     Path dstPath = new Path(homedir, fileName);
948     String msg = "Renaming flushed file at " + path + " to " + dstPath;
949     LOG.debug(msg);
950     status.setStatus("Flushing " + this + ": " + msg);
951     if (!HBaseFileSystem.renameDirForFileSystem(fs, path, dstPath)) {
952       LOG.warn("Unable to rename " + path + " to " + dstPath);
953     }
954 
955     status.setStatus("Flushing " + this + ": reopening flushed file");
956     StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
957         this.family.getBloomFilterType(), this.dataBlockEncoder);
958     passSchemaMetricsTo(sf);
959 
960     StoreFile.Reader r = sf.createReader();
961     this.storeSize += r.length();
962     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
963 
964     // This increments the metrics associated with total flushed bytes for this
965     // family. The overall flush count is stored in the static metrics and
966     // retrieved from HRegion.recentFlushes, which is set within
967     // HRegion.internalFlushcache, which indirectly calls this to actually do
968     // the flushing through the StoreFlusherImpl class
969     getSchemaMetrics().updatePersistentStoreMetric(
970         SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushedSize.longValue());
971     if (LOG.isInfoEnabled()) {
972       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
973         ", sequenceid=" + logCacheFlushId +
974         ", filesize=" + StringUtils.humanReadableInt(r.length()));
975     }
976     return sf;
977   }
978 
979   /*
980    * @param maxKeyCount
981    * @return Writer for a new StoreFile in the tmp dir.
982    */
983   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
984   throws IOException {
985     return createWriterInTmp(maxKeyCount, this.family.getCompression(), false, true);
986   }
987 
988   /*
989    * @param maxKeyCount
990    * @param compression Compression algorithm to use
991    * @param isCompaction whether we are creating a new file in a compaction
992    * @return Writer for a new StoreFile in the tmp dir.
993    */
994   public StoreFile.Writer createWriterInTmp(int maxKeyCount,
995     Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
996   throws IOException {
997     final CacheConfig writerCacheConf;
998     if (isCompaction) {
999       // Don't cache data on write on compactions.
1000       writerCacheConf = new CacheConfig(cacheConf);
1001       writerCacheConf.setCacheDataOnWrite(false);
1002     } else {
1003       writerCacheConf = cacheConf;
1004     }
1005     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
1006         fs, blocksize)
1007             .withOutputDir(region.getTmpDir())
1008             .withDataBlockEncoder(dataBlockEncoder)
1009             .withComparator(comparator)
1010             .withBloomType(family.getBloomFilterType())
1011             .withMaxKeyCount(maxKeyCount)
1012             .withChecksumType(checksumType)
1013             .withBytesPerChecksum(bytesPerChecksum)
1014             .withCompression(compression)
1015             .includeMVCCReadpoint(includeMVCCReadpoint)
1016             .build();
1017     // The store file writer's path does not include the CF name, so we need
1018     // to configure the HFile writer directly.
1019     SchemaConfigured sc = (SchemaConfigured) w.writer;
1020     SchemaConfigured.resetSchemaMetricsConf(sc);
1021     passSchemaMetricsTo(sc);
1022     return w;
1023   }
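
  // A hedged example of how a compaction-side caller might use the method above
  // (the key-count estimate and the use of the family's compaction compression are
  // assumptions for illustration, not taken from this file):
  //
  //   StoreFile.Writer writer = store.createWriterInTmp(
  //       estimatedKeyCount,                      // upper bound on keys to be written
  //       family.getCompactionCompression(),      // compression for compacted output
  //       true  /* isCompaction */,
  //       true  /* includeMVCCReadpoint */);
  //
  // With isCompaction = true the CacheConfig is cloned and cache-on-write is disabled,
  // so compaction output does not churn the block cache.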
1024 
1025   /*
1026   * Change storefiles, adding into place the StoreFile produced by this new flush.
1027   * @param sf
1028   * @param set the snapshot set that was used to make the flushed file <code>sf</code>
1029    * @throws IOException
1030    * @return Whether compaction is required.
1031    */
1032   private boolean updateStorefiles(final StoreFile sf,
1033                                    final SortedSet<KeyValue> set)
1034   throws IOException {
1035     this.lock.writeLock().lock();
1036     try {
1037       ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
1038       newList.add(sf);
1039       storefiles = sortAndClone(newList);
1040 
1041       this.memstore.clearSnapshot(set);
1042     } finally {
1043       // We need the lock, as long as we are updating the storefiles
1044       // or changing the memstore. Let us release it before calling
1045      // notifyChangedReadersObservers. See HBASE-4485 for a possible
1046      // deadlock scenario that could have happened if we continued to hold
1047       // the lock.
1048       this.lock.writeLock().unlock();
1049     }
1050 
1051     // Tell listeners of the change in readers.
1052     notifyChangedReadersObservers();
1053 
1054     return needsCompaction();
1055   }
1056 
1057   /*
1058    * Notify all observers that set of Readers has changed.
1059    * @throws IOException
1060    */
1061   private void notifyChangedReadersObservers() throws IOException {
1062     for (ChangedReadersObserver o: this.changedReaderObservers) {
1063       o.updateReaders();
1064     }
1065   }
1066 
1067   /**
1068    * Get all scanners with no filtering based on TTL (that happens further down
1069    * the line).
1070    * @return all scanners for this store
1071    */
1072   protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
1073       boolean usePread,
1074       boolean isCompaction,
1075       ScanQueryMatcher matcher) throws IOException {
1076     List<StoreFile> storeFiles;
1077     List<KeyValueScanner> memStoreScanners;
1078     this.lock.readLock().lock();
1079     try {
1080       storeFiles = this.getStorefiles();
1081       memStoreScanners = this.memstore.getScanners();
1082     } finally {
1083       this.lock.readLock().unlock();
1084     }
1085 
1086     // First the store file scanners
1087 
1088     // TODO this used to get the store files in descending order,
1089     // but now we get them in ascending order, which I think is
1090     // actually more correct, since the memstore scanners get put at the end.
1091     List<StoreFileScanner> sfScanners = StoreFileScanner
1092       .getScannersForStoreFiles(storeFiles, cacheBlocks, usePread, isCompaction, matcher);
1093     List<KeyValueScanner> scanners =
1094       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1095     scanners.addAll(sfScanners);
1096     // Then the memstore scanners
1097     scanners.addAll(memStoreScanners);
1098     return scanners;
1099   }
1100 
1101   /*
1102    * @param o Observer who wants to know about changes in set of Readers
1103    */
1104   void addChangedReaderObserver(ChangedReadersObserver o) {
1105     this.changedReaderObservers.add(o);
1106   }
1107 
1108   /*
1109    * @param o Observer no longer interested in changes in set of Readers.
1110    */
1111   void deleteChangedReaderObserver(ChangedReadersObserver o) {
1112     // We don't check if the observer is present; it may legitimately be absent.
1113     this.changedReaderObservers.remove(o);
1114   }
1115 
1116   //////////////////////////////////////////////////////////////////////////////
1117   // Compaction
1118   //////////////////////////////////////////////////////////////////////////////
1119 
1120   /**
1121    * Compact the StoreFiles.  This method may take some time, so the calling
1122    * thread must be able to block for long periods.
1123    *
1124    * <p>During this time, the Store can work as usual, getting values from
1125    * StoreFiles and writing new StoreFiles from the memstore.
1126    *
1127    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1128    * completely written out to disk.
1129    *
1130    * <p>The compactLock prevents multiple simultaneous compactions.
1131    * The structureLock prevents us from interfering with other write operations.
1132    *
1133    * <p>We don't want to hold the structureLock for the whole time, as a compact()
1134    * can be lengthy and we want to allow cache-flushes during this period.
1135    *
1136    * @param cr
1137    *          compaction details obtained from requestCompaction()
1138    * @throws IOException
1139    * @return Storefile we compacted into or null if we failed or opted out early.
1140    */
1141   StoreFile compact(CompactionRequest cr) throws IOException {
1142     if (cr == null || cr.getFiles().isEmpty()) return null;
1143     Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
1144     List<StoreFile> filesToCompact = cr.getFiles();
1145     synchronized (filesCompacting) {
1146       // sanity check: we're compacting files that this store knows about
1147       // TODO: change this to LOG.error() after more debugging
1148       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1149     }
1150 
1151     // Max-sequenceID is the last key in the files we're compacting
1152     long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
1153 
1154     // Ready to go. Have list of files to compact.
1155     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1156         + this + " of "
1157         + this.region.getRegionInfo().getRegionNameAsString()
1158         + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
1159         + StringUtils.humanReadableInt(cr.getSize()));
1160 
1161     StoreFile sf = null;
1162     try {
1163       StoreFile.Writer writer = this.compactor.compact(cr, maxId);
1164       // Move the compaction into place.
1165       if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1166         sf = completeCompaction(filesToCompact, writer);
1167         if (region.getCoprocessorHost() != null) {
1168           region.getCoprocessorHost().postCompact(this, sf, cr);
1169         }
1170       } else {
1171         // Create storefile around what we wrote with a reader on it.
1172         sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
1173           this.family.getBloomFilterType(), this.dataBlockEncoder);
1174         sf.createReader();
1175       }
1176     } finally {
1177       synchronized (filesCompacting) {
1178         filesCompacting.removeAll(filesToCompact);
1179       }
1180     }
1181 
1182     LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
1183         + filesToCompact.size() + " file(s) in " + this + " of "
1184         + this.region.getRegionInfo().getRegionNameAsString()
1185         + " into " +
1186         (sf == null ? "none" : sf.getPath().getName()) +
1187         ", size=" + (sf == null ? "none" :
1188           StringUtils.humanReadableInt(sf.getReader().length()))
1189         + "; total size for store is "
1190         + StringUtils.humanReadableInt(storeSize));
1191     return sf;
1192   }
1193 
1194   /**
1195    * Compact the most recent N files. Used in testing.
1196    */
1197   public void compactRecentForTesting(int N) throws IOException {
1198     List<StoreFile> filesToCompact;
1199     long maxId;
1200     boolean isMajor;
1201 
1202     this.lock.readLock().lock();
1203     try {
1204       synchronized (filesCompacting) {
1205         filesToCompact = Lists.newArrayList(storefiles);
1206         if (!filesCompacting.isEmpty()) {
1207           // exclude all files older than the newest file we're currently
1208           // compacting. this allows us to preserve contiguity (HBASE-2856)
1209           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1210           int idx = filesToCompact.indexOf(last);
1211           Preconditions.checkArgument(idx != -1);
1212           filesToCompact.subList(0, idx + 1).clear();
1213         }
1214         int count = filesToCompact.size();
1215         if (N > count) {
1216           throw new RuntimeException("Not enough files");
1217         }
1218 
1219         filesToCompact = filesToCompact.subList(count - N, count);
1220         maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
1221         isMajor = (filesToCompact.size() == storefiles.size());
1222         filesCompacting.addAll(filesToCompact);
1223         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1224       }
1225     } finally {
1226       this.lock.readLock().unlock();
1227     }
1228 
1229     try {
1230       // Ready to go. Have list of files to compact.
1231       StoreFile.Writer writer = this.compactor.compactForTesting(this, conf, filesToCompact,
1232         isMajor, maxId);
1233       // Move the compaction into place.
1234       StoreFile sf = completeCompaction(filesToCompact, writer);
1235       if (region.getCoprocessorHost() != null) {
1236         region.getCoprocessorHost().postCompact(this, sf, null);
1237       }
1238     } finally {
1239       synchronized (filesCompacting) {
1240         filesCompacting.removeAll(filesToCompact);
1241       }
1242     }
1243   }
1244 
1245   boolean hasReferences() {
1246     return hasReferences(this.storefiles);
1247   }
1248 
1249   /*
1250    * @param files
1251    * @return True if any of the files in <code>files</code> are References.
1252    */
1253   private boolean hasReferences(Collection<StoreFile> files) {
1254     if (files != null && files.size() > 0) {
1255       for (StoreFile hsf: files) {
1256         if (hsf.isReference()) {
1257           return true;
1258         }
1259       }
1260     }
1261     return false;
1262   }
1263 
1264   /*
1265    * Gets lowest timestamp from candidate StoreFiles
1266    *
1267    * @param fs
1268    * @param dir
1269    * @throws IOException
1270    */
1271   public static long getLowestTimestamp(final List<StoreFile> candidates)
1272       throws IOException {
1273     long minTs = Long.MAX_VALUE;
1274     for (StoreFile storeFile : candidates) {
1275       minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
1276     }
1277     return minTs;
1278   }
1279 
1280   /** Getter for the CompactionProgress object
1281    * @return CompactionProgress object; can be null
1282    */
1283   public CompactionProgress getCompactionProgress() {
1284     return this.compactor.getProgress();
1285   }
1286 
1287   /*
1288    * @return True if we should run a major compaction.
1289    */
1290   boolean isMajorCompaction() throws IOException {
1291     for (StoreFile sf : this.storefiles) {
1292       if (sf.getReader() == null) {
1293         LOG.debug("StoreFile " + sf + " has null Reader");
1294         return false;
1295       }
1296     }
1297 
1298     List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
1299 
1300     // exclude files above the max compaction threshold
1301     // except: save all references. we MUST compact them
1302     int pos = 0;
1303     while (pos < candidates.size() &&
1304            candidates.get(pos).getReader().length() > this.maxCompactSize &&
1305            !candidates.get(pos).isReference()) ++pos;
1306     candidates.subList(0, pos).clear();
1307 
1308     return isMajorCompaction(candidates);
1309   }
1310 
1311   /*
1312    * @param filesToCompact Files to compact. Can be null.
1313    * @return True if we should run a major compaction.
1314    */
1315   private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
1316     boolean result = false;
1317     long mcTime = getNextMajorCompactTime();
1318     if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
1319       return result;
1320     }
1321     // TODO: Use better method for determining stamp of last major (HBASE-2990)
1322     long lowTimestamp = getLowestTimestamp(filesToCompact);
1323     long now = EnvironmentEdgeManager.currentTimeMillis();
1324     if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
1325       // Major compaction time has elapsed.
1326       if (filesToCompact.size() == 1) {
1327         // Single file
1328         StoreFile sf = filesToCompact.get(0);
1329         long oldest =
1330             (sf.getReader().timeRangeTracker == null) ?
1331                 Long.MIN_VALUE :
1332                 now - sf.getReader().timeRangeTracker.minimumTimestamp;
1333         if (sf.isMajorCompaction() && (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
1334           // if there is only one old store file, only compact if dfs blocks are not local.
1335           float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
1336               region.getRegionServerServices().getServerName().getHostname()
1337           );
1338           if (blockLocalityIndex < minStoreFileLocalitySkipCompact) {
1339             if (LOG.isDebugEnabled()) {
1340               LOG.debug("Major compaction triggered on only store " + this +
1341                       "; to make hdfs blocks local, current locality: " + blockLocalityIndex
1342               );
1343             }
1344             result = true;
1345           } else {
1346             if (LOG.isDebugEnabled()) {
1347               LOG.debug("Skipping major compaction of " + this +
1348                   " because there is only one (major) compacted file and oldestTime " +
1349                   oldest + "ms is < ttl=" + this.ttl);
1350             }
1351           }
1352         } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
1353           if (LOG.isDebugEnabled()) {
1354             LOG.debug("Major compaction triggered on store " + this +
1355                     ", because keyvalues outdated; time since last major compaction " +
1356                     (now - lowTimestamp) + "ms");
1357           }
1358           result = true;
1359         }
1360       } else {
1361         if (LOG.isDebugEnabled()) {
1362           LOG.debug("Major compaction triggered on store " + this +
1363               "; time since last major compaction " + (now - lowTimestamp) + "ms");
1364         }
1365         result = true;
1366       }
1367     }
1368     return result;
1369   }
1370 
1371   long getNextMajorCompactTime() {
1372     // default = 24hrs
1373     long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
1374     if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
1375       String strCompactionTime =
1376         family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
1377       ret = Long.parseLong(strCompactionTime);
1378     }
1379 
1380     if (ret > 0) {
1381       // default = 20% = +/- 4.8 hrs
1382       double jitterPct =  conf.getFloat("hbase.hregion.majorcompaction.jitter",
1383           0.20F);
1384       if (jitterPct > 0) {
1385         long jitter = Math.round(ret * jitterPct);
1386         // deterministic jitter avoids a major compaction storm on restart
1387         ImmutableList<StoreFile> snapshot = storefiles;
1388         if (snapshot != null && !snapshot.isEmpty()) {
1389           String seed = snapshot.get(0).getPath().getName();
1390           double curRand = new Random(seed.hashCode()).nextDouble();
1391           ret += jitter - Math.round(2L * jitter * curRand);
1392         } else {
1393           ret = 0; // no storefiles == no major compaction
1394         }
1395       }
1396     }
1397     return ret;
1398   }
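
  // Editor's illustrative sketch (hypothetical helper, not in the original
  // class): shows how the deterministic jitter above bounds the interval.
  // With the 24h default and 20% jitter the result always falls roughly in
  // [19.2h, 28.8h], and the same seed (the first storefile's name) always
  // yields the same offset, so a regionserver restart does not reshuffle
  // major compaction times.
  static long exampleJitteredPeriod(long periodMs, double jitterPct, String seed) {
    long jitter = Math.round(periodMs * jitterPct);
    double curRand = new Random(seed.hashCode()).nextDouble();
    return periodMs + jitter - Math.round(2L * jitter * curRand);
  }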
1399 
1400   public CompactionRequest requestCompaction() throws IOException {
1401     return requestCompaction(NO_PRIORITY, null);
1402   }
1403 
1404   public CompactionRequest requestCompaction(int priority, CompactionRequest request)
1405       throws IOException {
1406     // don't even select for compaction if writes are disabled
1407     if (!this.region.areWritesEnabled()) {
1408       return null;
1409     }
1410 
1411     this.lock.readLock().lock();
1412     try {
1413       synchronized (filesCompacting) {
1414         // candidates = all storefiles not already in compaction queue
1415         List<StoreFile> candidates = Lists.newArrayList(storefiles);
1416         if (!filesCompacting.isEmpty()) {
1417           // exclude all files older than the newest file we're currently
1418           // compacting. this allows us to preserve contiguity (HBASE-2856)
1419           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1420           int idx = candidates.indexOf(last);
1421           Preconditions.checkArgument(idx != -1);
1422           candidates.subList(0, idx + 1).clear();
1423         }
1424 
1425         boolean override = false;
1426         if (region.getCoprocessorHost() != null) {
1427           override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
1428         }
1429         CompactSelection filesToCompact;
1430         if (override) {
1431           // coprocessor is overriding normal file selection
1432           filesToCompact = new CompactSelection(conf, candidates);
1433         } else {
1434           filesToCompact = compactSelection(candidates, priority);
1435         }
1436 
1437         if (region.getCoprocessorHost() != null) {
1438           region.getCoprocessorHost().postCompactSelection(this,
1439             ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
1440         }
1441 
1442         // no files to compact
1443         if (filesToCompact.getFilesToCompact().isEmpty()) {
1444           return null;
1445         }
1446 
1447         // basic sanity check: do not try to compact the same StoreFile twice.
1448         if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
1449           // TODO: change this from an IAE to LOG.error after sufficient testing
1450           Preconditions.checkArgument(false, "%s overlaps with %s",
1451               filesToCompact, filesCompacting);
1452         }
1453         filesCompacting.addAll(filesToCompact.getFilesToCompact());
1454         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1455 
1456         // major compaction iff all StoreFiles are included
1457         boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
1458         if (isMajor) {
1459           // since we're enqueuing a major, update the compaction wait interval
1460           this.forceMajor = false;
1461         }
1462 
1463         // everything went better than expected. create a compaction request
1464         int pri = getCompactPriority(priority);
1465         //not a special compaction request, so we need to make one
1466         if(request == null){
1467           request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
1468         } else {
1469           // update the request with what the system thinks the request should be;
1470           // it's up to the request whether it wants to listen
1471           request.setSelection(filesToCompact);
1472           request.setIsMajor(isMajor);
1473           request.setPriority(pri);
1474         }
1475       }
1476     } finally {
1477       this.lock.readLock().unlock();
1478     }
1479     if (request != null) {
1480       CompactionRequest.preRequest(request);
1481     }
1482     return request;
1483   }
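
  // Editor's illustrative sketch (hypothetical helper, not in the original
  // class): the contiguity trimming applied in requestCompaction above for
  // HBASE-2856. Every candidate at or older than the newest file already being
  // compacted is dropped, e.g. [a, b, c, d, e] with c compacting becomes [d, e],
  // so the files that end up compacted together remain contiguous by sequence id.
  static <T> List<T> exampleTrimForContiguity(List<T> candidates, T newestCompacting) {
    int idx = candidates.indexOf(newestCompacting);
    if (idx >= 0) {
      candidates.subList(0, idx + 1).clear();
    }
    return candidates;
  }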
1484 
1485   public void finishRequest(CompactionRequest cr) {
1486     CompactionRequest.postRequest(cr);
1487     cr.finishRequest();
1488     synchronized (filesCompacting) {
1489       filesCompacting.removeAll(cr.getFiles());
1490     }
1491   }
1492 
1493   /**
1494    * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
1495    * @param candidates candidate files, ordered from oldest to newest
1496    * @return subset copy of candidate list that meets compaction criteria
1497    * @throws IOException
1498    */
1499   CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
1500     return compactSelection(candidates,NO_PRIORITY);
1501   }
1502 
1503   /**
1504    * Algorithm to choose which files to compact
1505    *
1506    * Configuration knobs:
1507    *  "hbase.hstore.compaction.ratio"
1508    *    normal case: minor compact when file <= sum(smaller_files) * ratio
1509    *  "hbase.hstore.compaction.min.size"
1510    *    unconditionally compact individual files below this size
1511    *  "hbase.hstore.compaction.max.size"
1512    *    never compact individual files above this size (unless splitting)
1513    *  "hbase.hstore.compaction.min"
1514    *    min files needed to minor compact
1515    *  "hbase.hstore.compaction.max"
1516    *    max files to compact at once (avoids OOM)
1517    *
1518    * @param candidates candidate files, ordered from oldest to newest
1519    * @return subset copy of candidate list that meets compaction criteria
1520    * @throws IOException
1521    */
1522   CompactSelection compactSelection(List<StoreFile> candidates, int priority)
1523       throws IOException {
1524     // ASSUMPTION!!! filesCompacting is locked when calling this function
1525 
1526     /* normal skew:
1527      *
1528      *         older ----> newer
1529      *     _
1530      *    | |   _
1531      *    | |  | |   _
1532      *  --|-|- |-|- |-|---_-------_-------  minCompactSize
1533      *    | |  | |  | |  | |  _  | |
1534      *    | |  | |  | |  | | | | | |
1535      *    | |  | |  | |  | | | | | |
1536      */
1537     CompactSelection compactSelection = new CompactSelection(conf, candidates);
1538 
1539     boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
1540     if (!forcemajor) {
1541       // Delete the expired store files before the compaction selection.
1542       if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
1543           && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
1544         CompactSelection expiredSelection = compactSelection
1545             .selectExpiredStoreFilesToCompact(
1546                 EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
1547 
1548         // If there are any expired store files, delete them by compaction.
1549         if (expiredSelection != null) {
1550           return expiredSelection;
1551         }
1552       }
1553       // do not compact old files above a configurable threshold
1554       // save all references. we MUST compact them
1555       int pos = 0;
1556       while (pos < compactSelection.getFilesToCompact().size() &&
1557              compactSelection.getFilesToCompact().get(pos).getReader().length()
1558                > maxCompactSize &&
1559              !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
1560       if (pos != 0) compactSelection.clearSubList(0, pos);
1561     }
1562 
1563     if (compactSelection.getFilesToCompact().isEmpty()) {
1564       LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
1565         this + ": no store files to compact");
1566       compactSelection.emptyFileList();
1567       return compactSelection;
1568     }
1569 
1570     // Force a major compaction if this is a user-requested major compaction,
1571     // or if a major compaction is due (forced or time-based) and we do not
1572     // have too many files to compact
1573     boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
1574       ((forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
1575        (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact)
1576     );
1577     LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
1578       this.getColumnFamilyName() + ": Initiating " +
1579       (majorcompaction ? "major" : "minor") + " compaction");
1580 
1581     if (!majorcompaction &&
1582         !hasReferences(compactSelection.getFilesToCompact())) {
1583 
1584       // remove bulk import files that request to be excluded from minors
1585       compactSelection.getFilesToCompact().removeAll(Collections2.filter(
1586           compactSelection.getFilesToCompact(),
1587           new Predicate<StoreFile>() {
1588             public boolean apply(StoreFile input) {
1589               return input.excludeFromMinorCompaction();
1590             }
1591           }));
1592 
1593       // skip selection algorithm if we don't have enough files
1594       if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
1595         if(LOG.isDebugEnabled()) {
1596           LOG.debug("Not compacting files because we only have " +
1597             compactSelection.getFilesToCompact().size() +
1598             " files ready for compaction.  Need " + this.minFilesToCompact + " to initiate.");
1599         }
1600         compactSelection.emptyFileList();
1601         return compactSelection;
1602       }
1603       if (conf.getBoolean("hbase.hstore.useExploringCompation", false)) {
1604         compactSelection = exploringCompactionSelection(compactSelection);
1605       } else {
1606         compactSelection = defaultCompactionSelection(compactSelection);
1607       }
1608     } else {
1609       if(majorcompaction) {
1610         if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
1611           LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
1612             " files, probably because of a user-requested major compaction");
1613           if(priority != PRIORITY_USER) {
1614             LOG.error("Compacting more than max files on a non user-requested compaction");
1615           }
1616         }
1617       } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
1618         // all files included in this compaction, up to max
1619         int excess = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
1620         LOG.debug("Too many admissible files. Excluding " + excess
1621           + " files from compaction candidates");
1622         candidates.subList(this.maxFilesToCompact, candidates.size()).clear();
1623       }
1624     }
1625     return compactSelection;
1626   }
1627 
1628   private CompactSelection defaultCompactionSelection(CompactSelection compactSelection) {
1629     // we're doing a minor compaction, let's see what files are applicable
1630     int start = 0;
1631 
1632     double r = compactSelection.getCompactSelectionRatio();
1633 
1634     // get store file sizes for incremental compacting selection.
1635     int countOfFiles = compactSelection.getFilesToCompact().size();
1636     long [] fileSizes = new long[countOfFiles];
1637     long [] sumSize = new long[countOfFiles];
1638     for (int i = countOfFiles-1; i >= 0; --i) {
1639       StoreFile file = compactSelection.getFilesToCompact().get(i);
1640       fileSizes[i] = file.getReader().length();
1641       // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
1642       int tooFar = i + this.maxFilesToCompact - 1;
1643       sumSize[i] = fileSizes[i]
1644           + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
1645           - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
1646     }
1647 
1648       /* Start at the oldest file and stop when you find the first file that
1649        * meets compaction criteria:
1650        *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
1651        *      OR
1652        *   (2) within the compactRatio of sum(newer_files)
1653        * Given normal skew, any newer files will also meet this criteria
1654        *
1655        * Additional Note:
1656        * If fileSizes.size() >> maxFilesToCompact, we will recurse on
1657        * compact().  Consider the oldest files first to avoid a
1658        * situation where we always compact [end-threshold,end).  Then, the
1659        * last file becomes an aggregate of the previous compactions.
1660        */
1661     while(countOfFiles - start >= this.minFilesToCompact &&
1662         fileSizes[start] >
1663             Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
1664       ++start;
1665     }
1666     int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
1667     long totalSize = fileSizes[start]
1668         + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
1669     compactSelection = compactSelection.getSubList(start, end);
1670 
1671     // if we don't have enough files to compact, just wait
1672     if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
1673       if (LOG.isDebugEnabled()) {
1674         LOG.debug("Skipped compaction of " + this
1675             + ".  Only " + (end - start) + " file(s) of size "
1676             + StringUtils.humanReadableInt(totalSize)
1677             + " have met compaction criteria.");
1678       }
1679       compactSelection.emptyFileList();
1680       return compactSelection;
1681     }
1682     return compactSelection;
1683   }
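
  // Editor's illustrative sketch (hypothetical helper, not part of the original
  // class): a simplified form of the ratio-based selection above, assuming
  // maxFilesToCompact is large enough that the running sum is simply the sum of
  // all newer files. With sizes {300, 50, 23, 12, 12}, ratio 1.2, minCompactSize
  // 10 and minFilesToCompact 3, the 300-byte file is skipped because
  // 300 > 1.2 * (50 + 23 + 12 + 12), so selection starts at the 50-byte file.
  static int exampleSelectionStart(long[] fileSizes, double ratio,
      long minCompactSize, int minFilesToCompact) {
    int n = fileSizes.length;
    long[] sumNewer = new long[n + 1];  // sumNewer[i] = sum of fileSizes[i..n)
    for (int i = n - 1; i >= 0; --i) {
      sumNewer[i] = fileSizes[i] + sumNewer[i + 1];
    }
    int start = 0;
    while (n - start >= minFilesToCompact
        && fileSizes[start] > Math.max(minCompactSize, (long) (sumNewer[start + 1] * ratio))) {
      ++start;
    }
    return start;
  }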
1684 
1685   private CompactSelection exploringCompactionSelection(CompactSelection compactSelection) {
1686 
1687     List<StoreFile> candidates = compactSelection.getFilesToCompact();
1688     int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
1689     boolean mayBeStuck = (candidates.size() - filesCompacting.size() + futureFiles)
1690         >= blockingStoreFileCount;
1691     // Start off choosing nothing.
1692     List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
1693     List<StoreFile> smallest = new ArrayList<StoreFile>(0);
1694     long bestSize = 0;
1695     long smallestSize = Long.MAX_VALUE;
1696     double r = compactSelection.getCompactSelectionRatio();
1697 
1698     // Consider every starting place.
1699     for (int startIndex = 0; startIndex < candidates.size(); startIndex++) {
1700       // Consider every contiguous sub-list between start and end that has at least the minimum number of files.
1701       for (int currentEnd = startIndex + minFilesToCompact - 1;
1702            currentEnd < candidates.size(); currentEnd++) {
1703         List<StoreFile> potentialMatchFiles = candidates.subList(startIndex, currentEnd + 1);
1704 
1705         // Sanity checks
1706         if (potentialMatchFiles.size() < minFilesToCompact) {
1707           continue;
1708         }
1709         if (potentialMatchFiles.size() > maxFilesToCompact) {
1710           continue;
1711         }
1712 
1713         // Compute the total size of files that will
1714         // have to be read if this set of files is compacted.
1715         long size = getCompactionSize(potentialMatchFiles);
1716 
1717         // Store the smallest set of files.  This stored set of files will be used
1718         // if it looks like the algorithm is stuck.
1719         if (size < smallestSize) {
1720           smallest = potentialMatchFiles;
1721           smallestSize = size;
1722         }
1723 
1724         if (size >= minCompactSize
1725             && !filesInRatio(potentialMatchFiles, r)) {
1726           continue;
1727         }
1728 
1729         if (size > maxCompactSize) {
1730           continue;
1731         }
1732 
1733         // Keep this selection if it gets rid of more files, or the same number of files for less I/O.
1734         if (potentialMatchFiles.size() > bestSelection.size()
1735             || (potentialMatchFiles.size() == bestSelection.size() && size < bestSize)) {
1736           bestSelection = potentialMatchFiles;
1737           bestSize = size;
1738         }
1739       }
1740     }
1741 
1742     if (bestSelection.size() == 0 && mayBeStuck) {
1743       smallest = new ArrayList<StoreFile>(smallest);
1744       compactSelection.getFilesToCompact().clear();
1745       compactSelection.getFilesToCompact().addAll(smallest);
1746     } else {
1747       bestSelection = new ArrayList<StoreFile>(bestSelection);
1748       compactSelection.getFilesToCompact().clear();
1749       compactSelection.getFilesToCompact().addAll(bestSelection);
1750     }
1751 
1752     return compactSelection;
1753 
1754   }
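
  // Editor's illustrative sketch (hypothetical helper, not in the original
  // class): the preference rule the exploring selection above applies when
  // comparing candidate windows -- keep the window that removes more files,
  // and on a tie keep the one that reads fewer bytes.
  static boolean exampleIsBetterSelection(int candidateCount, long candidateSize,
      int bestCount, long bestSize) {
    return candidateCount > bestCount
        || (candidateCount == bestCount && candidateSize < bestSize);
  }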
1755 
1756   /**
1757    * Check that all files satisfy the ratio.
1758    *
1759    * @param files set of files to examine.
1760    * @param currentRatio The ratio to check against.
1761    * @return true if all files are within the ratio.
1762    */
1763   private boolean filesInRatio(final List<StoreFile> files, final double currentRatio) {
1764     if (files.size() < 2) {
1765       return true;
1766     }
1767     long totalFileSize = 0;
1768     for (int i = 0; i < files.size(); i++) {
1769       totalFileSize += files.get(i).getReader().length();
1770     }
1771     for (int i = 0; i < files.size(); i++) {
1772       long singleFileSize = files.get(i).getReader().length();
1773       long sumAllOtherFilesize = totalFileSize - singleFileSize;
1774 
1775       if ((singleFileSize > sumAllOtherFilesize * currentRatio)
1776           && (sumAllOtherFilesize >= this.minCompactSize)) {
1777         return false;
1778       }
1779     }
1780     return true;
1781   }
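
  // Editor's worked example for filesInRatio (hypothetical sizes, not from the
  // original source), using ratio 1.2 and minCompactSize 128MB: sizes of
  // {200MB, 200MB, 230MB} pass because every file is <= 1.2 * (sum of the
  // others); {900MB, 200MB, 200MB} fail because 900MB > 1.2 * 400MB and the
  // other files together exceed minCompactSize.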
1782 
1783   /**
1784    * Get the number of bytes a proposed compaction would have to read.
1785    *
1786    * @param files Set of files in a proposed compaction.
1787    * @return size in bytes.
1788    */
1789   private long getCompactionSize(final List<StoreFile> files) {
1790     long size = 0;
1791     if (files == null) {
1792       return size;
1793     }
1794     for (StoreFile f : files) {
1795       size += f.getReader().length();
1796     }
1797     return size;
1798   }
1799 
1800   /**
1801    * Validates a store file by opening and closing it. In HFileV2 this should
1802    * not be an expensive operation.
1803    *
1804    * @param path the path to the store file
1805    */
1806   private void validateStoreFile(Path path)
1807       throws IOException {
1808     StoreFile storeFile = null;
1809     try {
1810       storeFile = new StoreFile(this.fs, path, this.conf,
1811           this.cacheConf, this.family.getBloomFilterType(),
1812           NoOpDataBlockEncoder.INSTANCE);
1813       passSchemaMetricsTo(storeFile);
1814       storeFile.createReader();
1815     } catch (IOException e) {
1816       LOG.error("Failed to open store file : " + path
1817           + ", keeping it in tmp location", e);
1818       throw e;
1819     } finally {
1820       if (storeFile != null) {
1821         storeFile.closeReader(false);
1822       }
1823     }
1824   }
1825 
1826   /*
1827    * <p>Completes a compaction by processing the output that has been written to disk.
1828    *
1829    * <p>It is usually invoked at the end of a compaction, but might also be
1830    * invoked at HStore startup, if the prior execution died midway through.
1831    *
1832    * <p>Moving the compacted TreeMap into place means:
1833    * <pre>
1834    * 1) Moving the new compacted StoreFile into place
1835    * 2) Unloading all replaced StoreFiles, closing them, and collecting the list to delete
1836    * 3) Loading the new TreeMap
1837    * 4) Computing the new store size
1838    * </pre>
1839    *
1840    * @param compactedFiles list of files that were compacted
1841    * @param compactedFile StoreFile that is the result of the compaction
1842    * @return StoreFile created. May be null.
1843    * @throws IOException
1844    */
1845   StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
1846                                        final StoreFile.Writer compactedFile)
1847       throws IOException {
1848     // 1. Moving the new files into place -- if there is a new file (may not
1849     // be if all cells were expired or deleted).
1850     StoreFile result = null;
1851     if (compactedFile != null) {
1852       validateStoreFile(compactedFile.getPath());
1853       // Move the file into the right spot
1854       Path origPath = compactedFile.getPath();
1855       Path destPath = new Path(homedir, origPath.getName());
1856       LOG.info("Renaming compacted file at " + origPath + " to " + destPath);
1857       if (!HBaseFileSystem.renameDirForFileSystem(fs, origPath, destPath)) {
1858         LOG.error("Failed move of compacted file " + origPath + " to " +
1859             destPath);
1860         throw new IOException("Failed move of compacted file " + origPath +
1861             " to " + destPath);
1862       }
1863       result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
1864           this.family.getBloomFilterType(), this.dataBlockEncoder);
1865       passSchemaMetricsTo(result);
1866       result.createReader();
1867     }
1868     try {
1869       this.lock.writeLock().lock();
1870       try {
1871         // Change this.storefiles so it reflects new state but do not
1872         // delete old store files until we have sent out notification of
1873         // change in case old files are still being accessed by outstanding
1874         // scanners.
1875         ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
1876         newStoreFiles.removeAll(compactedFiles);
1877         filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
1878 
1879         // If compaction produced a result StoreFile (it may be null), add it to the new list.
1880         if (result != null) {
1881           newStoreFiles.add(result);
1882         }
1883 
1884         this.storefiles = sortAndClone(newStoreFiles);
1885       } finally {
1886         // We need the lock as long as we are updating the storefiles
1887         // or changing the memstore. Let us release it before calling
1888         // notifyChangedReadersObservers. See HBASE-4485 for a possible
1889         // deadlock scenario that could have happened if we continued to
1890         // hold the lock.
1891         this.lock.writeLock().unlock();
1892       }
1893 
1894       // Tell observers that list of StoreFiles has changed.
1895       notifyChangedReadersObservers();
1896 
1897       // let the archive util decide if we should archive or delete the files
1898       LOG.debug("Removing store files after compaction...");
1899       HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.region, this.family.getName(),
1900         compactedFiles);
1901 
1902     } catch (IOException e) {
1903       e = RemoteExceptionHandler.checkIOException(e);
1904       LOG.error("Failed replacing compacted files in " + this +
1905         ". Compacted file is " + (result == null? "none": result.toString()) +
1906         ".  Files replaced " + compactedFiles.toString() +
1907         " some of which may have been already removed", e);
1908     }
1909 
1910     // 4. Compute new store size
1911     this.storeSize = 0L;
1912     this.totalUncompressedBytes = 0L;
1913     for (StoreFile hsf : this.storefiles) {
1914       StoreFile.Reader r = hsf.getReader();
1915       if (r == null) {
1916         LOG.warn("StoreFile " + hsf + " has a null Reader");
1917         continue;
1918       }
1919       this.storeSize += r.length();
1920       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1921     }
1922     return result;
1923   }
1924 
1925   public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
1926     Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
1927     ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
1928     return newList;
1929   }
1930 
1931   //////////////////////////////////////////////////////////////////////////////
1932   // Accessors.
1934   //////////////////////////////////////////////////////////////////////////////
1935   /**
1936    * @return the number of files in this store
1937    */
1938   public int getNumberOfStoreFiles() {
1939     return this.storefiles.size();
1940   }
1941 
1942   /*
1943    * @param wantedVersions How many versions were asked for.
1944    * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
1945    */
1946   int versionsToReturn(final int wantedVersions) {
1947     if (wantedVersions <= 0) {
1948       throw new IllegalArgumentException("Number of versions must be > 0");
1949     }
1950     // Make sure we do not return more than maximum versions for this store.
1951     int maxVersions = this.family.getMaxVersions();
1952     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1953   }
1954 
1955   static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
1956     return key.getTimestamp() < oldestTimestamp;
1957   }
1958 
1959   /**
1960    * Find the key that matches <i>row</i> exactly, or the one that immediately
1961    * precedes it. WARNING: Only use this method on a table where writes occur
1962    * with strictly increasing timestamps. This method assumes this pattern of
1963    * writes in order to make it reasonably performant.  Also our search is
1964    * dependent on the axiom that deletes are for cells that are in the container
1965    * that follows, whether a memstore snapshot or a storefile, not for the
1966    * current container: i.e. we'll see deletes before we come across cells we
1967    * are to delete. Presumption is that the memstore#kvset is processed before
1968    * memstore#snapshot and so on.
1969    * @param row The row key of the targeted row.
1970    * @return Found keyvalue or null if none found.
1971    * @throws IOException
1972    */
1973   KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
1974     // If minVersions is set, we will not ignore expired KVs.
1975     // As we're only looking for the latest matches, that should be OK.
1976     // With minVersions > 0 we guarantee that any KV that has any version
1977     // at all (expired or not) has at least one version that will not expire.
1978     // Note that this method used to take a KeyValue as arguments. KeyValue
1979     // can be back-dated, a row key cannot.
1980     long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.ttl;
1981 
1982     KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1983 
1984     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1985       this.comparator, kv, ttlToUse, this.region.getRegionInfo().isMetaRegion());
1986     this.lock.readLock().lock();
1987     try {
1988       // First go to the memstore.  Pick up deletes and candidates.
1989       this.memstore.getRowKeyAtOrBefore(state);
1990       // Check if match, if we got a candidate on the asked for 'kv' row.
1991       // Process each store file. Run through from newest to oldest.
1992       for (StoreFile sf : Lists.reverse(storefiles)) {
1993         // Update the candidate keys from the current map file
1994         rowAtOrBeforeFromStoreFile(sf, state);
1995       }
1996       return state.getCandidate();
1997     } finally {
1998       this.lock.readLock().unlock();
1999     }
2000   }
2001 
2002   /*
2003    * Check an individual MapFile for the row at or before a given row.
2004    * @param f
2005    * @param state
2006    * @throws IOException
2007    */
2008   private void rowAtOrBeforeFromStoreFile(final StoreFile f,
2009                                           final GetClosestRowBeforeTracker state)
2010       throws IOException {
2011     StoreFile.Reader r = f.getReader();
2012     if (r == null) {
2013       LOG.warn("StoreFile " + f + " has a null Reader");
2014       return;
2015     }
2016     if (r.getEntries() == 0) {
2017       LOG.warn("StoreFile " + f + " is a empty store file");
2018       return;
2019     }
2020     // TODO: Cache these keys rather than make each time?
2021     byte [] fk = r.getFirstKey();
2022     if (fk == null) return;
2023     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
2024     byte [] lk = r.getLastKey();
2025     KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
2026     KeyValue firstOnRow = state.getTargetKey();
2027     if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
2028       // If last key in file is not of the target table, no candidates in this
2029       // file.  Return.
2030       if (!state.isTargetTable(lastKV)) return;
2031       // If the row we're looking for is past the end of file, set search key to
2032       // last key. TODO: Cache last and first key rather than make each time.
2033       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
2034     }
2035     // Get a scanner that caches blocks and that uses pread.
2036     HFileScanner scanner = r.getScanner(true, true, false);
2037     // Seek scanner.  If can't seek it, return.
2038     if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
2039     // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
2040     // Unlikely that there'll be an instance of actual first row in table.
2041     if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
2042     // If here, need to start backing up.
2043     while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
2044        firstOnRow.getKeyLength())) {
2045       KeyValue kv = scanner.getKeyValue();
2046       if (!state.isTargetTable(kv)) break;
2047       if (!state.isBetterCandidate(kv)) break;
2048       // Make new first on row.
2049       firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
2050       // Seek scanner.  If can't seek it, break.
2051       if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
2052       // If we find something, break;
2053       if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
2054     }
2055   }
2056 
2057   /*
2058    * Seek the file scanner to firstOnRow or first entry in file.
2059    * @param scanner
2060    * @param firstOnRow
2061    * @param firstKV
2062    * @return True if we successfully seeked scanner.
2063    * @throws IOException
2064    */
2065   private boolean seekToScanner(final HFileScanner scanner,
2066                                 final KeyValue firstOnRow,
2067                                 final KeyValue firstKV)
2068       throws IOException {
2069     KeyValue kv = firstOnRow;
2070     // If firstOnRow < firstKV, set to firstKV
2071     if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
2072     int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
2073       kv.getKeyLength());
2074     return result >= 0;
2075   }
2076 
2077   /*
2078    * When we come in here, we are probably at the kv just before we break into
2079    * the row that firstOnRow is on.  Usually need to increment one time to get
2080    * on to the row we are interested in.
2081    * @param scanner
2082    * @param firstOnRow
2083    * @param state
2084    * @return True we found a candidate.
2085    * @throws IOException
2086    */
2087   private boolean walkForwardInSingleRow(final HFileScanner scanner,
2088                                          final KeyValue firstOnRow,
2089                                          final GetClosestRowBeforeTracker state)
2090       throws IOException {
2091     boolean foundCandidate = false;
2092     do {
2093       KeyValue kv = scanner.getKeyValue();
2094       // If we are not in the row, skip.
2095       if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
2096       // Did we go beyond the target row? If so break.
2097       if (state.isTooFar(kv, firstOnRow)) break;
2098       if (state.isExpired(kv)) {
2099         continue;
2100       }
2101       // If we added something, this row is a contender. break.
2102       if (state.handle(kv)) {
2103         foundCandidate = true;
2104         break;
2105       }
2106     } while(scanner.next());
2107     return foundCandidate;
2108   }
2109 
2110   public boolean canSplit() {
2111     this.lock.readLock().lock();
2112     try {
2113       // Not splittable if we find a reference store file present in the store.
2114       for (StoreFile sf : storefiles) {
2115         if (sf.isReference()) {
2116           if (LOG.isDebugEnabled()) {
2117             LOG.debug(sf + " is not splittable");
2118           }
2119           return false;
2120         }
2121       }
2122 
2123       return true;
2124     } finally {
2125       this.lock.readLock().unlock();
2126     }
2127   }
2128   /**
2129    * Determines if Store should be split
2130    * @return byte[] if store should be split, null otherwise.
2131    */
2132   public byte[] getSplitPoint() {
2133     this.lock.readLock().lock();
2134     try {
2135       // sanity checks
2136       if (this.storefiles.isEmpty()) {
2137         return null;
2138       }
2139       // Should already be enforced by the split policy!
2140       assert !this.region.getRegionInfo().isMetaRegion();
2141 
2142       // Not splittable if we find a reference store file present in the store.
2143       long maxSize = 0L;
2144       StoreFile largestSf = null;
2145       for (StoreFile sf : storefiles) {
2146         if (sf.isReference()) {
2147           // Should already be enforced since we return false in this case
2148           return null;
2149         }
2150 
2151         StoreFile.Reader r = sf.getReader();
2152         if (r == null) {
2153           LOG.warn("Storefile " + sf + " Reader is null");
2154           continue;
2155         }
2156 
2157         long size = r.length();
2158         if (size > maxSize) {
2159           // This is the largest one so far
2160           maxSize = size;
2161           largestSf = sf;
2162         }
2163       }
2164 
2165       StoreFile.Reader r = largestSf.getReader();
2166       if (r == null) {
2167         LOG.warn("Storefile " + largestSf + " Reader is null");
2168         return null;
2169       }
2170       // Get first, last, and mid keys.  Midkey is the key that starts block
2171       // in middle of hfile.  Has column and timestamp.  Need to return just
2172       // the row we want to split on as midkey.
2173       byte [] midkey = r.midkey();
2174       if (midkey != null) {
2175         KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
2176         byte [] fk = r.getFirstKey();
2177         KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
2178         byte [] lk = r.getLastKey();
2179         KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
2180         // if the midkey is the same as the first or last keys, then we cannot
2181         // (ever) split this region.
2182         if (this.comparator.compareRows(mk, firstKey) == 0 ||
2183             this.comparator.compareRows(mk, lastKey) == 0) {
2184           if (LOG.isDebugEnabled()) {
2185             LOG.debug("cannot split because midkey is the same as first or " +
2186               "last row");
2187           }
2188           return null;
2189         }
2190         return mk.getRow();
2191       }
2192     } catch(IOException e) {
2193       LOG.warn("Failed getting store size for " + this, e);
2194     } finally {
2195       this.lock.readLock().unlock();
2196     }
2197     return null;
2198   }
2199 
2200   /** @return aggregate size, in bytes, of the store files used in the last compaction */
2201   public long getLastCompactSize() {
2202     return this.lastCompactSize;
2203   }
2204 
2205   /** @return aggregate size of this store, in bytes */
2206   public long getSize() {
2207     return storeSize;
2208   }
2209 
2210   public void triggerMajorCompaction() {
2211     this.forceMajor = true;
2212   }
2213 
2214   boolean getForceMajorCompaction() {
2215     return this.forceMajor;
2216   }
2217 
2218   //////////////////////////////////////////////////////////////////////////////
2219   // File administration
2220   //////////////////////////////////////////////////////////////////////////////
2221 
2222   /**
2223    * Return a scanner for both the memstore and the HStore files. Assumes we
2224    * are not in a compaction.
2225    * @throws IOException
2226    */
2227   public KeyValueScanner getScanner(Scan scan,
2228       final NavigableSet<byte []> targetCols) throws IOException {
2229     lock.readLock().lock();
2230     try {
2231       KeyValueScanner scanner = null;
2232       if (getHRegion().getCoprocessorHost() != null) {
2233         scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
2234       }
2235       if (scanner == null) {
2236         scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
2237       }
2238       return scanner;
2239     } finally {
2240       lock.readLock().unlock();
2241     }
2242   }
2243 
2244   @Override
2245   public String toString() {
2246     return getColumnFamilyName();
2247   }
2248 
2249   /**
2250    * @return Count of store files
2251    */
2252   int getStorefilesCount() {
2253     return this.storefiles.size();
2254   }
2255 
2256   /**
2257    * @return The size of the store files, in bytes, uncompressed.
2258    */
2259   long getStoreSizeUncompressed() {
2260     return this.totalUncompressedBytes;
2261   }
2262 
2263   /**
2264    * @return The size of the store files, in bytes.
2265    */
2266   long getStorefilesSize() {
2267     long size = 0;
2268     for (StoreFile s: storefiles) {
2269       StoreFile.Reader r = s.getReader();
2270       if (r == null) {
2271         LOG.warn("StoreFile " + s + " has a null Reader");
2272         continue;
2273       }
2274       size += r.length();
2275     }
2276     return size;
2277   }
2278 
2279   /**
2280    * @return The size of the store file indexes, in bytes.
2281    */
2282   long getStorefilesIndexSize() {
2283     long size = 0;
2284     for (StoreFile s: storefiles) {
2285       StoreFile.Reader r = s.getReader();
2286       if (r == null) {
2287         LOG.warn("StoreFile " + s + " has a null Reader");
2288         continue;
2289       }
2290       size += r.indexSize();
2291     }
2292     return size;
2293   }
2294 
2295   /**
2296    * Returns the total size of all index blocks in the data block indexes,
2297    * including the root level, intermediate levels, and the leaf level for
2298    * multi-level indexes, or just the root level for single-level indexes.
2299    *
2300    * @return the total size of block indexes in the store
2301    */
2302   long getTotalStaticIndexSize() {
2303     long size = 0;
2304     for (StoreFile s : storefiles) {
2305       size += s.getReader().getUncompressedDataIndexSize();
2306     }
2307     return size;
2308   }
2309 
2310   /**
2311    * Returns the total byte size of all Bloom filter bit arrays. For compound
2312    * Bloom filters even the Bloom blocks currently not loaded into the block
2313    * cache are counted.
2314    *
2315    * @return the total size of all Bloom filters in the store
2316    */
2317   long getTotalStaticBloomSize() {
2318     long size = 0;
2319     for (StoreFile s : storefiles) {
2320       StoreFile.Reader r = s.getReader();
2321       size += r.getTotalBloomSize();
2322     }
2323     return size;
2324   }
2325 
2326   /**
2327    * @return The size of this store's memstore, in bytes
2328    */
2329   long getMemStoreSize() {
2330     return this.memstore.heapSize();
2331   }
2332 
2333   public int getCompactPriority() {
2334     return getCompactPriority(NO_PRIORITY);
2335   }
2336 
2337   /**
2338    * @return The priority that this store should have in the compaction queue
2339    * @param priority
2340    */
2341   public int getCompactPriority(int priority) {
2342     // If this is a user-requested compaction, leave this at the highest priority
2343     if(priority == PRIORITY_USER) {
2344       return PRIORITY_USER;
2345     } else {
2346       return this.blockingStoreFileCount - this.storefiles.size();
2347     }
2348   }
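
  // Editor's illustrative sketch (hypothetical numbers, not from the original
  // source): with blockingStoreFileCount = 7 and 10 store files already in the
  // store, a system-requested compaction gets priority 7 - 10 = -3; lower
  // values mean the store is at or past its blocking limit and should be
  // compacted sooner.
  static int exampleCompactPriority(int blockingStoreFileCount, int storefileCount) {
    return blockingStoreFileCount - storefileCount;
  }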
2349 
2350   boolean throttleCompaction(long compactionSize) {
2351     long throttlePoint = conf.getLong(
2352         "hbase.regionserver.thread.compaction.throttle",
2353         2 * this.minFilesToCompact * this.region.memstoreFlushSize);
2354     return compactionSize > throttlePoint;
2355   }
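
  // Editor's illustrative sketch (hypothetical numbers, not from the original
  // source): with the default throttle point of
  // 2 * minFilesToCompact * memstoreFlushSize, e.g. 2 * 3 * 128MB = 768MB,
  // any compaction reading more than that is considered "large"; callers can
  // use this, for example, to route it to a separate compaction thread pool.
  static boolean exampleThrottle(long compactionSize, int minFilesToCompact,
      long memstoreFlushSize) {
    return compactionSize > 2L * minFilesToCompact * memstoreFlushSize;
  }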
2356 
2357   public HRegion getHRegion() {
2358     return this.region;
2359   }
2360 
2361   HRegionInfo getHRegionInfo() {
2362     return this.region.getRegionInfo();
2363   }
2364 
2365   /**
2366    * Increments the value for the given row/family/qualifier.
2367    *
2368    * This function will always be seen as atomic by other readers
2369    * because it only puts a single KV to memstore. Thus no
2370    * read/write control necessary.
2371    *
2372    * @param row
2373    * @param f
2374    * @param qualifier
2375    * @param newValue the new value to set into memstore
2376    * @return memstore size delta
2377    * @throws IOException
2378    */
2379   public long updateColumnValue(byte [] row, byte [] f,
2380                                 byte [] qualifier, long newValue)
2381       throws IOException {
2382 
2383     this.lock.readLock().lock();
2384     try {
2385       long now = EnvironmentEdgeManager.currentTimeMillis();
2386 
2387       return this.memstore.updateColumnValue(row,
2388           f,
2389           qualifier,
2390           newValue,
2391           now);
2392 
2393     } finally {
2394       this.lock.readLock().unlock();
2395     }
2396   }
2397 
2398   /**
2399    * Adds or replaces the specified KeyValues.
2400    * <p>
2401    * For each KeyValue specified, if a cell with the same row, family, and
2402    * qualifier exists in MemStore, it will be replaced.  Otherwise, it will just
2403    * be inserted to MemStore.
2404    * <p>
2405    * This operation is atomic on each KeyValue (row/family/qualifier) but not
2406    * necessarily atomic across all of them.
2407    * @param kvs
2408    * @return memstore size delta
2409    * @throws IOException
2410    */
2411   public long upsert(List<KeyValue> kvs)
2412       throws IOException {
2413     this.lock.readLock().lock();
2414     try {
2415       // TODO: Make this operation atomic w/ MVCC
2416       return this.memstore.upsert(kvs);
2417     } finally {
2418       this.lock.readLock().unlock();
2419     }
2420   }
2421 
2422   public StoreFlusher getStoreFlusher(long cacheFlushId) {
2423     return new StoreFlusherImpl(cacheFlushId);
2424   }
2425 
2426   private class StoreFlusherImpl implements StoreFlusher {
2427 
2428     private long cacheFlushId;
2429     private SortedSet<KeyValue> snapshot;
2430     private StoreFile storeFile;
2431     private Path storeFilePath;
2432     private TimeRangeTracker snapshotTimeRangeTracker;
2433     private AtomicLong flushedSize;
2434 
2435     private StoreFlusherImpl(long cacheFlushId) {
2436       this.cacheFlushId = cacheFlushId;
2437       this.flushedSize = new AtomicLong();
2438     }
2439 
2440     @Override
2441     public void prepare() {
2442       memstore.snapshot();
2443       this.snapshot = memstore.getSnapshot();
2444       this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
2445     }
2446 
2447     @Override
2448     public void flushCache(MonitoredTask status) throws IOException {
2449       storeFilePath = Store.this.flushCache(
2450         cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
2451     }
2452 
2453     @Override
2454     public boolean commit(MonitoredTask status) throws IOException {
2455       if (storeFilePath == null) {
2456         return false;
2457       }
2458       storeFile = Store.this.commitFile(storeFilePath, cacheFlushId,
2459                                snapshotTimeRangeTracker, flushedSize, status);
2460       if (Store.this.getHRegion().getCoprocessorHost() != null) {
2461         Store.this.getHRegion()
2462             .getCoprocessorHost()
2463             .postFlush(Store.this, storeFile);
2464       }
2465 
2466       // Add new file to store files.  Clear snapshot too while we have
2467       // the Store write lock.
2468       return Store.this.updateStorefiles(storeFile, snapshot);
2469     }
2470   }
2471 
2472   /**
2473    * See if there are too many store files in this store
2474    * @return true if number of store files is greater than
2475    *  the number defined in minFilesToCompact
2476    */
2477   public boolean needsCompaction() {
2478     return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
2479   }
2480 
2481   /**
2482    * Used for tests. Get the cache configuration for this Store.
2483    */
2484   public CacheConfig getCacheConfig() {
2485     return this.cacheConf;
2486   }
2487 
2488   public static final long FIXED_OVERHEAD =
2489       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE
2490           + (17 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
2491           + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_FLOAT);
2492 
2493   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2494       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2495       + ClassSize.CONCURRENT_SKIPLISTMAP
2496       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2497       + ScanInfo.FIXED_OVERHEAD);
2498 
2499   @Override
2500   public long heapSize() {
2501     return DEEP_OVERHEAD + this.memstore.heapSize();
2502   }
2503 
2504   public KeyValue.KVComparator getComparator() {
2505     return comparator;
2506   }
2507 
2508   public ScanInfo getScanInfo() {
2509     return scanInfo;
2510   }
2511   
2512   public boolean hasTooManyStoreFiles() {
2513     return getStorefilesCount() > this.blockingFileCount;
2514   }
2515 
2516   /**
2517    * Immutable information for scans over a store.
2518    */
2519   public static class ScanInfo {
2520     private byte[] family;
2521     private int minVersions;
2522     private int maxVersions;
2523     private long ttl;
2524     private boolean keepDeletedCells;
2525     private long timeToPurgeDeletes;
2526     private KVComparator comparator;
2527 
2528     public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
2529         + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
2530         + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); // ttl and timeToPurgeDeletes are both longs
2531 
2532     /**
2533      * @param family {@link HColumnDescriptor} describing the column family
2534      * @param ttl Store's TTL (in ms)
2535      * @param timeToPurgeDeletes duration in ms after which a delete marker can
2536      *        be purged during a major compaction.
2537      * @param comparator The store's comparator
2538      */
2539     public ScanInfo(HColumnDescriptor family, long ttl, long timeToPurgeDeletes, KVComparator comparator) {
2540       this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
2541           .getKeepDeletedCells(), timeToPurgeDeletes, comparator);
2542     }
2543     /**
2544      * @param family Name of this store's column family
2545      * @param minVersions Store's MIN_VERSIONS setting
2546      * @param maxVersions Store's VERSIONS setting
2547      * @param ttl Store's TTL (in ms)
2548      * @param keepDeletedCells Store's keepDeletedCells setting
2549      * @param timeToPurgeDeletes duration in ms after which a delete marker can
2550      *        be purged during a major compaction.
2551      * @param comparator The store's comparator
2552      */
2553     public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
2554         boolean keepDeletedCells, long timeToPurgeDeletes,
2555         KVComparator comparator) {
2556 
2557       this.family = family;
2558       this.minVersions = minVersions;
2559       this.maxVersions = maxVersions;
2560       this.ttl = ttl;
2561       this.keepDeletedCells = keepDeletedCells;
2562       this.timeToPurgeDeletes = timeToPurgeDeletes;
2563       this.comparator = comparator;
2564     }
2565 
2566     public byte[] getFamily() {
2567       return family;
2568     }
2569 
2570     public int getMinVersions() {
2571       return minVersions;
2572     }
2573 
2574     public int getMaxVersions() {
2575       return maxVersions;
2576     }
2577 
2578     public long getTtl() {
2579       return ttl;
2580     }
2581 
2582     public boolean getKeepDeletedCells() {
2583       return keepDeletedCells;
2584     }
2585 
2586     public long getTimeToPurgeDeletes() {
2587       return timeToPurgeDeletes;
2588     }
2589 
2590     public KVComparator getComparator() {
2591       return comparator;
2592     }
2593   }
2594 
2595 }