
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.base.Preconditions;
23  import com.google.common.collect.ImmutableCollection;
24  import com.google.common.collect.ImmutableList;
25  import com.google.common.collect.Lists;
26  import com.google.common.collect.Sets;
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.net.InetSocketAddress;
31  import java.security.PrivilegedExceptionAction;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.HashSet;
37  import java.util.Iterator;
38  import java.util.List;
39  import java.util.NavigableSet;
40  import java.util.Set;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.CompletionService;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.ExecutorCompletionService;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellComparator;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.CompoundConfiguration;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.Tag;
66  import org.apache.hadoop.hbase.TagType;
67  import org.apache.hadoop.hbase.TagUtil;
68  import org.apache.hadoop.hbase.classification.InterfaceAudience;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.conf.ConfigurationManager;
71  import org.apache.hadoop.hbase.io.compress.Compression;
72  import org.apache.hadoop.hbase.io.crypto.Encryption;
73  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
74  import org.apache.hadoop.hbase.io.hfile.HFile;
75  import org.apache.hadoop.hbase.io.hfile.HFileContext;
76  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
77  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
78  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
79  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
80  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
81  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
82  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
83  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
84  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
85  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
86  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
87  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
88  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
89  import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
90  import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
91  import org.apache.hadoop.hbase.security.EncryptionUtil;
92  import org.apache.hadoop.hbase.security.User;
93  import org.apache.hadoop.hbase.util.Bytes;
94  import org.apache.hadoop.hbase.util.ChecksumType;
95  import org.apache.hadoop.hbase.util.ClassSize;
96  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
97  import org.apache.hadoop.hbase.util.ReflectionUtils;
98  import org.apache.hadoop.util.StringUtils;
99  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
100 
101 /**
102  * A Store holds a column family in a Region.  It's a memstore and a set of zero
103  * or more StoreFiles, which stretch backwards over time.
104  *
105  * <p>There's no reason to consider append-logging at this level; all logging
106  * and locking is handled at the HRegion level.  Store just provides
107  * services to manage sets of StoreFiles.  One of the most important of those
108  * services is compaction, where files are aggregated once they pass
109  * a configurable threshold.
110  *
111  * <p>Locking and transactions are handled at a higher level.  This API should
112  * not be called directly but by an HRegion manager.
113  */
114 @InterfaceAudience.Private
115 public class HStore implements Store {
116   private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
117   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
118       "hbase.server.compactchecker.interval.multiplier";
119   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
120   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
121   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
122 
123   private static final Log LOG = LogFactory.getLog(HStore.class);
124 
125   protected final MemStore memstore;
126   // This store's directory in the filesystem.
127   protected final HRegion region;
128   private final HColumnDescriptor family;
129   private final HRegionFileSystem fs;
130   protected Configuration conf;
131   protected CacheConfig cacheConf;
132   private long lastCompactSize = 0;
133   volatile boolean forceMajor = false;
134   /* how many bytes to write between status checks */
135   static int closeCheckInterval = 0;
136   private volatile long storeSize = 0L;
137   private volatile long totalUncompressedBytes = 0L;
138 
139   /**
140    * RWLock for store operations.
141    * Locked in shared mode when the list of component stores is looked at:
142    *   - all reads/writes to table data
143    *   - checking for split
144    * Locked in exclusive mode when the list of component stores is modified:
145    *   - closing
146    *   - completing a compaction
147    */
148   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
149   private final boolean verifyBulkLoads;
150 
151   private ScanInfo scanInfo;
152 
153   // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
154   final List<StoreFile> filesCompacting = Lists.newArrayList();
155 
156   // All access must be synchronized.
157   private final Set<ChangedReadersObserver> changedReaderObservers =
158     Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
159 
160   protected final int blocksize;
161   private HFileDataBlockEncoder dataBlockEncoder;
162 
163   /** Checksum configuration */
164   protected ChecksumType checksumType;
165   protected int bytesPerChecksum;
166 
167   // Comparing KeyValues
168   private final CellComparator comparator;
169 
170   final StoreEngine<?, ?, ?, ?> storeEngine;
171 
172   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
173   private volatile OffPeakHours offPeakHours;
174 
175   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
176   private int flushRetriesNumber;
177   private int pauseTime;
178 
179   private long blockingFileCount;
180   private int compactionCheckMultiplier;
181   protected Encryption.Context cryptoContext = Encryption.Context.NONE;
182 
183   private volatile long flushedCellsCount = 0;
184   private volatile long compactedCellsCount = 0;
185   private volatile long majorCompactedCellsCount = 0;
186   private volatile long flushedCellsSize = 0;
187   private volatile long flushedOutputFileSize = 0;
188   private volatile long compactedCellsSize = 0;
189   private volatile long majorCompactedCellsSize = 0;
190 
191   /**
192    * Constructor
193    * @param region the HRegion this store belongs to
194    * @param family HColumnDescriptor for this column family
195    * @param confParam configuration object
196    *
197    * @throws IOException
198    */
199   protected HStore(final HRegion region, final HColumnDescriptor family,
200       final Configuration confParam) throws IOException {
201 
202     this.fs = region.getRegionFileSystem();
203 
204     // Assemble the store's home directory and ensure it exists.
205     fs.createStoreDir(family.getNameAsString());
206     this.region = region;
207     this.family = family;
208     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
209     // CompoundConfiguration will look for keys in reverse order of addition, so we'd
210     // add global config first, then table and cf overrides, then cf metadata.
211     this.conf = new CompoundConfiguration()
212       .add(confParam)
213       .addStringMap(region.getTableDesc().getConfiguration())
214       .addStringMap(family.getConfiguration())
215       .addBytesMap(family.getValues());
216     this.blocksize = family.getBlocksize();
217 
218     this.dataBlockEncoder =
219         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
220 
221     this.comparator = region.getCellCompartor();
222     // used by ScanQueryMatcher
223     long timeToPurgeDeletes =
224         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
225     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
226         "ms in store " + this);
227     // Get TTL
228     long ttl = determineTTLFromFamily(family);
229     // Why not just pass a HColumnDescriptor in here altogether?  Even if we have
230     // to clone it?
231     scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
232     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
233     if (family.isInMemoryCompaction()) {
234       className = CompactingMemStore.class.getName();
235       this.memstore = new CompactingMemStore(conf, this.comparator, this,
236           this.getHRegion().getRegionServicesForStores());
237     } else {
238       this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
239           Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
240     }
241     LOG.info("Memstore class name is " + className);
242     this.offPeakHours = OffPeakHours.getInstance(conf);
243
244     // Setting up cache configuration for this family
245     createCacheConf(family);
246
247     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
248
249     this.blockingFileCount =
250         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
251     this.compactionCheckMultiplier = conf.getInt(
252         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
253     if (this.compactionCheckMultiplier <= 0) {
254       LOG.error("Compaction check period multiplier must be positive, setting default: "
255           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
256       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
257     }
258
259     if (HStore.closeCheckInterval == 0) {
260       HStore.closeCheckInterval = conf.getInt(
261           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
262     }
263
264     this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
265     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
266
267     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
268     this.checksumType = getChecksumType(conf);
269     // initialize bytes per checksum
270     this.bytesPerChecksum = getBytesPerChecksum(conf);
271     flushRetriesNumber = conf.getInt(
272         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
273     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
274     if (flushRetriesNumber <= 0) {
275       throw new IllegalArgumentException(
276           "hbase.hstore.flush.retries.number must be > 0, not "
277               + flushRetriesNumber);
278     }
279     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
280   }
281
282   /**
283    * Creates the cache config.
284    * @param family The current column family.
285    */
286   protected void createCacheConf(final HColumnDescriptor family) {
287     this.cacheConf = new CacheConfig(conf, family);
288   }
289
290   /**
291    * Creates the store engine configured for the given Store.
292    * @param store The store. An unfortunate dependency needed due to it
293    *              being passed to coprocessors via the compactor.
294    * @param conf Store configuration.
295    * @param kvComparator CellComparator for the storeFileManager.
296    * @return StoreEngine to use.
297    */
298   protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
299       CellComparator kvComparator) throws IOException {
300     return StoreEngine.create(store, conf, comparator);
301   }
302
303   /**
304    * @param family the column family descriptor
305    * @return TTL in milliseconds of the specified family
306    */
307   public static long determineTTLFromFamily(final HColumnDescriptor family) {
308     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
309     long ttl = family.getTimeToLive();
310     if (ttl == HConstants.FOREVER) {
311       // Default is unlimited ttl.
312       ttl = Long.MAX_VALUE;
313     } else if (ttl == -1) {
314       ttl = Long.MAX_VALUE;
315     } else {
316       // Second -> ms adjust for user data
317       ttl *= 1000;
318     }
319     return ttl;
320   }
321
322   @Override
323   public String getColumnFamilyName() {
324     return this.family.getNameAsString();
325   }
326
327   @Override
328   public TableName getTableName() {
329     return this.getRegionInfo().getTable();
330   }
331
332   @Override
333   public FileSystem getFileSystem() {
334     return this.fs.getFileSystem();
335   }
336
337   public HRegionFileSystem getRegionFileSystem() {
338     return this.fs;
339   }
340 
341   /* Implementation of StoreConfigInformation */
342   @Override
343   public long getStoreFileTtl() {
344     // TTL only applies if there's no MIN_VERSIONs setting on the column.
345     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
346   }
347
348   @Override
349   public long getMemstoreFlushSize() {
350     // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
351     return this.region.memstoreFlushSize;
352   }
353
354   @Override
355   public long getFlushableSize() {
356     return this.memstore.getFlushableSize();
357   }
358
359   @Override
360   public long getSnapshotSize() {
361     return this.memstore.getSnapshotSize();
362   }
363
364   @Override
365   public long getCompactionCheckMultiplier() {
366     return this.compactionCheckMultiplier;
367   }
368
369   @Override
370   public long getBlockingFileCount() {
371     return blockingFileCount;
372   }
373   /* End implementation of StoreConfigInformation */
374
375   /**
376    * Returns the configured bytesPerChecksum value.
377    * @param conf The configuration
378    * @return The bytesPerChecksum that is set in the configuration
379    */
380   public static int getBytesPerChecksum(Configuration conf) {
381     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
382                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
383   }
384
385   /**
386    * Returns the configured checksum algorithm.
387    * @param conf The configuration
388    * @return The checksum algorithm that is set in the configuration
389    */
390   public static ChecksumType getChecksumType(Configuration conf) {
391     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
392     if (checksumName == null) {
393       return ChecksumType.getDefaultChecksumType();
394     } else {
395       return ChecksumType.nameToType(checksumName);
396     }
397   }
398 
399   /**
400    * @return how many bytes to write between status checks
401    */
402   public static int getCloseCheckInterval() {
403     return closeCheckInterval;
404   }
405
406   @Override
407   public HColumnDescriptor getFamily() {
408     return this.family;
409   }
410
411   /**
412    * @return The maximum sequence id in all store files. Used for log replay.
413    */
414   @Override
415   public long getMaxSequenceId() {
416     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
417   }
418
419   @Override
420   public long getMaxMemstoreTS() {
421     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
422   }
423
424   /**
425    * @param tabledir {@link Path} to where the table is being stored
426    * @param hri {@link HRegionInfo} for the region.
427    * @param family {@link HColumnDescriptor} describing the column family
428    * @return Path to family/Store home directory.
429    */
430   @Deprecated
431   public static Path getStoreHomedir(final Path tabledir,
432       final HRegionInfo hri, final byte[] family) {
433     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
434   }
435
436   /**
437    * @param tabledir {@link Path} to where the table is being stored
438    * @param encodedName Encoded region name.
439    * @param family {@link HColumnDescriptor} describing the column family
440    * @return Path to family/Store home directory.
441    */
442   @Deprecated
443   public static Path getStoreHomedir(final Path tabledir,
444       final String encodedName, final byte[] family) {
445     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
446   }
447
448   @Override
449   public HFileDataBlockEncoder getDataBlockEncoder() {
450     return dataBlockEncoder;
451   }
452
453   /**
454    * Should be used only in tests.
455    * @param blockEncoder the block delta encoder to use
456    */
457   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
458     this.dataBlockEncoder = blockEncoder;
459   }
460
461   /**
462    * Creates an unsorted list of StoreFiles loaded in parallel
463    * from the given directory.
464    * @throws IOException
465    */
466   private List<StoreFile> loadStoreFiles() throws IOException {
467     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
468     return openStoreFiles(files);
469   }
470
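  /**
   * Opens the given store file infos in parallel on the region's opener thread pool and
   * returns the resulting StoreFiles. If any open fails, the readers that were already
   * opened are closed again and the first exception encountered is rethrown.
   */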
471   private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
472     if (files == null || files.size() == 0) {
473       return new ArrayList<StoreFile>();
474     }
475     // initialize the thread pool for opening store files in parallel.
476     ThreadPoolExecutor storeFileOpenerThreadPool =
477       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
478           this.getColumnFamilyName());
479     CompletionService<StoreFile> completionService =
480       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
481
482     int totalValidStoreFile = 0;
483     for (final StoreFileInfo storeFileInfo: files) {
484       // open each store file in parallel
485       completionService.submit(new Callable<StoreFile>() {
486         @Override
487         public StoreFile call() throws IOException {
488           StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
489           return storeFile;
490         }
491       });
492       totalValidStoreFile++;
493     }
494
495     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
496     IOException ioe = null;
497     try {
498       for (int i = 0; i < totalValidStoreFile; i++) {
499         try {
500           Future<StoreFile> future = completionService.take();
501           StoreFile storeFile = future.get();
502           if (storeFile != null) {
503             long length = storeFile.getReader().length();
504             this.storeSize += length;
505             this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
506             if (LOG.isDebugEnabled()) {
507               LOG.debug("loaded " + storeFile.toStringDetailed());
508             }
509             results.add(storeFile);
510           }
511         } catch (InterruptedException e) {
512           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
513         } catch (ExecutionException e) {
514           if (ioe == null) ioe = new IOException(e.getCause());
515         }
516       }
517     } finally {
518       storeFileOpenerThreadPool.shutdownNow();
519     }
520     if (ioe != null) {
521       // close StoreFile readers
522       boolean evictOnClose =
523           cacheConf != null? cacheConf.shouldEvictOnClose(): true;
524       for (StoreFile file : results) {
525         try {
526           if (file != null) file.closeReader(evictOnClose);
527         } catch (IOException e) {
528           LOG.warn(e.getMessage());
529         }
530       }
531       throw ioe;
532     }
533
534     return results;
535   }
536
537   /**
538    * Checks the underlying store files, opens any files that have not yet
539    * been opened, and removes the store file readers for store files that are no longer
540    * available. Mainly used by secondary region replicas to keep up to date with
541    * the primary region files.
542    * @throws IOException
543    */
544   @Override
545   public void refreshStoreFiles() throws IOException {
546     Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
547     refreshStoreFilesInternal(newFiles);
548   }
549
550   @Override
551   public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
552     List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
553     for (String file : newFiles) {
554       storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
555     }
556     refreshStoreFilesInternal(storeFiles);
557   }
558
559   /**
560    * Checks the underlying store files, opens any files that have not yet
561    * been opened, and removes the store file readers for store files that are no longer
562    * available. Mainly used by secondary region replicas to keep up to date with
563    * the primary region files.
564    * @throws IOException
565    */
566   private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
567     StoreFileManager sfm = storeEngine.getStoreFileManager();
568     Collection<StoreFile> currentFiles = sfm.getStorefiles();
569     if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
570
571     if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
572 
573     HashMap<StoreFileInfo, StoreFile> currentFilesSet =
574         new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
575     for (StoreFile sf : currentFiles) {
576       currentFilesSet.put(sf.getFileInfo(), sf);
577     }
578     HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
579 
580     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
581     Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
582 
583     if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
584       return;
585     }
586
587     LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
588       + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
589
590     Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
591     for (StoreFileInfo sfi : toBeRemovedFiles) {
592       toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
593     }
594
595     // try to open the files
596     List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
597
598     // propagate the file changes to the underlying store file manager
599     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
600 
601     // Advance the memstore read point to be at least the new store files' seqIds so that
602     // readers might pick them up. This assumes that the store is not getting any writes (otherwise
603     // in-flight transactions might be made visible).
604     if (!toBeAddedFiles.isEmpty()) {
605       region.getMVCC().advanceTo(this.getMaxSequenceId());
606     }
607
608     completeCompaction(toBeRemovedStoreFiles);
609   }
610
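  // The two helpers below wrap a store file path / StoreFileInfo in a StoreFile and eagerly
  // open its reader, marking the reader as belonging to a primary or secondary replica store.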
611   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
612     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
613     return createStoreFileAndReader(info);
614   }
615
616   private StoreFile createStoreFileAndReader(final StoreFileInfo info)
617       throws IOException {
618     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
619     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
620       this.family.getBloomFilterType());
621     StoreFileReader r = storeFile.createReader();
622     r.setReplicaStoreFile(isPrimaryReplicaStore());
623     return storeFile;
624   }
625
626   @Override
627   public long add(final Cell cell) {
628     lock.readLock().lock();
629     try {
630        return this.memstore.add(cell);
631     } finally {
632       lock.readLock().unlock();
633     }
634   }
635
636   @Override
637   public long timeOfOldestEdit() {
638     return memstore.timeOfOldestEdit();
639   }
640
641   /**
642    * Adds a delete marker to the memstore
643    *
644    * @param kv the KeyValue delete marker
645    * @return memstore size delta
646    */
647   protected long delete(final KeyValue kv) {
648     lock.readLock().lock();
649     try {
650       return this.memstore.delete(kv);
651     } finally {
652       lock.readLock().unlock();
653     }
654   }
655
656   /**
657    * @return All store files.
658    */
659   @Override
660   public Collection<StoreFile> getStorefiles() {
661     return this.storeEngine.getStoreFileManager().getStorefiles();
662   }
663
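  /**
   * Validates a candidate bulk load HFile: checks that its first and last row keys fall inside
   * this region's boundaries, warns if the file is larger than the configured maximum region
   * size, and, if "hbase.hstore.bulkload.verify" is enabled, scans the whole file to verify
   * that cells are sorted and all belong to a single column family.
   */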
664   @Override
665   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
666     HFile.Reader reader  = null;
667     try {
668       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
669           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
670       reader = HFile.createReader(srcPath.getFileSystem(conf),
671           srcPath, cacheConf, conf);
672       reader.loadFileInfo();
673
674       byte[] firstKey = reader.getFirstRowKey();
675       Preconditions.checkState(firstKey != null, "First key can not be null");
676       Cell lk = reader.getLastKey();
677       Preconditions.checkState(lk != null, "Last key can not be null");
678       byte[] lastKey =  CellUtil.cloneRow(lk);
679
680       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
681           " last=" + Bytes.toStringBinary(lastKey));
682       LOG.debug("Region bounds: first=" +
683           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
684           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
685
686       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
687         throw new WrongRegionException(
688             "Bulk load file " + srcPath.toString() + " does not fit inside region "
689             + this.getRegionInfo().getRegionNameAsString());
690       }
691
692       if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
693           HConstants.DEFAULT_MAX_FILE_SIZE)) {
694         LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
695             reader.length() + " bytes can be problematic as it may lead to oversplitting.");
696       }
697
698       if (verifyBulkLoads) {
699         long verificationStartTime = EnvironmentEdgeManager.currentTime();
700         LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
701         Cell prevCell = null;
702         HFileScanner scanner = reader.getScanner(false, false, false);
703         scanner.seekTo();
704         do {
705           Cell cell = scanner.getCell();
706           if (prevCell != null) {
707             if (comparator.compareRows(prevCell, cell) > 0) {
708               throw new InvalidHFileException("Previous row is greater than"
709                   + " current row: path=" + srcPath + " previous="
710                   + CellUtil.getCellKeyAsString(prevCell) + " current="
711                   + CellUtil.getCellKeyAsString(cell));
712             }
713             if (CellComparator.compareFamilies(prevCell, cell) != 0) {
714               throw new InvalidHFileException("Previous key had different"
715                   + " family compared to current key: path=" + srcPath
716                   + " previous="
717                   + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
718                       prevCell.getFamilyLength())
719                   + " current="
720                   + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
721                       cell.getFamilyLength()));
722             }
723           }
724           prevCell = cell;
725         } while (scanner.next());
726         LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
727             + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
728             + " ms");
729       }
730     } finally {
731       if (reader != null) reader.close();
732     }
733   }
734
735   @Override
736   public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
737     Path srcPath = new Path(srcPathStr);
738     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
739
740     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
741         + dstPath + " - updating store file list.");
742
743     StoreFile sf = createStoreFileAndReader(dstPath);
744     bulkLoadHFile(sf);
745
746     LOG.info("Successfully loaded store file " + srcPath + " into store " + this
747         + " (new location: " + dstPath + ")");
748
749     return dstPath;
750   }
751
752   @Override
753   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
754     StoreFile sf = createStoreFileAndReader(fileInfo);
755     bulkLoadHFile(sf);
756   }
757
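  // Adds an already-committed bulk load file to this store: updates the store size metrics and
  // inserts the file into the store file manager under the store write lock.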
758   private void bulkLoadHFile(StoreFile sf) throws IOException {
759     StoreFileReader r = sf.getReader();
760     this.storeSize += r.length();
761     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
762
763     // Append the new storefile into the list
764     this.lock.writeLock().lock();
765     try {
766       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
767     } finally {
768       // We need the lock, as long as we are updating the storeFiles
769       // or changing the memstore. Let us release it before calling
770       // notifyChangeReadersObservers. See HBASE-4485 for a possible
771       // deadlock scenario that could have happened if continue to hold
772       // the lock.
773       this.lock.writeLock().unlock();
774     }
775     LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
776     if (LOG.isTraceEnabled()) {
777       String traceMessage = "BULK LOAD time,size,store size,store files ["
778           + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
779           + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
780       LOG.trace(traceMessage);
781     }
782   }
783
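  /**
   * Closes this store: clears the store file manager, removes any remaining compacted files,
   * and closes all store file readers in parallel, returning the set of files that made up
   * the store.
   */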
784   @Override
785   public ImmutableCollection<StoreFile> close() throws IOException {
786     this.lock.writeLock().lock();
787     try {
788       // Clear so metrics doesn't find them.
789       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
790       Collection<StoreFile> compactedfiles =
791           storeEngine.getStoreFileManager().clearCompactedFiles();
792       // clear the compacted files
793       if (compactedfiles != null && !compactedfiles.isEmpty()) {
794         removeCompactedfiles(compactedfiles);
795       }
796       if (!result.isEmpty()) {
797         // initialize the thread pool for closing store files in parallel.
798         ThreadPoolExecutor storeFileCloserThreadPool = this.region
799             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
800                 + this.getColumnFamilyName());
801
802         // close each store file in parallel
803         CompletionService<Void> completionService =
804           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
805         for (final StoreFile f : result) {
806           completionService.submit(new Callable<Void>() {
807             @Override
808             public Void call() throws IOException {
809               boolean evictOnClose =
810                   cacheConf != null? cacheConf.shouldEvictOnClose(): true;
811               f.closeReader(evictOnClose);
812               return null;
813             }
814           });
815         }
816
817         IOException ioe = null;
818         try {
819           for (int i = 0; i < result.size(); i++) {
820             try {
821               Future<Void> future = completionService.take();
822               future.get();
823             } catch (InterruptedException e) {
824               if (ioe == null) {
825                 ioe = new InterruptedIOException();
826                 ioe.initCause(e);
827               }
828             } catch (ExecutionException e) {
829               if (ioe == null) ioe = new IOException(e.getCause());
830             }
831           }
832         } finally {
833           storeFileCloserThreadPool.shutdownNow();
834         }
835         if (ioe != null) throw ioe;
836       }
837       LOG.info("Closed " + this);
838       return result;
839     } finally {
840       this.lock.writeLock().unlock();
841     }
842   }
843
844   /**
845    * Snapshot this store's memstore. Call before running
846    * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
847    *  so it has some work to do.
848    */
849   void snapshot() {
850     this.lock.writeLock().lock();
851     try {
852       this.memstore.snapshot();
853     } finally {
854       this.lock.writeLock().unlock();
855     }
856   }
857
858   /**
859    * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
860    * @param logCacheFlushId flush sequence number
861    * @param snapshot
862    * @param status
863    * @param throughputController
864    * @return The path name of the tmp file to which the store was flushed
865    * @throws IOException if exception occurs during process
866    */
867   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
868       MonitoredTask status, ThroughputController throughputController) throws IOException {
869     // If an exception happens flushing, we let it out without clearing
870     // the memstore snapshot.  The old snapshot will be returned when we say
871     // 'snapshot', the next time flush comes around.
872     // Retry after catching exception when flushing, otherwise server will abort
873     // itself
874     StoreFlusher flusher = storeEngine.getStoreFlusher();
875     IOException lastException = null;
876     for (int i = 0; i < flushRetriesNumber; i++) {
877       try {
878         List<Path> pathNames =
879             flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
880         Path lastPathName = null;
881         try {
882           for (Path pathName : pathNames) {
883             lastPathName = pathName;
884             validateStoreFile(pathName);
885           }
886           return pathNames;
887         } catch (Exception e) {
888           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
889           if (e instanceof IOException) {
890             lastException = (IOException) e;
891           } else {
892             lastException = new IOException(e);
893           }
894         }
895       } catch (IOException e) {
896         LOG.warn("Failed flushing store file, retrying num=" + i, e);
897         lastException = e;
898       }
899       if (lastException != null && i < (flushRetriesNumber - 1)) {
900         try {
901           Thread.sleep(pauseTime);
902         } catch (InterruptedException e) {
903           IOException iie = new InterruptedIOException();
904           iie.initCause(e);
905           throw iie;
906         }
907       }
908     }
909     throw lastException;
910   }
911
912   /*
913    * @param path The pathname of the tmp file into which the store was flushed
914    * @param logCacheFlushId
915    * @param status
916    * @return StoreFile created.
917    * @throws IOException
918    */
919   private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
920       throws IOException {
921     // Write-out finished successfully, move into the right spot
922     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
923 
924     status.setStatus("Flushing " + this + ": reopening flushed file");
925     StoreFile sf = createStoreFileAndReader(dstPath);
926
927     StoreFileReader r = sf.getReader();
928     this.storeSize += r.length();
929     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
930
931     if (LOG.isInfoEnabled()) {
932       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
933         ", sequenceid=" + logCacheFlushId +
934         ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
935     }
936     return sf;
937   }
938
939   @Override
940   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
941                                             boolean isCompaction, boolean includeMVCCReadpoint,
942                                             boolean includesTag)
943       throws IOException {
944     return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
945         includesTag, false);
946   }
947
948   /*
949    * @param maxKeyCount
950    * @param compression Compression algorithm to use
951    * @param isCompaction whether we are creating a new file in a compaction
952    * @param includesMVCCReadPoint - whether to include MVCC or not
953    * @param includesTag - includesTag or not
954    * @return Writer for a new StoreFile in the tmp dir.
955    */
956   @Override
957   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
958       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
959       boolean shouldDropBehind)
960   throws IOException {
961     return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
962         includesTag, shouldDropBehind, null);
963   }
964
965   /*
966    * @param maxKeyCount
967    * @param compression Compression algorithm to use
968    * @param isCompaction whether we are creating a new file in a compaction
969    * @param includesMVCCReadPoint - whether to include MVCC or not
970    * @param includesTag - includesTag or not
971    * @return Writer for a new StoreFile in the tmp dir.
972    */
973   @Override
974   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
975       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
976       boolean shouldDropBehind, final TimeRangeTracker trt)
977   throws IOException {
978     final CacheConfig writerCacheConf;
979     if (isCompaction) {
980       // Don't cache data on write on compactions.
981       writerCacheConf = new CacheConfig(cacheConf);
982       writerCacheConf.setCacheDataOnWrite(false);
983     } else {
984       writerCacheConf = cacheConf;
985     }
986     InetSocketAddress[] favoredNodes = null;
987     if (region.getRegionServerServices() != null) {
988       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
989           region.getRegionInfo().getEncodedName());
990     }
991     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
992       cryptoContext);
993     StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
994         this.getFileSystem())
995             .withFilePath(fs.createTempName())
996             .withComparator(comparator)
997             .withBloomType(family.getBloomFilterType())
998             .withMaxKeyCount(maxKeyCount)
999             .withFavoredNodes(favoredNodes)
1000             .withFileContext(hFileContext)
1001             .withShouldDropCacheBehind(shouldDropBehind);
1002     if (trt != null) {
1003       builder.withTimeRangeTracker(trt);
1004     }
1005     return builder.build();
1006   }
1007
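  // Builds the HFileContext used by new writers, applying the column family and store settings:
  // compression, tags, checksum type, block size, data block encoding and encryption.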
1008   private HFileContext createFileContext(Compression.Algorithm compression,
1009       boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
1010     if (compression == null) {
1011       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
1012     }
1013     HFileContext hFileContext = new HFileContextBuilder()
1014                                 .withIncludesMvcc(includeMVCCReadpoint)
1015                                 .withIncludesTags(includesTag)
1016                                 .withCompression(compression)
1017                                 .withCompressTags(family.isCompressTags())
1018                                 .withChecksumType(checksumType)
1019                                 .withBytesPerCheckSum(bytesPerChecksum)
1020                                 .withBlockSize(blocksize)
1021                                 .withHBaseCheckSum(true)
1022                                 .withDataBlockEncoding(family.getDataBlockEncoding())
1023                                 .withEncryptionContext(cryptoContext)
1024                                 .withCreateTime(EnvironmentEdgeManager.currentTime())
1025                                 .build();
1026     return hFileContext;
1027   }
1028
1029
1030   /*
1031    * Change storeFiles adding into place the Reader produced by this new flush.
1032    * @param sfs Store files
1033    * @param snapshotId
1034    * @throws IOException
1035    * @return Whether compaction is required.
1036    */
1037   private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
1038       throws IOException {
1039     this.lock.writeLock().lock();
1040     try {
1041       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1042       if (snapshotId > 0) {
1043         this.memstore.clearSnapshot(snapshotId);
1044       }
1045     } finally {
1046       // We need the lock, as long as we are updating the storeFiles
1047       // or changing the memstore. Let us release it before calling
1048       // notifyChangeReadersObservers. See HBASE-4485 for a possible
1049       // deadlock scenario that could have happened if continue to hold
1050       // the lock.
1051       this.lock.writeLock().unlock();
1052     }
1053     // notify to be called here - only in case of flushes
1054     notifyChangedReadersObservers(sfs);
1055     if (LOG.isTraceEnabled()) {
1056       long totalSize = 0;
1057       for (StoreFile sf : sfs) {
1058         totalSize += sf.getReader().length();
1059       }
1060       String traceMessage = "FLUSH time,count,size,store size,store files ["
1061           + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1062           + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1063       LOG.trace(traceMessage);
1064     }
1065     return needsCompaction();
1066   }
1067
1068   /*
1069    * Notify all observers that set of Readers has changed.
1070    * @throws IOException
1071    */
1072   private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
1073     for (ChangedReadersObserver o : this.changedReaderObservers) {
1074       o.updateReaders(sfs);
1075     }
1076   }
1077
1078   /**
1079    * Get all scanners with no filtering based on TTL (that happens further down
1080    * the line).
1081    * @return all scanners for this store
1082    */
1083   @Override
1084   public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1085       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1086       byte[] stopRow, long readPt) throws IOException {
1087     Collection<StoreFile> storeFilesToScan;
1088     List<KeyValueScanner> memStoreScanners;
1089     this.lock.readLock().lock();
1090     try {
1091       storeFilesToScan =
1092           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1093       memStoreScanners = this.memstore.getScanners(readPt);
1094     } finally {
1095       this.lock.readLock().unlock();
1096     }
1097
1098     // First the store file scanners
1099
1100     // TODO this used to get the store files in descending order,
1101     // but now we get them in ascending order, which I think is
1102     // actually more correct, since the memstore scanners get put at the end.
1103     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
1104         cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1105     List<KeyValueScanner> scanners =
1106       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1107     scanners.addAll(sfScanners);
1108     // Then the memstore scanners
1109     scanners.addAll(memStoreScanners);
1110     return scanners;
1111   }
1112
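  /**
   * Same as the overload above, but creates scanners only over the explicitly supplied store
   * files, optionally adding memstore scanners when {@code includeMemstoreScanner} is true.
   */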
1113   @Override
1114   public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
1115       boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1116       byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
1117     List<KeyValueScanner> memStoreScanners = null;
1118     if (includeMemstoreScanner) {
1119       this.lock.readLock().lock();
1120       try {
1121         memStoreScanners = this.memstore.getScanners(readPt);
1122       } finally {
1123         this.lock.readLock().unlock();
1124       }
1125     }
1126     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1127       cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1128     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
1129     scanners.addAll(sfScanners);
1130     // Then the memstore scanners
1131     if (memStoreScanners != null) {
1132       scanners.addAll(memStoreScanners);
1133     }
1134     return scanners;
1135   }
1136
1137   @Override
1138   public void addChangedReaderObserver(ChangedReadersObserver o) {
1139     this.changedReaderObservers.add(o);
1140   }
1141
1142   @Override
1143   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1144     // We don't check if observer present; it may not be (legitimately)
1145     this.changedReaderObservers.remove(o);
1146   }
1147
1148   //////////////////////////////////////////////////////////////////////////////
1149   // Compaction
1150   //////////////////////////////////////////////////////////////////////////////
1151
1152   /**
1153    * Compact the StoreFiles.  This method may take some time, so the calling
1154    * thread must be able to block for long periods.
1155    *
1156    * <p>During this time, the Store can work as usual, getting values from
1157    * StoreFiles and writing new StoreFiles from the memstore.
1158    *
1159    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1160    * completely written-out to disk.
1161    *
1162    * <p>The compactLock prevents multiple simultaneous compactions.
1163    * The structureLock prevents us from interfering with other write operations.
1164    *
1165    * <p>We don't want to hold the structureLock for the whole time, as a compact()
1166    * can be lengthy and we want to allow cache-flushes during this period.
1167    *
1168    * <p> Compaction events should be idempotent, since there is no IO fencing for
1169    * the region directory in hdfs. A region server might still try to complete the
1170    * compaction after it lost the region. That is why the following events are carefully
1171    * ordered for a compaction:
1172    *  1. Compaction writes new files under region/.tmp directory (compaction output)
1173    *  2. Compaction atomically moves the temporary file under region directory
1174    *  3. Compaction appends a WAL edit containing the compaction input and output files.
1175    *  Forces sync on WAL.
1176    *  4. Compaction deletes the input files from the region directory.
1177    *
1178    * Failure conditions are handled like this:
1179    *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
1180    *  the compaction later, it will only write the new data file to the region directory.
1181    *  Since we already have this data, this will be idempotent but we will have a redundant
1182    *  copy of the data.
1183    *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1184    *  RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
1185    *  - If RS fails after 3, the region server that opens the region will pick up the
1186    *  compaction marker from the WAL and replay it by removing the compaction input files.
1187    *  Failed RS can also attempt to delete those files, but the operation will be idempotent.
1188    *
1189    * See HBASE-2231 for details.
1190    *
1191    * @param compaction compaction details obtained from requestCompaction()
1192    * @throws IOException
1193    * @return Storefile we compacted into or null if we failed or opted out early.
1194    */
1195   @Override
1196   public List<StoreFile> compact(CompactionContext compaction,
1197       ThroughputController throughputController) throws IOException {
1198     return compact(compaction, throughputController, null);
1199   }
1200
1201   @Override
1202   public List<StoreFile> compact(CompactionContext compaction,
1203     ThroughputController throughputController, User user) throws IOException {
1204     assert compaction != null;
1205     List<StoreFile> sfs = null;
1206     CompactionRequest cr = compaction.getRequest();
1207     try {
1208       // Do all sanity checking in here if we have a valid CompactionRequest
1209       // because we need to clean up after it on the way out in a finally
1210       // block below
1211       long compactionStartTime = EnvironmentEdgeManager.currentTime();
1212       assert compaction.hasSelection();
1213       Collection<StoreFile> filesToCompact = cr.getFiles();
1214       assert !filesToCompact.isEmpty();
1215       synchronized (filesCompacting) {
1216         // sanity check: we're compacting files that this store knows about
1217         // TODO: change this to LOG.error() after more debugging
1218         Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1219       }
1220
1221       // Ready to go. Have list of files to compact.
1222       LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1223           + this + " of " + this.getRegionInfo().getRegionNameAsString()
1224           + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1225           + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1226
1227       // Commence the compaction.
1228       List<Path> newFiles = compaction.compact(throughputController, user);
1229
1230       long outputBytes = 0L;
1231       // TODO: get rid of this!
1232       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1233         LOG.warn("hbase.hstore.compaction.complete is set to false");
1234         sfs = new ArrayList<StoreFile>(newFiles.size());
1235         final boolean evictOnClose =
1236             cacheConf != null? cacheConf.shouldEvictOnClose(): true;
1237         for (Path newFile : newFiles) {
1238           // Create storefile around what we wrote with a reader on it.
1239           StoreFile sf = createStoreFileAndReader(newFile);
1240           sf.closeReader(evictOnClose);
1241           sfs.add(sf);
1242         }
1243         return sfs;
1244       }
1245       // Do the steps necessary to complete the compaction.
1246       sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
1247       writeCompactionWalRecord(filesToCompact, sfs);
1248       replaceStoreFiles(filesToCompact, sfs);
1249       if (cr.isMajor()) {
1250         majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1251         majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1252       } else {
1253         compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1254         compactedCellsSize += getCompactionProgress().totalCompactedSize;
1255       }
1256
1257       for (StoreFile sf : sfs) {
1258         outputBytes += sf.getReader().length();
1259       }
1260
1261       // At this point the store will use new files for all new scanners.
1262       completeCompaction(filesToCompact); // update store size.
1263
1264       long now = EnvironmentEdgeManager.currentTime();
1265       if (region.getRegionServerServices() != null
1266           && region.getRegionServerServices().getMetrics() != null) {
1267         region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
1268           now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1269           outputBytes);
1270       }
1271
1272       logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1273       return sfs;
1274     } finally {
1275       finishCompactionRequest(cr);
1276     }
1277   }
1278
1279   private List<StoreFile> moveCompactedFilesIntoPlace(
1280       final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
1281     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1282     for (Path newFile : newFiles) {
1283       assert newFile != null;
1284       final StoreFile sf = moveFileIntoPlace(newFile);
1285       if (this.getCoprocessorHost() != null) {
1286         final Store thisStore = this;
1287         getCoprocessorHost().postCompact(thisStore, sf, cr, user);
1288       }
1289       assert sf != null;
1290       sfs.add(sf);
1291     }
1292     return sfs;
1293   }
1294
1295   // Package-visible for tests
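  // Validates the newly written file, commits it from the region's tmp directory into the
  // store directory, and returns an opened StoreFile for it.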
1296   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1297     validateStoreFile(newFile);
1298     // Move the file into the right spot
1299     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1300     return createStoreFileAndReader(destPath);
1301   }
1302
1303   /**
1304    * Writes the compaction WAL record.
1305    * @param filesCompacted Files compacted (input).
1306    * @param newFiles Files from compaction.
1307    */
1308   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1309       Collection<StoreFile> newFiles) throws IOException {
1310     if (region.getWAL() == null) return;
1311     List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1312     for (StoreFile f : filesCompacted) {
1313       inputPaths.add(f.getPath());
1314     }
1315     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1316     for (StoreFile f : newFiles) {
1317       outputPaths.add(f.getPath());
1318     }
1319     HRegionInfo info = this.region.getRegionInfo();
1320     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1321         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1322     // TODO: Fix reaching into Region to get the maxWaitForSeqId.
1323     // Should this method live in Region instead, given how many references it makes up there?
1324     // It could become Region#writeCompactionMarker(compactionDescriptor).
1325     WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1326         this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
1327   }
1328
1329   @VisibleForTesting
1330   void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1331       final Collection<StoreFile> result) throws IOException {
1332     this.lock.writeLock().lock();
1333     try {
1334       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1335       filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
1336     } finally {
1337       this.lock.writeLock().unlock();
1338     }
1339   }
1340
1341   /**
1342    * Log a very elaborate compaction completion message.
1343    * @param cr Request.
1344    * @param sfs Resulting files.
        * @param now Current time, used as the compaction end time.
1345    * @param compactionStartTime Start time.
1346    */
1347   private void logCompactionEndMessage(
1348       CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) {
1349     StringBuilder message = new StringBuilder(
1350       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1351       + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1352       + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1353     if (sfs.isEmpty()) {
1354       message.append("none, ");
1355     } else {
1356       for (StoreFile sf: sfs) {
1357         message.append(sf.getPath().getName());
1358         message.append("(size=");
1359         message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1360         message.append("), ");
1361       }
1362     }
1363     message.append("total size for store is ")
1364       .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1365       .append(". This selection was in queue for ")
1366       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1367       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1368       .append(" to execute.");
1369     LOG.info(message.toString());
1370     if (LOG.isTraceEnabled()) {
1371       int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1372       long resultSize = 0;
1373       for (StoreFile sf : sfs) {
1374         resultSize += sf.getReader().length();
1375       }
1376       String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1377         + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1378           + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1379       LOG.trace(traceMessage);
1380     }
1381   }
1382
1383   /**
1384    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
1385    * that was not finished. We could find one when recovering a WAL after a regionserver crash.
1386    * See HBASE-2231.
1387    * @param compaction the compaction descriptor recovered from the WAL
1388    */
1389   @Override
1390   public void replayCompactionMarker(CompactionDescriptor compaction,
1391       boolean pickCompactionFiles, boolean removeFiles)
1392       throws IOException {
1393     LOG.debug("Completing compaction from the WAL marker");
1394     List<String> compactionInputs = compaction.getCompactionInputList();
1395     List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1396
1397     // The Compaction Marker is written after the compaction is completed,
1398     // and the files moved into the region/family folder.
1399     //
1400     // If we crash after the entry is written, we may not have removed the
1401     // input files, but the output file is present.
1402     // (The unremoved input files will be removed by this function)
1403     //
1404     // If we scan the directory and the file is not present, it can mean that:
1405     //   - The file was manually removed by the user
1406     //   - The file was removed as consequence of subsequent compaction
1407     // so, we can't do anything with the "compaction output list" because those
1408     // files have already been loaded when opening the region (by virtue of
1409     // being in the store's folder) or they may be missing due to a compaction.
1410
1411     String familyName = this.getColumnFamilyName();
1412     List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
1413     for (String compactionInput : compactionInputs) {
1414       Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1415       inputFiles.add(inputPath.getName());
1416     }
1417
1418     // Some of the input files might already be deleted.
1419     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1420     for (StoreFile sf : this.getStorefiles()) {
1421       if (inputFiles.contains(sf.getPath().getName())) {
1422         inputStoreFiles.add(sf);
1423       }
1424     }
1425
1426     // check whether we need to pick up the new files
1427     List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1428
1429     if (pickCompactionFiles) {
1430       for (StoreFile sf : this.getStorefiles()) {
1431         compactionOutputs.remove(sf.getPath().getName());
1432       }
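           // Any names still left in compactionOutputs are output files that are not yet
           // present in the store, so open them now.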
1433       for (String compactionOutput : compactionOutputs) {
1434         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1435         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1436         outputStoreFiles.add(storeFile);
1437       }
1438     }
1439
1440     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1441       LOG.info("Replaying compaction marker, replacing input files: " +
1442           inputStoreFiles + " with output files : " + outputStoreFiles);
1443       this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1444       this.completeCompaction(inputStoreFiles);
1445     }
1446   }
1447
1448   /**
1449    * Compacts the N most recent files, for testing.
1450    * Because compacting "recent" files only makes sense for some policies (e.g. the
1451    * default one), this method assumes the default policy is in use; it does not consult
1452    * the policy, but builds the compaction candidate list itself.
1453    * @param N Number of files.
1454    */
1455   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1456     List<StoreFile> filesToCompact;
1457     boolean isMajor;
1458
1459     this.lock.readLock().lock();
1460     try {
1461       synchronized (filesCompacting) {
1462         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1463         if (!filesCompacting.isEmpty()) {
1464           // exclude all files older than the newest file we're currently
1465           // compacting. this allows us to preserve contiguity (HBASE-2856)
1466           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1467           int idx = filesToCompact.indexOf(last);
1468           Preconditions.checkArgument(idx != -1);
1469           filesToCompact.subList(0, idx + 1).clear();
1470         }
1471         int count = filesToCompact.size();
1472         if (N > count) {
1473           throw new RuntimeException("Not enough files");
1474         }
1475
1476         filesToCompact = filesToCompact.subList(count - N, count);
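             // The request counts as major when it covers every file currently in the store.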
1477         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1478         filesCompacting.addAll(filesToCompact);
1479         Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
1480             .getStoreFileComparator());
1481       }
1482     } finally {
1483       this.lock.readLock().unlock();
1484     }
1485
1486     try {
1487       // Ready to go. Have list of files to compact.
1488       List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1489           .compactForTesting(filesToCompact, isMajor);
1490       for (Path newFile: newFiles) {
1491         // Move the compaction into place.
1492         StoreFile sf = moveFileIntoPlace(newFile);
1493         if (this.getCoprocessorHost() != null) {
1494           this.getCoprocessorHost().postCompact(this, sf, null, null);
1495         }
1496         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1497         completeCompaction(filesToCompact);
1498       }
1499     } finally {
1500       synchronized (filesCompacting) {
1501         filesCompacting.removeAll(filesToCompact);
1502       }
1503     }
1504   }
1505
1506   @Override
1507   public boolean hasReferences() {
1508     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1509   }
1510
1511   @Override
1512   public CompactionProgress getCompactionProgress() {
1513     return this.storeEngine.getCompactor().getProgress();
1514   }
1515
1516   @Override
1517   public boolean isMajorCompaction() throws IOException {
1518     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1519       // TODO: what are these reader checks all over the place?
1520       if (sf.getReader() == null) {
1521         LOG.debug("StoreFile " + sf + " has null Reader");
1522         return false;
1523       }
1524     }
1525     return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
1526         this.storeEngine.getStoreFileManager().getStorefiles());
1527   }
1528
1529   @Override
1530   public CompactionContext requestCompaction() throws IOException {
1531     return requestCompaction(Store.NO_PRIORITY, null);
1532   }
1533
1534   @Override
1535   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1536       throws IOException {
1537     return requestCompaction(priority, baseRequest, null);
1538   }
1539   @Override
1540   public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
1541       User user) throws IOException {
1542     // don't even select for compaction if writes are disabled
1543     if (!this.areWritesEnabled()) {
1544       return null;
1545     }
1546
1547     // Before we do compaction, try to get rid of unneeded files to simplify things.
1548     removeUnneededFiles();
1549
1550     final CompactionContext compaction = storeEngine.createCompaction();
1551     CompactionRequest request = null;
1552     this.lock.readLock().lock();
1553     try {
1554       synchronized (filesCompacting) {
1555         // First, see if coprocessor would want to override selection.
1556         if (this.getCoprocessorHost() != null) {
1557           final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1558           boolean override = false;
1559           override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
1560               baseRequest, user);
1561           if (override) {
1562             // Coprocessor is overriding normal file selection.
1563             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1564           }
1565         }
1566
1567         // Normal case - coprocessor is not overriding file selection.
1568         if (!compaction.hasSelection()) {
1569           boolean isUserCompaction = priority == Store.PRIORITY_USER;
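               // offPeakCompactionTracker is an AtomicBoolean: compareAndSet lets at most one
               // pending compaction claim the off-peak slot at a time; it is released below if
               // the selection fails or does not end up using it.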
1570           boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1571               offPeakCompactionTracker.compareAndSet(false, true);
1572           try {
1573             compaction.select(this.filesCompacting, isUserCompaction,
1574               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1575           } catch (IOException e) {
1576             if (mayUseOffPeak) {
1577               offPeakCompactionTracker.set(false);
1578             }
1579             throw e;
1580           }
1581           assert compaction.hasSelection();
1582           if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1583             // Compaction policy doesn't want to take advantage of off-peak.
1584             offPeakCompactionTracker.set(false);
1585           }
1586         }
1587         if (this.getCoprocessorHost() != null) {
1588           this.getCoprocessorHost().postCompactSelection(
1589               this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user);
1590         }
1591
1592         // Selected files; see if we have a compaction with some custom base request.
1593         if (baseRequest != null) {
1594           // Update the request with what the system thinks the request should be;
1595           // its up to the request if it wants to listen.
1596           compaction.forceSelect(
1597               baseRequest.combineWith(compaction.getRequest()));
1598         }
1599         // Finally, we have the resulting files list. Check if we have any files at all.
1600         request = compaction.getRequest();
1601         final Collection<StoreFile> selectedFiles = request.getFiles();
1602         if (selectedFiles.isEmpty()) {
1603           return null;
1604         }
1605
1606         addToCompactingFiles(selectedFiles);
1607
1608         // If we're enqueuing a major, clear the force flag.
1609         this.forceMajor = this.forceMajor && !request.isMajor();
1610
1611         // Set common request properties.
1612         // Set priority, either override value supplied by caller or from store.
1613         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1614         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1615       }
1616     } finally {
1617       this.lock.readLock().unlock();
1618     }
1619
1620     LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
1621         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1622         + (request.isAllFiles() ? " (all files)" : ""));
1623     this.region.reportCompactionRequestStart(request.isMajor());
1624     return compaction;
1625   }
1626
1627   /** Adds the files to compacting files. filesCompacting must be locked. */
1628   private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1629     if (filesToAdd == null) return;
1630     // Check that we do not try to compact the same StoreFile twice.
1631     if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1632       Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1633     }
1634     filesCompacting.addAll(filesToAdd);
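         // Keep filesCompacting in the same order the store file manager uses for its files.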
1635     Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1636   }
1637
1638   private void removeUnneededFiles() throws IOException {
1639     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1640     if (getFamily().getMinVersions() > 0) {
1641       LOG.debug("Skipping expired store file removal due to min version being " +
1642           getFamily().getMinVersions());
1643       return;
1644     }
1645     this.lock.readLock().lock();
1646     Collection<StoreFile> delSfs = null;
1647     try {
1648       synchronized (filesCompacting) {
1649         long cfTtl = getStoreFileTtl();
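             // A TTL of Long.MAX_VALUE means no TTL is set for this family,
             // so no store file can have expired.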
1650         if (cfTtl != Long.MAX_VALUE) {
1651           delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1652               EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1653           addToCompactingFiles(delSfs);
1654         }
1655       }
1656     } finally {
1657       this.lock.readLock().unlock();
1658     }
1659     if (delSfs == null || delSfs.isEmpty()) return;
1660
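         // Record the removal the same way a compaction is recorded: write the WAL marker,
         // swap the file lists, then refresh the store size.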
1661     Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
1662     writeCompactionWalRecord(delSfs, newFiles);
1663     replaceStoreFiles(delSfs, newFiles);
1664     completeCompaction(delSfs);
1665     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1666         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1667         + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
1668   }
1669
1670   @Override
1671   public void cancelRequestedCompaction(CompactionContext compaction) {
1672     finishCompactionRequest(compaction.getRequest());
1673   }
1674
1675   private void finishCompactionRequest(CompactionRequest cr) {
1676     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1677     if (cr.isOffPeak()) {
1678       offPeakCompactionTracker.set(false);
1679       cr.setOffPeak(false);
1680     }
1681     synchronized (filesCompacting) {
1682       filesCompacting.removeAll(cr.getFiles());
1683     }
1684   }
1685
1686   /**
1687    * Validates a store file by opening and closing it. In HFileV2 this should
1688    * not be an expensive operation.
1689    *
1690    * @param path the path to the store file
1691    */
1692   private void validateStoreFile(Path path)
1693       throws IOException {
1694     StoreFile storeFile = null;
1695     try {
1696       storeFile = createStoreFileAndReader(path);
1697     } catch (IOException e) {
1698       LOG.error("Failed to open store file : " + path
1699           + ", keeping it in tmp location", e);
1700       throw e;
1701     } finally {
1702       if (storeFile != null) {
1703         storeFile.closeReader(false);
1704       }
1705     }
1706   }
1707
1708   /**
1709    * <p>Updates the store's size bookkeeping after a compaction has been written to disk.
1710    *
1711    * <p>It is usually invoked at the end of a compaction, but might also be
1712    * invoked at HStore startup, if the prior execution died midway through.
1713    *
1714    * <p>Completing the compaction means:
1715    * <pre>
1716    * 1) Unload all replaced StoreFiles; close them and collect the list to delete.
1717    * 2) Compute the new store size.
1718    * </pre>
1719    *
1720    * @param compactedFiles list of files that were compacted
1721    */
1722   @VisibleForTesting
1723   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1724     throws IOException {
1725     LOG.debug("Completing compaction...");
1726     this.storeSize = 0L;
1727     this.totalUncompressedBytes = 0L;
1728     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1729       StoreFileReader r = hsf.getReader();
1730       if (r == null) {
1731         LOG.warn("StoreFile " + hsf + " has a null Reader");
1732         continue;
1733       }
1734       this.storeSize += r.length();
1735       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1736     }
1737   }
1738
1739   /*
1740    * @param wantedVersions How many versions were asked for.
1741    * @return wantedVersions or this family's maximum versions, whichever is smaller.
1742    */
1743   int versionsToReturn(final int wantedVersions) {
1744     if (wantedVersions <= 0) {
1745       throw new IllegalArgumentException("Number of versions must be > 0");
1746     }
1747     // Make sure we do not return more than maximum versions for this store.
1748     int maxVersions = this.family.getMaxVersions();
1749     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1750   }
1751
1752   /**
1753    * @param cell the cell to check
1754    * @param oldestTimestamp
        * @param now the current time, in milliseconds
1755    * @return true if the cell carries a TTL tag and that TTL has expired as of {@code now}
1756    */
1757   static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
1758     // Look for a TTL tag first. Use it instead of the family setting if
1759     // found. If a cell has multiple TTLs, resolve the conflict by using the
1760     // first tag encountered.
1761     Iterator<Tag> i = CellUtil.tagsIterator(cell);
1762     while (i.hasNext()) {
1763       Tag t = i.next();
1764       if (TagType.TTL_TAG_TYPE == t.getType()) {
1765         // Unlike in schema cell TTLs are stored in milliseconds, no need
1766         // to convert
1767         long ts = cell.getTimestamp();
1768         assert t.getValueLength() == Bytes.SIZEOF_LONG;
1769         long ttl = TagUtil.getValueAsLong(t);
1770         if (ts + ttl < now) {
1771           return true;
1772         }
1773         // Per cell TTLs cannot extend lifetime beyond family settings, so
1774         // fall through to check that
1775         break;
1776       }
1777     }
1778     return false;
1779   }
1780
1781   @Override
1782   public boolean canSplit() {
1783     this.lock.readLock().lock();
1784     try {
1785       // Not split-able if we find a reference store file present in the store.
1786       boolean result = !hasReferences();
1787       if (!result) {
1788         if (LOG.isTraceEnabled()) {
1789           LOG.trace("Not splittable; has references: " + this);
1790         }
1791       }
1792       return result;
1793     } finally {
1794       this.lock.readLock().unlock();
1795     }
1796   }
1797
1798   @Override
1799   public byte[] getSplitPoint() {
1800     this.lock.readLock().lock();
1801     try {
1802       // Should already be enforced by the split policy!
1803       assert !this.getRegionInfo().isMetaRegion();
1804       // Not split-able if we find a reference store file present in the store.
1805       if (hasReferences()) {
1806         if (LOG.isTraceEnabled()) {
1807           LOG.trace("Not splittable; has references: " + this);
1808         }
1809         return null;
1810       }
1811       return this.storeEngine.getStoreFileManager().getSplitPoint();
1812     } catch(IOException e) {
1813       LOG.warn("Failed getting store size for " + this, e);
1814     } finally {
1815       this.lock.readLock().unlock();
1816     }
1817     return null;
1818   }
1819
1820   @Override
1821   public long getLastCompactSize() {
1822     return this.lastCompactSize;
1823   }
1824
1825   @Override
1826   public long getSize() {
1827     return storeSize;
1828   }
1829
1830   @Override
1831   public void triggerMajorCompaction() {
1832     this.forceMajor = true;
1833   }
1834
1835
1836   //////////////////////////////////////////////////////////////////////////////
1837   // File administration
1838   //////////////////////////////////////////////////////////////////////////////
1839
1840   @Override
1841   public KeyValueScanner getScanner(Scan scan,
1842       final NavigableSet<byte []> targetCols, long readPt) throws IOException {
1843     lock.readLock().lock();
1844     try {
1845       KeyValueScanner scanner = null;
1846       if (this.getCoprocessorHost() != null) {
1847         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols, readPt);
1848       }
1849       scanner = createScanner(scan, targetCols, readPt, scanner);
1850       return scanner;
1851     } finally {
1852       lock.readLock().unlock();
1853     }
1854   }
1855
1856   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
1857       long readPt, KeyValueScanner scanner) throws IOException {
1858     if (scanner == null) {
1859       scanner = scan.isReversed() ? new ReversedStoreScanner(this,
1860           getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
1861           getScanInfo(), scan, targetCols, readPt);
1862     }
1863     return scanner;
1864   }
1865
1866   @Override
1867   public String toString() {
1868     return this.getColumnFamilyName();
1869   }
1870
1871   @Override
1872   public int getStorefilesCount() {
1873     return this.storeEngine.getStoreFileManager().getStorefileCount();
1874   }
1875
1876   @Override
1877   public long getMaxStoreFileAge() {
1878     long earliestTS = Long.MAX_VALUE;
1879     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1880       StoreFileReader r = s.getReader();
1881       if (r == null) {
1882         LOG.warn("StoreFile " + s + " has a null Reader");
1883         continue;
1884       }
1885       if (!s.isHFile()) {
1886         continue;
1887       }
1888       long createdTS = s.getFileInfo().getCreatedTimestamp();
1889       earliestTS = (createdTS < earliestTS) ? createdTS : earliestTS;
1890     }
1891     long now = EnvironmentEdgeManager.currentTime();
1892     return now - earliestTS;
1893   }
1894
1895   @Override
1896   public long getMinStoreFileAge() {
1897     long latestTS = 0;
1898     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1899       StoreFileReader r = s.getReader();
1900       if (r == null) {
1901         LOG.warn("StoreFile " + s + " has a null Reader");
1902         continue;
1903       }
1904       if (!s.isHFile()) {
1905         continue;
1906       }
1907       long createdTS = s.getFileInfo().getCreatedTimestamp();
1908       latestTS = (createdTS > latestTS) ? createdTS : latestTS;
1909     }
1910     long now = EnvironmentEdgeManager.currentTime();
1911     return now - latestTS;
1912   }
1913
1914   @Override
1915   public long getAvgStoreFileAge() {
1916     long sum = 0, count = 0;
1917     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1918       StoreFileReader r = s.getReader();
1919       if (r == null) {
1920         LOG.warn("StoreFile " + s + " has a null Reader");
1921         continue;
1922       }
1923       if (!s.isHFile()) {
1924         continue;
1925       }
1926       sum += s.getFileInfo().getCreatedTimestamp();
1927       count++;
1928     }
1929     if (count == 0) {
1930       return 0;
1931     }
1932     long avgTS = sum / count;
1933     long now = EnvironmentEdgeManager.currentTime();
1934     return now - avgTS;
1935   }
1936
1937   @Override
1938   public long getNumReferenceFiles() {
1939     long numRefFiles = 0;
1940     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1941       if (s.isReference()) {
1942         numRefFiles++;
1943       }
1944     }
1945     return numRefFiles;
1946   }
1947
1948   @Override
1949   public long getNumHFiles() {
1950     long numHFiles = 0;
1951     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1952       if (s.isHFile()) {
1953         numHFiles++;
1954       }
1955     }
1956     return numHFiles;
1957   }
1958
1959   @Override
1960   public long getStoreSizeUncompressed() {
1961     return this.totalUncompressedBytes;
1962   }
1963
1964   @Override
1965   public long getStorefilesSize() {
1966     long size = 0;
1967     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1968       StoreFileReader r = s.getReader();
1969       if (r == null) {
1970         LOG.warn("StoreFile " + s + " has a null Reader");
1971         continue;
1972       }
1973       size += r.length();
1974     }
1975     return size;
1976   }
1977
1978   @Override
1979   public long getStorefilesIndexSize() {
1980     long size = 0;
1981     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1982       StoreFileReader r = s.getReader();
1983       if (r == null) {
1984         LOG.warn("StoreFile " + s + " has a null Reader");
1985         continue;
1986       }
1987       size += r.indexSize();
1988     }
1989     return size;
1990   }
1991
1992   @Override
1993   public long getTotalStaticIndexSize() {
1994     long size = 0;
1995     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1996       StoreFileReader r = s.getReader();
1997       if (r == null) {
1998         continue;
1999       }
2000       size += r.getUncompressedDataIndexSize();
2001     }
2002     return size;
2003   }
2004
2005   @Override
2006   public long getTotalStaticBloomSize() {
2007     long size = 0;
2008     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2009       StoreFileReader r = s.getReader();
2010       if (r == null) {
2011         continue;
2012       }
2013       size += r.getTotalBloomSize();
2014     }
2015     return size;
2016   }
2017
2018   @Override
2019   public long getMemStoreSize() {
2020     return this.memstore.size();
2021   }
2022
2023   @Override
2024   public int getCompactPriority() {
2025     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
2026     if (priority == PRIORITY_USER) {
2027       LOG.warn("Compaction priority is USER despite there being no user compaction");
2028     }
2029     return priority;
2030   }
2031
2032   @Override
2033   public boolean throttleCompaction(long compactionSize) {
2034     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2035   }
2036
2037   public HRegion getHRegion() {
2038     return this.region;
2039   }
2040
2041   @Override
2042   public RegionCoprocessorHost getCoprocessorHost() {
2043     return this.region.getCoprocessorHost();
2044   }
2045
2046   @Override
2047   public HRegionInfo getRegionInfo() {
2048     return this.fs.getRegionInfo();
2049   }
2050
2051   @Override
2052   public boolean areWritesEnabled() {
2053     return this.region.areWritesEnabled();
2054   }
2055
2056   @Override
2057   public long getSmallestReadPoint() {
2058     return this.region.getSmallestReadPoint();
2059   }
2060
2061   /**
2062    * Updates the value for the given row/family/qualifier. This function will always be seen as
2063    * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
2064    * control is necessary.
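        * <p>A minimal usage sketch (purely illustrative; {@code store} stands for an HStore
        * instance and the row/family/qualifier values are made up):
        * <pre>
        *   long delta = store.updateColumnValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        *       Bytes.toBytes("q1"), 42L);
        * </pre>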
2065    * @param row row to update
2066    * @param f family to update
2067    * @param qualifier qualifier to update
2068    * @param newValue the new value to set into memstore
2069    * @return memstore size delta
2070    * @throws IOException
2071    */
2072   @VisibleForTesting
2073   public long updateColumnValue(byte [] row, byte [] f,
2074                                 byte [] qualifier, long newValue)
2075       throws IOException {
2076
2077     this.lock.readLock().lock();
2078     try {
2079       long now = EnvironmentEdgeManager.currentTime();
2080
2081       return this.memstore.updateColumnValue(row,
2082           f,
2083           qualifier,
2084           newValue,
2085           now);
2086
2087     } finally {
2088       this.lock.readLock().unlock();
2089     }
2090   }
2091
2092   @Override
2093   public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2094     this.lock.readLock().lock();
2095     try {
2096       return this.memstore.upsert(cells, readpoint);
2097     } finally {
2098       this.lock.readLock().unlock();
2099     }
2100   }
2101
2102   @Override
2103   public StoreFlushContext createFlushContext(long cacheFlushId) {
2104     return new StoreFlusherImpl(cacheFlushId);
2105   }
2106
2107   private final class StoreFlusherImpl implements StoreFlushContext {
2108
2109     private long cacheFlushSeqNum;
2110     private MemStoreSnapshot snapshot;
2111     private List<Path> tempFiles;
2112     private List<Path> committedFiles;
2113     private long cacheFlushCount;
2114     private long cacheFlushSize;
2115     private long outputFileSize;
2116
2117     private StoreFlusherImpl(long cacheFlushSeqNum) {
2118       this.cacheFlushSeqNum = cacheFlushSeqNum;
2119     }
2120
2121     /**
2122      * This is not thread-safe. The caller should have a lock on the region or the store.
2123      * If necessary, the lock can be added with the patch provided in HBASE-10087.
2124      */
2125     @Override
2126     public void prepare() {
2127       // passing the current sequence number of the wal - to allow bookkeeping in the memstore
2128       this.snapshot = memstore.snapshot();
2129       this.cacheFlushCount = snapshot.getCellsCount();
2130       this.cacheFlushSize = snapshot.getSize();
2131       committedFiles = new ArrayList<Path>(1);
2132     }
2133
2134     @Override
2135     public void flushCache(MonitoredTask status) throws IOException {
2136       RegionServerServices rsService = region.getRegionServerServices();
2137       ThroughputController throughputController =
2138           rsService == null ? null : rsService.getFlushThroughputController();
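           // Write the snapshot out as temporary files; commit() below moves them into the store.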
2139       tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
2140     }
2141
2142     @Override
2143     public boolean commit(MonitoredTask status) throws IOException {
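           // A flush that produced no files has nothing to commit.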
2144       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2145         return false;
2146       }
2147       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2148       for (Path storeFilePath : tempFiles) {
2149         try {
2150           StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
2151           outputFileSize += sf.getReader().length();
2152           storeFiles.add(sf);
2153         } catch (IOException ex) {
2154           LOG.error("Failed to commit store file " + storeFilePath, ex);
2155           // Try to delete the files we have committed before.
2156           for (StoreFile sf : storeFiles) {
2157             Path pathToDelete = sf.getPath();
2158             try {
2159               sf.deleteReader();
2160             } catch (IOException deleteEx) {
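                   // We can neither complete this flush nor roll back the files already
                   // committed, so halt the JVM instead of continuing in an inconsistent state.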
2161               LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, deleteEx);
2162               Runtime.getRuntime().halt(1);
2163             }
2164           }
2165           throw new IOException("Failed to commit the flush", ex);
2166         }
2167       }
2168
2169       for (StoreFile sf : storeFiles) {
2170         if (HStore.this.getCoprocessorHost() != null) {
2171           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2172         }
2173         committedFiles.add(sf.getPath());
2174       }
2175
2176       HStore.this.flushedCellsCount += cacheFlushCount;
2177       HStore.this.flushedCellsSize += cacheFlushSize;
2178       HStore.this.flushedOutputFileSize += outputFileSize;
2179
2180       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
2181       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2182     }
2183
2184     @Override
2185     public long getOutputFileSize() {
2186       return outputFileSize;
2187     }
2188
2189     @Override
2190     public List<Path> getCommittedFiles() {
2191       return committedFiles;
2192     }
2193
2194     /**
2195      * Similar to commit, but called in secondary region replicas for replaying the
2196      * flush cache from primary region. Adds the new files to the store, and drops the
2197      * snapshot depending on dropMemstoreSnapshot argument.
2198      * @param fileNames names of the flushed files
2199      * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2200      * @throws IOException
2201      */
2202     @Override
2203     public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2204         throws IOException {
2205       List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2206       for (String file : fileNames) {
2207         // open the file as a store file (hfile link, etc)
2208         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2209         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2210         storeFiles.add(storeFile);
2211         HStore.this.storeSize += storeFile.getReader().length();
2212         HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2213         if (LOG.isInfoEnabled()) {
2214           LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2215             " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2216             ", sequenceid=" +  + storeFile.getReader().getSequenceID() +
2217             ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2218         }
2219       }
2220
2221       long snapshotId = -1; // -1 means do not drop
2222       if (dropMemstoreSnapshot && snapshot != null) {
2223         snapshotId = snapshot.getId();
2224       }
2225       HStore.this.updateStorefiles(storeFiles, snapshotId);
2226     }
2227
2228     /**
2229      * Abort the snapshot preparation. Drops the snapshot if any.
2230      * @throws IOException
2231      */
2232     @Override
2233     public void abort() throws IOException {
2234       if (snapshot == null) {
2235         return;
2236       }
2237       HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2238     }
2239   }
2240
2241   @Override
2242   public boolean needsCompaction() {
2243     return this.storeEngine.needsCompaction(this.filesCompacting);
2244   }
2245
2246   @Override
2247   public CacheConfig getCacheConfig() {
2248     return this.cacheConf;
2249   }
2250
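       // Rough heap accounting for a single HStore instance; the reference/long/int/boolean
       // counts below should be kept in step with the fields declared in this class.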
2251   public static final long FIXED_OVERHEAD =
2252       ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)
2253               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2254
2255   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2256       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2257       + ClassSize.CONCURRENT_SKIPLISTMAP
2258       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2259       + ScanInfo.FIXED_OVERHEAD);
2260
2261   @Override
2262   public long heapSize() {
2263     return DEEP_OVERHEAD + this.memstore.heapSize();
2264   }
2265
2266   @Override
2267   public CellComparator getComparator() {
2268     return comparator;
2269   }
2270
2271   @Override
2272   public ScanInfo getScanInfo() {
2273     return scanInfo;
2274   }
2275
2276   /**
2277    * Set scan info, used by test
2278    * @param scanInfo new scan info to use for test
2279    */
2280   void setScanInfo(ScanInfo scanInfo) {
2281     this.scanInfo = scanInfo;
2282   }
2283
2284   @Override
2285   public boolean hasTooManyStoreFiles() {
2286     return getStorefilesCount() > this.blockingFileCount;
2287   }
2288
2289   @Override
2290   public long getFlushedCellsCount() {
2291     return flushedCellsCount;
2292   }
2293
2294   @Override
2295   public long getFlushedCellsSize() {
2296     return flushedCellsSize;
2297   }
2298
2299   @Override
2300   public long getFlushedOutputFileSize() {
2301     return flushedOutputFileSize;
2302   }
2303
2304   @Override
2305   public long getCompactedCellsCount() {
2306     return compactedCellsCount;
2307   }
2308
2309   @Override
2310   public long getCompactedCellsSize() {
2311     return compactedCellsSize;
2312   }
2313
2314   @Override
2315   public long getMajorCompactedCellsCount() {
2316     return majorCompactedCellsCount;
2317   }
2318
2319   @Override
2320   public long getMajorCompactedCellsSize() {
2321     return majorCompactedCellsSize;
2322   }
2323
2324   /**
2325    * Returns the StoreEngine that is backing this concrete implementation of Store.
2326    * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
2327    */
2328   @VisibleForTesting
2329   public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2330     return this.storeEngine;
2331   }
2332
2333   protected OffPeakHours getOffPeakHours() {
2334     return this.offPeakHours;
2335   }
2336
2337   /**
2338    * {@inheritDoc}
2339    */
2340   @Override
2341   public void onConfigurationChange(Configuration conf) {
2342     this.conf = new CompoundConfiguration()
2343             .add(conf)
2344             .addBytesMap(family.getValues());
2345     this.storeEngine.compactionPolicy.setConf(conf);
2346     this.offPeakHours = OffPeakHours.getInstance(conf);
2347   }
2348
2349   /**
2350    * {@inheritDoc}
2351    */
2352   @Override
2353   public void registerChildren(ConfigurationManager manager) {
2354     // No children to register
2355   }
2356
2357   /**
2358    * {@inheritDoc}
2359    */
2360   @Override
2361   public void deregisterChildren(ConfigurationManager manager) {
2362     // No children to deregister
2363   }
2364
2365   @Override
2366   public double getCompactionPressure() {
2367     return storeEngine.getStoreFileManager().getCompactionPressure();
2368   }
2369
2370   @Override
2371   public boolean isPrimaryReplicaStore() {
2372     return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
2373   }
2374
2375   @Override
2376   public void closeAndArchiveCompactedFiles() throws IOException {
2377     lock.readLock().lock();
2378     Collection<StoreFile> copyCompactedfiles = null;
2379     try {
2380       Collection<StoreFile> compactedfiles =
2381           this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2382       if (compactedfiles != null && compactedfiles.size() != 0) {
2383         // Do a copy under read lock
2384         copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
2385       } else {
2386         if (LOG.isTraceEnabled()) {
2387           LOG.trace("No compacted files to archive");
2388         }
2389         return;
2390       }
2391     } finally {
2392       lock.readLock().unlock();
2393     }
2394     if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
2395       removeCompactedfiles(copyCompactedfiles);
2396     }
2397   }
2398
2399   /**
2400    * Archives and removes the compacted files
2401    * @param compactedfiles The compacted files in this store that are not active in reads
2402    * @throws IOException
2403    */
2404   private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
2405       throws IOException {
2406     final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
2407     for (final StoreFile file : compactedfiles) {
2408       synchronized (file) {
2409         try {
2410           StoreFileReader r = file.getReader();
2411           if (r == null) {
2412             if (LOG.isDebugEnabled()) {
2413               LOG.debug("The file " + file + " was closed but still not archived.");
2414             }
2415             filesToRemove.add(file);
2416           }
2417           if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
2418             // Even if deleting fails we need not bother as any new scanners won't be
2419             // able to use the compacted file as the status is already compactedAway
2420             if (LOG.isTraceEnabled()) {
2421               LOG.trace("Closing and archiving the file " + file.getPath());
2422             }
2423             r.close(true);
2424             // Just close and return
2425             filesToRemove.add(file);
2426           }
2427         } catch (Exception e) {
2428           LOG.error("Exception while trying to close the compacted store file "
2429             + file.getPath().getName(), e);
2430         }
2431       }
2432     }
2433     if (this.isPrimaryReplicaStore()) {
2434       // Only the primary region is allowed to move the file to archive.
2435       // The secondary region does not move the files to archive. Any active reads from
2436       // the secondary region will still work because the file as such has active readers on it.
2437       if (!filesToRemove.isEmpty()) {
2438         if (LOG.isDebugEnabled()) {
2439           LOG.debug("Moving the files " + filesToRemove + " to archive");
2440         }
2441         // Only if this is successful it has to be removed
2442         this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
2443       }
2444     }
2445     if (!filesToRemove.isEmpty()) {
2446       // Clear the compactedfiles from the store file manager
2447       clearCompactedfiles(filesToRemove);
2448     }
2449   }
2450
2451   @Override public void finalizeFlush() {
2452     memstore.finalizeFlush();
2453   }
2454
2455   @Override public MemStore getMemStore() {
2456     return memstore;
2457   }
2458
2459   private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
2460     if (LOG.isTraceEnabled()) {
2461       LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
2462     }
2463     try {
2464       lock.writeLock().lock();
2465       this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
2466     } finally {
2467       lock.writeLock().unlock();
2468     }
2469   }
2470 }