1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.InetSocketAddress;
24  import java.security.Key;
25  import java.security.KeyException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.NavigableSet;
34  import java.util.Set;
35  import java.util.concurrent.Callable;
36  import java.util.concurrent.CompletionService;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.ExecutorCompletionService;
40  import java.util.concurrent.Future;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.ReentrantReadWriteLock;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.Cell;
51  import org.apache.hadoop.hbase.CellComparator;
52  import org.apache.hadoop.hbase.CellUtil;
53  import org.apache.hadoop.hbase.CompoundConfiguration;
54  import org.apache.hadoop.hbase.HColumnDescriptor;
55  import org.apache.hadoop.hbase.HConstants;
56  import org.apache.hadoop.hbase.HRegionInfo;
57  import org.apache.hadoop.hbase.KeyValue;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.classification.InterfaceAudience;
60  import org.apache.hadoop.hbase.client.Scan;
61  import org.apache.hadoop.hbase.conf.ConfigurationManager;
62  import org.apache.hadoop.hbase.io.compress.Compression;
63  import org.apache.hadoop.hbase.io.crypto.Cipher;
64  import org.apache.hadoop.hbase.io.crypto.Encryption;
65  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
66  import org.apache.hadoop.hbase.io.hfile.HFile;
67  import org.apache.hadoop.hbase.io.hfile.HFileContext;
68  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
69  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
70  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
71  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
72  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
73  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
74  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
75  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
76  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
77  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
78  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
79  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
80  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
81  import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
82  import org.apache.hadoop.hbase.security.EncryptionUtil;
83  import org.apache.hadoop.hbase.security.User;
84  import org.apache.hadoop.hbase.util.Bytes;
85  import org.apache.hadoop.hbase.util.ChecksumType;
86  import org.apache.hadoop.hbase.util.ClassSize;
87  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
88  import org.apache.hadoop.hbase.util.Pair;
89  import org.apache.hadoop.hbase.util.ReflectionUtils;
90  import org.apache.hadoop.ipc.RemoteException;
91  import org.apache.hadoop.util.StringUtils;
92  
93  import com.google.common.annotations.VisibleForTesting;
94  import com.google.common.base.Preconditions;
95  import com.google.common.collect.ImmutableCollection;
96  import com.google.common.collect.ImmutableList;
97  import com.google.common.collect.Lists;
98  import com.google.common.collect.Sets;
99  
100 /**
101  * A Store holds a column family in a Region.  It's a memstore and a set of zero
102  * or more StoreFiles, which stretch backwards over time.
103  *
104  * <p>There's no reason to consider append-logging at this level; all logging
105  * and locking is handled at the HRegion level.  Store just provides
106  * services to manage sets of StoreFiles.  One of the most important of those
107  * services is compaction, where files are aggregated once they pass
108  * a configurable threshold.
109  *
110  * <p>The only thing having to do with logs that Store needs to deal with is
111  * the reconstructionLog.  This is a segment of an HRegion's log that might
112  * NOT be present upon startup.  If the param is NULL, there's nothing to do.
113  * If the param is non-NULL, we need to process the log to reconstruct
114  * a TreeMap that might not have been written to disk before the process
115  * died.
116  *
117  * <p>It's assumed that after this constructor returns, the reconstructionLog
118  * file will be deleted (by whoever has instantiated the Store).
119  *
120  * <p>Locking and transactions are handled at a higher level.  This API should
121  * not be called directly but by an HRegion manager.
122  */
123 @InterfaceAudience.Private
124 public class HStore implements Store {
125   private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
126   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
127       "hbase.server.compactchecker.interval.multiplier";
128   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
129   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
130   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
131 
132   static final Log LOG = LogFactory.getLog(HStore.class);
133 
134   protected final MemStore memstore;
135   // This store's directory in the filesystem.
136   private final HRegion region;
137   private final HColumnDescriptor family;
138   private final HRegionFileSystem fs;
139   private Configuration conf;
140   private final CacheConfig cacheConf;
141   private long lastCompactSize = 0;
142   volatile boolean forceMajor = false;
143   /* how many bytes to write between status checks */
144   static int closeCheckInterval = 0;
145   private volatile long storeSize = 0L;
146   private volatile long totalUncompressedBytes = 0L;
147 
148   /**
149    * RWLock for store operations.
150    * Locked in shared mode when the list of component stores is looked at:
151    *   - all reads/writes to table data
152    *   - checking for split
153    * Locked in exclusive mode when the list of component stores is modified:
154    *   - closing
155    *   - completing a compaction
156    */
157   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
158   private final boolean verifyBulkLoads;
159 
160   private ScanInfo scanInfo;
161 
162   // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
163   final List<StoreFile> filesCompacting = Lists.newArrayList();
164 
165   // All access must be synchronized.
166   private final Set<ChangedReadersObserver> changedReaderObservers =
167     Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
168 
169   private final int blocksize;
170   private HFileDataBlockEncoder dataBlockEncoder;
171 
172   /** Checksum configuration */
173   private ChecksumType checksumType;
174   private int bytesPerChecksum;
175 
176   // Comparing KeyValues
177   private final KeyValue.KVComparator comparator;
178 
179   final StoreEngine<?, ?, ?, ?> storeEngine;
180 
181   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
182   private volatile OffPeakHours offPeakHours;
183 
184   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
185   private int flushRetriesNumber;
186   private int pauseTime;
187 
188   private long blockingFileCount;
189   private int compactionCheckMultiplier;
190 
191   private Encryption.Context cryptoContext = Encryption.Context.NONE;
192 
193   private volatile long flushedCellsCount = 0;
194   private volatile long compactedCellsCount = 0;
195   private volatile long majorCompactedCellsCount = 0;
196   private volatile long flushedCellsSize = 0;
197   private volatile long compactedCellsSize = 0;
198   private volatile long majorCompactedCellsSize = 0;
199 
200   /**
201    * Constructor
202    * @param region the region to which this store belongs
203    * @param family HColumnDescriptor for this column family
204    * @param confParam configuration object
206    * @throws IOException
207    */
208   protected HStore(final HRegion region, final HColumnDescriptor family,
209       final Configuration confParam) throws IOException {
210 
211     HRegionInfo info = region.getRegionInfo();
212     this.fs = region.getRegionFileSystem();
213 
214     // Assemble the store's home directory and ensure it exists.
215     fs.createStoreDir(family.getNameAsString());
216     this.region = region;
217     this.family = family;
218     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
219     // CompoundConfiguration will look for keys in reverse order of addition, so we'd
220     // add global config first, then table and cf overrides, then cf metadata.
221     this.conf = new CompoundConfiguration()
222       .add(confParam)
223       .addStringMap(region.getTableDesc().getConfiguration())
224       .addStringMap(family.getConfiguration())
225       .addBytesMap(family.getValues());
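    // Illustrative example of the precedence described in the comment above
    // (the concrete values here are hypothetical, not defaults):
    //   site configuration:           hbase.hstore.blockingStoreFiles = 10
    //   column family configuration:  hbase.hstore.blockingStoreFiles = 20
    //   this.conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT) -> 20
    // because the family-level maps are added last and are therefore consulted first.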
226     this.blocksize = family.getBlocksize();
227 
228     this.dataBlockEncoder =
229         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
230 
231     this.comparator = info.getComparator();
232     // used by ScanQueryMatcher
233     long timeToPurgeDeletes =
234         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
235     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
236         "ms in store " + this);
237     // Get TTL
238     long ttl = determineTTLFromFamily(family);
239     // Why not just pass a HColumnDescriptor in here altogether?  Even if we have
240     // to clone it?
241     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
242     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
243     this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
244         Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
245     this.offPeakHours = OffPeakHours.getInstance(conf);
246 
247     // Setting up cache configuration for this family
248     this.cacheConf = new CacheConfig(conf, family);
249 
250     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
251 
252     this.blockingFileCount =
253         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
254     this.compactionCheckMultiplier = conf.getInt(
255         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
256     if (this.compactionCheckMultiplier <= 0) {
257       LOG.error("Compaction check period multiplier must be positive, setting default: "
258           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
259       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
260     }
261 
262     if (HStore.closeCheckInterval == 0) {
263       HStore.closeCheckInterval = conf.getInt(
264           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
265     }
266 
267     this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
268     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
269 
270     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
271     this.checksumType = getChecksumType(conf);
272     // initialize bytes per checksum
273     this.bytesPerChecksum = getBytesPerChecksum(conf);
274     flushRetriesNumber = conf.getInt(
275         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
276     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
277     if (flushRetriesNumber <= 0) {
278       throw new IllegalArgumentException(
279           "hbase.hstore.flush.retries.number must be > 0, not "
280               + flushRetriesNumber);
281     }
282 
283     // Crypto context for new store files
284     String cipherName = family.getEncryptionType();
285     if (cipherName != null) {
286       Cipher cipher;
287       Key key;
288       byte[] keyBytes = family.getEncryptionKey();
289       if (keyBytes != null) {
290         // Family provides specific key material
291         String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
292           User.getCurrent().getShortName());
293         try {
294           // First try the master key
295           key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
296         } catch (KeyException e) {
297           // If the current master key fails to unwrap, try the alternate, if
298           // one is configured
299           if (LOG.isDebugEnabled()) {
300             LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
301           }
302           String alternateKeyName =
303             conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
304           if (alternateKeyName != null) {
305             try {
306               key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
307             } catch (KeyException ex) {
308               throw new IOException(ex);
309             }
310           } else {
311             throw new IOException(e);
312           }
313         }
314         // Use the algorithm the key wants
315         cipher = Encryption.getCipher(conf, key.getAlgorithm());
316         if (cipher == null) {
317           throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
318         }
319         // Fail if misconfigured
320         // We use the encryption type specified in the column schema as a sanity check on
321         // what the wrapped key is telling us
322         if (!cipher.getName().equalsIgnoreCase(cipherName)) {
323           throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
324             "' configured with type '" + cipherName +
325             "' but key specifies algorithm '" + cipher.getName() + "'");
326         }
327       } else {
328         // Family does not provide key material, create a random key
329         cipher = Encryption.getCipher(conf, cipherName);
330         if (cipher == null) {
331           throw new RuntimeException("Cipher '" + cipherName + "' is not available");
332         }
333         key = cipher.getRandomKey();
334       }
335       cryptoContext = Encryption.newContext(conf);
336       cryptoContext.setCipher(cipher);
337       cryptoContext.setKey(key);
338     }
339   }
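  // Illustrative sketch of the family schema settings that drive the crypto-context
  // block at the end of the constructor above (hypothetical configuration; wrapping a
  // random key with EncryptionUtil.wrapKey is shown only as an example):
  //   HColumnDescriptor hcd = new HColumnDescriptor("cf");
  //   hcd.setEncryptionType("AES");                 // must match the wrapped key's algorithm
  //   hcd.setEncryptionKey(EncryptionUtil.wrapKey(conf, "hbase",
  //       Encryption.getCipher(conf, "AES").getRandomKey()));
  //   // If no explicit key is set, the constructor generates a random key instead.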
340 
341   /**
342    * @param family the column family descriptor
343    * @return TTL in milliseconds for the specified family
344    */
345   private static long determineTTLFromFamily(final HColumnDescriptor family) {
346     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
347     long ttl = family.getTimeToLive();
348     if (ttl == HConstants.FOREVER) {
349       // Default is unlimited ttl.
350       ttl = Long.MAX_VALUE;
351     } else if (ttl == -1) {
352       ttl = Long.MAX_VALUE;
353     } else {
354       // Seconds -> milliseconds: adjust the user-supplied TTL
355       ttl *= 1000;
356     }
357     return ttl;
358   }
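  // Worked example of the conversion above (values are illustrative):
  //   HColumnDescriptor hcd = new HColumnDescriptor("cf");
  //   hcd.setTimeToLive(86400);                  // one day, in seconds
  //   determineTTLFromFamily(hcd);               // -> 86400000 (milliseconds)
  //   hcd.setTimeToLive(HConstants.FOREVER);
  //   determineTTLFromFamily(hcd);               // -> Long.MAX_VALUE (no expiry)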
359 
360   @Override
361   public String getColumnFamilyName() {
362     return this.family.getNameAsString();
363   }
364 
365   @Override
366   public TableName getTableName() {
367     return this.getRegionInfo().getTable();
368   }
369 
370   @Override
371   public FileSystem getFileSystem() {
372     return this.fs.getFileSystem();
373   }
374 
375   public HRegionFileSystem getRegionFileSystem() {
376     return this.fs;
377   }
378 
379   /* Implementation of StoreConfigInformation */
380   @Override
381   public long getStoreFileTtl() {
382     // TTL only applies if there's no MIN_VERSIONs setting on the column.
383     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
384   }
385 
386   @Override
387   public long getMemstoreFlushSize() {
388     // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
389     return this.region.memstoreFlushSize;
390   }
391 
392   @Override
393   public long getFlushableSize() {
394     return this.memstore.getFlushableSize();
395   }
396 
397   @Override
398   public long getCompactionCheckMultiplier() {
399     return this.compactionCheckMultiplier;
400   }
401 
402   @Override
403   public long getBlockingFileCount() {
404     return blockingFileCount;
405   }
406   /* End implementation of StoreConfigInformation */
407 
408   /**
409    * Returns the configured bytesPerChecksum value.
410    * @param conf The configuration
411    * @return The bytesPerChecksum that is set in the configuration
412    */
413   public static int getBytesPerChecksum(Configuration conf) {
414     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
415                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
416   }
417 
418   /**
419    * Returns the configured checksum algorithm.
420    * @param conf The configuration
421    * @return The checksum algorithm that is set in the configuration
422    */
423   public static ChecksumType getChecksumType(Configuration conf) {
424     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
425     if (checksumName == null) {
426       return HFile.DEFAULT_CHECKSUM_TYPE;
427     } else {
428       return ChecksumType.nameToType(checksumName);
429     }
430   }
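  // Illustrative usage of the two checksum helpers above; the configuration keys come
  // from HConstants, while the concrete values shown are only examples:
  //   Configuration c = HBaseConfiguration.create();
  //   c.set(HConstants.CHECKSUM_TYPE_NAME, "CRC32C");
  //   c.setInt(HConstants.BYTES_PER_CHECKSUM, 16 * 1024);
  //   HStore.getChecksumType(c);       // -> ChecksumType.CRC32C
  //   HStore.getBytesPerChecksum(c);   // -> 16384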
431 
432   /**
433    * @return how many bytes to write between status checks
434    */
435   public static int getCloseCheckInterval() {
436     return closeCheckInterval;
437   }
438 
439   @Override
440   public HColumnDescriptor getFamily() {
441     return this.family;
442   }
443 
444   /**
445    * @return The maximum sequence id in all store files. Used for log replay.
446    */
447   long getMaxSequenceId() {
448     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
449   }
450 
451   @Override
452   public long getMaxMemstoreTS() {
453     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
454   }
455 
456   /**
457    * @param tabledir {@link Path} to where the table is being stored
458    * @param hri {@link HRegionInfo} for the region.
459    * @param family {@link HColumnDescriptor} describing the column family
460    * @return Path to family/Store home directory.
461    */
462   @Deprecated
463   public static Path getStoreHomedir(final Path tabledir,
464       final HRegionInfo hri, final byte[] family) {
465     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
466   }
467 
468   /**
469    * @param tabledir {@link Path} to where the table is being stored
470    * @param encodedName Encoded region name.
471    * @param family {@link HColumnDescriptor} describing the column family
472    * @return Path to family/Store home directory.
473    */
474   @Deprecated
475   public static Path getStoreHomedir(final Path tabledir,
476       final String encodedName, final byte[] family) {
477     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
478   }
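  // For example (encoded region name shortened; path is hypothetical):
  //   getStoreHomedir(new Path("/hbase/data/default/t1"), "5740a0...", Bytes.toBytes("cf"))
  //   -> /hbase/data/default/t1/5740a0.../cf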
479 
480   @Override
481   public HFileDataBlockEncoder getDataBlockEncoder() {
482     return dataBlockEncoder;
483   }
484 
485   /**
486    * Should be used only in tests.
487    * @param blockEncoder the block delta encoder to use
488    */
489   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
490     this.dataBlockEncoder = blockEncoder;
491   }
492 
493   /**
494    * Creates an unsorted list of StoreFiles loaded in parallel
495    * from the given directory.
496    * @throws IOException
497    */
498   private List<StoreFile> loadStoreFiles() throws IOException {
499     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
500     return openStoreFiles(files);
501   }
502 
503   private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
504     if (files == null || files.size() == 0) {
505       return new ArrayList<StoreFile>();
506     }
507     // initialize the thread pool for opening store files in parallel.
508     ThreadPoolExecutor storeFileOpenerThreadPool =
509       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
510           this.getColumnFamilyName());
511     CompletionService<StoreFile> completionService =
512       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
513 
514     int totalValidStoreFile = 0;
515     for (final StoreFileInfo storeFileInfo: files) {
516       // open each store file in parallel
517       completionService.submit(new Callable<StoreFile>() {
518         @Override
519         public StoreFile call() throws IOException {
520           StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
521           return storeFile;
522         }
523       });
524       totalValidStoreFile++;
525     }
526 
527     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
528     IOException ioe = null;
529     try {
530       for (int i = 0; i < totalValidStoreFile; i++) {
531         try {
532           Future<StoreFile> future = completionService.take();
533           StoreFile storeFile = future.get();
534           long length = storeFile.getReader().length();
535           this.storeSize += length;
536           this.totalUncompressedBytes +=
537               storeFile.getReader().getTotalUncompressedBytes();
538           if (LOG.isDebugEnabled()) {
539             LOG.debug("loaded " + storeFile.toStringDetailed());
540           }
541           results.add(storeFile);
542         } catch (InterruptedException e) {
543           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
544         } catch (ExecutionException e) {
545           if (ioe == null) ioe = new IOException(e.getCause());
546         }
547       }
548     } finally {
549       storeFileOpenerThreadPool.shutdownNow();
550     }
551     if (ioe != null) {
552       // close StoreFile readers
553       for (StoreFile file : results) {
554         try {
555           if (file != null) file.closeReader(true);
556         } catch (IOException e) {
557           LOG.warn(e.getMessage());
558         }
559       }
560       throw ioe;
561     }
562 
563     return results;
564   }
565 
566   /**
567    * Checks the underlying store files, and opens the files that  have not
568    * been opened, and removes the store file readers for store files no longer
569    * available. Mainly used by secondary region replicas to keep up to date with
570    * the primary region files.
571    * @throws IOException
572    */
573   @Override
574   public void refreshStoreFiles() throws IOException {
575     StoreFileManager sfm = storeEngine.getStoreFileManager();
576     Collection<StoreFile> currentFiles = sfm.getStorefiles();
577     if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
578 
579     Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
580     if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
581 
582     HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
583     for (StoreFile sf : currentFiles) {
584       currentFilesSet.put(sf.getFileInfo(), sf);
585     }
586     HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
587 
588     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
589     Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
590 
591     if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
592       return;
593     }
594 
595     LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
596       + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
597 
598     Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
599     for (StoreFileInfo sfi : toBeRemovedFiles) {
600       toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
601     }
602 
603     // try to open the files
604     List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
605 
606     // propagate the file changes to the underlying store file manager
607     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
608 
609     // Advance the memstore read point to be at least the new store files seqIds so that
610     // readers might pick it up. This assumes that the store is not getting any writes (otherwise
611     // in-flight transactions might be made visible)
612     if (!toBeAddedFiles.isEmpty()) {
613       region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
614     }
615 
616     // notify scanners, close file readers, and recompute store size
617     completeCompaction(toBeRemovedStoreFiles, false);
618   }
619 
620   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
621     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
622     return createStoreFileAndReader(info);
623   }
624 
625   private StoreFile createStoreFileAndReader(final StoreFileInfo info)
626       throws IOException {
627     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
628     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
629       this.family.getBloomFilterType());
630     storeFile.createReader();
631     return storeFile;
632   }
633 
634   @Override
635   public Pair<Long, Cell> add(final Cell cell) {
636     lock.readLock().lock();
637     try {
638        return this.memstore.add(cell);
639     } finally {
640       lock.readLock().unlock();
641     }
642   }
643 
644   @Override
645   public long timeOfOldestEdit() {
646     return memstore.timeOfOldestEdit();
647   }
648 
649   /**
650    * Adds a delete marker to the memstore
651    *
652    * @param kv the delete KeyValue to add
653    * @return memstore size delta
654    */
655   protected long delete(final KeyValue kv) {
656     lock.readLock().lock();
657     try {
658       return this.memstore.delete(kv);
659     } finally {
660       lock.readLock().unlock();
661     }
662   }
663 
664   @Override
665   public void rollback(final Cell cell) {
666     lock.readLock().lock();
667     try {
668       this.memstore.rollback(cell);
669     } finally {
670       lock.readLock().unlock();
671     }
672   }
673 
674   /**
675    * @return All store files.
676    */
677   @Override
678   public Collection<StoreFile> getStorefiles() {
679     return this.storeEngine.getStoreFileManager().getStorefiles();
680   }
681 
682   @Override
683   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
684     HFile.Reader reader  = null;
685     try {
686       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
687           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
688       reader = HFile.createReader(srcPath.getFileSystem(conf),
689           srcPath, cacheConf, conf);
690       reader.loadFileInfo();
691 
692       byte[] firstKey = reader.getFirstRowKey();
693       Preconditions.checkState(firstKey != null, "First key can not be null");
694       byte[] lk = reader.getLastKey();
695       Preconditions.checkState(lk != null, "Last key can not be null");
696       byte[] lastKey =  KeyValue.createKeyValueFromKey(lk).getRow();
697 
698       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
699           " last=" + Bytes.toStringBinary(lastKey));
700       LOG.debug("Region bounds: first=" +
701           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
702           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
703 
704       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
705         throw new WrongRegionException(
706             "Bulk load file " + srcPath.toString() + " does not fit inside region "
707             + this.getRegionInfo().getRegionNameAsString());
708       }
709 
710       if (verifyBulkLoads) {
711         long verificationStartTime = EnvironmentEdgeManager.currentTime();
712         LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
713         Cell prevCell = null;
714         HFileScanner scanner = reader.getScanner(false, false, false);
715         scanner.seekTo();
716         do {
717           Cell cell = scanner.getKeyValue();
718           if (prevCell != null) {
719             if (CellComparator.compareRows(prevCell, cell) > 0) {
720               throw new InvalidHFileException("Previous row is greater than"
721                   + " current row: path=" + srcPath + " previous="
722                   + CellUtil.getCellKeyAsString(prevCell) + " current="
723                   + CellUtil.getCellKeyAsString(cell));
724             }
725             if (CellComparator.compareFamilies(prevCell, cell) != 0) {
726               throw new InvalidHFileException("Previous key had different"
727                   + " family compared to current key: path=" + srcPath
728                   + " previous="
729                   + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
730                       prevCell.getFamilyLength())
731                   + " current="
732                   + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
733                       cell.getFamilyLength()));
734             }
735           }
736           prevCell = cell;
737         } while (scanner.next());
738       LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
739          + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
740          + " ms");
741       }
742     } finally {
743       if (reader != null) reader.close();
744     }
745   }
746 
747   @Override
748   public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
749     Path srcPath = new Path(srcPathStr);
750     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
751 
752     StoreFile sf = createStoreFileAndReader(dstPath);
753 
754     StoreFile.Reader r = sf.getReader();
755     this.storeSize += r.length();
756     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
757 
758     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
759         "' as " + dstPath + " - updating store file list.");
760 
761     // Append the new storefile into the list
762     this.lock.writeLock().lock();
763     try {
764       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
765     } finally {
766       // We need the lock, as long as we are updating the storeFiles
767       // or changing the memstore. Let us release it before calling
768       // notifyChangeReadersObservers. See HBASE-4485 for a possible
769       // deadlock scenario that could have happened if continue to hold
770       // the lock.
771       this.lock.writeLock().unlock();
772     }
773     notifyChangedReadersObservers();
774     LOG.info("Successfully loaded store file " + srcPath
775         + " into store " + this + " (new location: " + dstPath + ")");
776     if (LOG.isTraceEnabled()) {
777       String traceMessage = "BULK LOAD time,size,store size,store files ["
778           + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
779           + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
780       LOG.trace(traceMessage);
781     }
782   }
783 
784   @Override
785   public ImmutableCollection<StoreFile> close() throws IOException {
786     this.lock.writeLock().lock();
787     try {
788       // Clear so metrics doesn't find them.
789       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
790 
791       if (!result.isEmpty()) {
792         // initialize the thread pool for closing store files in parallel.
793         ThreadPoolExecutor storeFileCloserThreadPool = this.region
794             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
795                 + this.getColumnFamilyName());
796 
797         // close each store file in parallel
798         CompletionService<Void> completionService =
799           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
800         for (final StoreFile f : result) {
801           completionService.submit(new Callable<Void>() {
802             @Override
803             public Void call() throws IOException {
804               f.closeReader(true);
805               return null;
806             }
807           });
808         }
809 
810         IOException ioe = null;
811         try {
812           for (int i = 0; i < result.size(); i++) {
813             try {
814               Future<Void> future = completionService.take();
815               future.get();
816             } catch (InterruptedException e) {
817               if (ioe == null) {
818                 ioe = new InterruptedIOException();
819                 ioe.initCause(e);
820               }
821             } catch (ExecutionException e) {
822               if (ioe == null) ioe = new IOException(e.getCause());
823             }
824           }
825         } finally {
826           storeFileCloserThreadPool.shutdownNow();
827         }
828         if (ioe != null) throw ioe;
829       }
830       LOG.info("Closed " + this);
831       return result;
832     } finally {
833       this.lock.writeLock().unlock();
834     }
835   }
836 
837   /**
838    * Snapshot this store's memstore. Call before running
839    * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)}
840    *  so it has some work to do.
841    */
842   void snapshot() {
843     this.lock.writeLock().lock();
844     try {
845       this.memstore.snapshot();
846     } finally {
847       this.lock.writeLock().unlock();
848     }
849   }
850 
851   /**
852    * Write out current snapshot.  Presumes {@link #snapshot()} has been called
853    * previously.
854    * @param logCacheFlushId flush sequence number
855    * @param snapshot
856    * @param status
857    * @return The path names of the tmp files to which the store was flushed
858    * @throws IOException
859    */
860   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
861       MonitoredTask status) throws IOException {
862     // If an exception happens flushing, we let it out without clearing
863     // the memstore snapshot.  The old snapshot will be returned when we say
864     // 'snapshot', the next time flush comes around.
865     // Retry after catching an exception when flushing; otherwise the server will
866     // abort itself.
867     StoreFlusher flusher = storeEngine.getStoreFlusher();
868     IOException lastException = null;
869     for (int i = 0; i < flushRetriesNumber; i++) {
870       try {
871         List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
872         Path lastPathName = null;
873         try {
874           for (Path pathName : pathNames) {
875             lastPathName = pathName;
876             validateStoreFile(pathName);
877           }
878           return pathNames;
879         } catch (Exception e) {
880           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
881           if (e instanceof IOException) {
882             lastException = (IOException) e;
883           } else {
884             lastException = new IOException(e);
885           }
886         }
887       } catch (IOException e) {
888         LOG.warn("Failed flushing store file, retrying num=" + i, e);
889         lastException = e;
890       }
891       if (lastException != null && i < (flushRetriesNumber - 1)) {
892         try {
893           Thread.sleep(pauseTime);
894         } catch (InterruptedException e) {
895           IOException iie = new InterruptedIOException();
896           iie.initCause(e);
897           throw iie;
898         }
899       }
900     }
901     throw lastException;
902   }
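  // The retry loop above is driven by settings read in the constructor; a hypothetical
  // tuning example (keys as used earlier in this file):
  //   conf.setInt("hbase.hstore.flush.retries.number", 5);
  //   conf.setInt(HConstants.HBASE_SERVER_PAUSE, 2000);   // ms to sleep between retries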
903 
904   /*
905    * @param path The pathname of the tmp file into which the store was flushed
906    * @param logCacheFlushId
907    * @param status
908    * @return StoreFile created.
909    * @throws IOException
910    */
911   private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
912       throws IOException {
913     // Write-out finished successfully, move into the right spot
914     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
915 
916     status.setStatus("Flushing " + this + ": reopening flushed file");
917     StoreFile sf = createStoreFileAndReader(dstPath);
918 
919     StoreFile.Reader r = sf.getReader();
920     this.storeSize += r.length();
921     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
922 
923     if (LOG.isInfoEnabled()) {
924       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
925         ", sequenceid=" + logCacheFlushId +
926         ", filesize=" + StringUtils.humanReadableInt(r.length()));
927     }
928     return sf;
929   }
930 
931   /*
932    * @param maxKeyCount
933    * @param compression Compression algorithm to use
934    * @param isCompaction whether we are creating a new file in a compaction
935    * @param includeMVCCReadpoint whether to include the MVCC read point or not
936    * @param includesTag whether the new file will include tags or not
937    * @return Writer for a new StoreFile in the tmp dir.
938    */
939   @Override
940   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
941       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
942   throws IOException {
943     final CacheConfig writerCacheConf;
944     if (isCompaction) {
945       // Don't cache data on write on compactions.
946       writerCacheConf = new CacheConfig(cacheConf);
947       writerCacheConf.setCacheDataOnWrite(false);
948     } else {
949       writerCacheConf = cacheConf;
950     }
951     InetSocketAddress[] favoredNodes = null;
952     if (region.getRegionServerServices() != null) {
953       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
954           region.getRegionInfo().getEncodedName());
955     }
956     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
957       cryptoContext);
958     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
959         this.getFileSystem())
960             .withFilePath(fs.createTempName())
961             .withComparator(comparator)
962             .withBloomType(family.getBloomFilterType())
963             .withMaxKeyCount(maxKeyCount)
964             .withFavoredNodes(favoredNodes)
965             .withFileContext(hFileContext)
966             .build();
967     return w;
968   }
969 
970   private HFileContext createFileContext(Compression.Algorithm compression,
971       boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
972     if (compression == null) {
973       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
974     }
975     HFileContext hFileContext = new HFileContextBuilder()
976                                 .withIncludesMvcc(includeMVCCReadpoint)
977                                 .withIncludesTags(includesTag)
978                                 .withCompression(compression)
979                                 .withCompressTags(family.isCompressTags())
980                                 .withChecksumType(checksumType)
981                                 .withBytesPerCheckSum(bytesPerChecksum)
982                                 .withBlockSize(blocksize)
983                                 .withHBaseCheckSum(true)
984                                 .withDataBlockEncoding(family.getDataBlockEncoding())
985                                 .withEncryptionContext(cryptoContext)
986                                 .build();
987     return hFileContext;
988   }
989 
990 
991   /*
992    * Change storeFiles, adding into place the readers produced by this new flush.
993    * @param sfs Store files
994    * @param snapshotId
995    * @throws IOException
996    * @return Whether compaction is required.
997    */
998   private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
999       throws IOException {
1000     this.lock.writeLock().lock();
1001     try {
1002       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1003       this.memstore.clearSnapshot(snapshotId);
1004     } finally {
1005       // We need the lock, as long as we are updating the storeFiles
1006       // or changing the memstore. Let us release it before calling
1007       // notifyChangeReadersObservers. See HBASE-4485 for a possible
1008       // deadlock scenario that could have happened if continue to hold
1009       // the lock.
1010       this.lock.writeLock().unlock();
1011     }
1012 
1013     // Tell listeners of the change in readers.
1014     notifyChangedReadersObservers();
1015 
1016     if (LOG.isTraceEnabled()) {
1017       long totalSize = 0;
1018       for (StoreFile sf : sfs) {
1019         totalSize += sf.getReader().length();
1020       }
1021       String traceMessage = "FLUSH time,count,size,store size,store files ["
1022           + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1023           + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1024       LOG.trace(traceMessage);
1025     }
1026     return needsCompaction();
1027   }
1028 
1029   /*
1030    * Notify all observers that set of Readers has changed.
1031    * @throws IOException
1032    */
1033   private void notifyChangedReadersObservers() throws IOException {
1034     for (ChangedReadersObserver o: this.changedReaderObservers) {
1035       o.updateReaders();
1036     }
1037   }
1038 
1039   /**
1040    * Get all scanners with no filtering based on TTL (that happens further down
1041    * the line).
1042    * @return all scanners for this store
1043    */
1044   @Override
1045   public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1046       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1047       byte[] stopRow, long readPt) throws IOException {
1048     Collection<StoreFile> storeFilesToScan;
1049     List<KeyValueScanner> memStoreScanners;
1050     this.lock.readLock().lock();
1051     try {
1052       storeFilesToScan =
1053           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1054       memStoreScanners = this.memstore.getScanners(readPt);
1055     } finally {
1056       this.lock.readLock().unlock();
1057     }
1058 
1059     // First the store file scanners
1060 
1061     // TODO this used to get the store files in descending order,
1062     // but now we get them in ascending order, which I think is
1063     // actually more correct, since the memstore scanners get added at the end.
1064     List<StoreFileScanner> sfScanners = StoreFileScanner
1065       .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
1066         readPt);
1067     List<KeyValueScanner> scanners =
1068       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1069     scanners.addAll(sfScanners);
1070     // Then the memstore scanners
1071     scanners.addAll(memStoreScanners);
1072     return scanners;
1073   }
1074 
1075   @Override
1076   public void addChangedReaderObserver(ChangedReadersObserver o) {
1077     this.changedReaderObservers.add(o);
1078   }
1079 
1080   @Override
1081   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1082     // We don't check if observer present; it may not be (legitimately)
1083     this.changedReaderObservers.remove(o);
1084   }
1085 
1086   //////////////////////////////////////////////////////////////////////////////
1087   // Compaction
1088   //////////////////////////////////////////////////////////////////////////////
1089 
1090   /**
1091    * Compact the StoreFiles.  This method may take some time, so the calling
1092    * thread must be able to block for long periods.
1093    *
1094    * <p>During this time, the Store can work as usual, getting values from
1095    * StoreFiles and writing new StoreFiles from the memstore.
1096    *
1097    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1098    * completely written-out to disk.
1099    *
1100    * <p>The compactLock prevents multiple simultaneous compactions.
1101    * The structureLock prevents us from interfering with other write operations.
1102    *
1103    * <p>We don't want to hold the structureLock for the whole time, as a compact()
1104    * can be lengthy and we want to allow cache-flushes during this period.
1105    *
1106    * <p> Compaction event should be idempotent, since there is no IO Fencing for
1107    * the region directory in hdfs. A region server might still try to complete the
1108    * compaction after it lost the region. That is why the following events are carefully
1109    * ordered for a compaction:
1110    *  1. Compaction writes new files under region/.tmp directory (compaction output)
1111    *  2. Compaction atomically moves the temporary file under region directory
1112    *  3. Compaction appends a WAL edit containing the compaction input and output files.
1113    *  Forces sync on WAL.
1114    *  4. Compaction deletes the input files from the region directory.
1115    *
1116    * Failure conditions are handled like this:
1117    *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
1118    *  the compaction later, it will only write the new data file to the region directory.
1119    *  Since we already have this data, this will be idempotent but we will have a redundant
1120    *  copy of the data.
1121    *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1122    *  RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
1123    *  - If RS fails after 3, the region server that opens the region will pick up the
1124    *  compaction marker from the WAL and replay it by removing the compaction input files.
1125    *  Failed RS can also attempt to delete those files, but the operation will be idempotent
1126    *
1127    * See HBASE-2231 for details.
1128    *
1129    * @param compaction compaction details obtained from requestCompaction()
1130    * @throws IOException
1131    * @return Storefile we compacted into or null if we failed or opted out early.
1132    */
1133   @Override
1134   public List<StoreFile> compact(CompactionContext compaction) throws IOException {
1135     assert compaction != null && compaction.hasSelection();
1136     CompactionRequest cr = compaction.getRequest();
1137     Collection<StoreFile> filesToCompact = cr.getFiles();
1138     assert !filesToCompact.isEmpty();
1139     synchronized (filesCompacting) {
1140       // sanity check: we're compacting files that this store knows about
1141       // TODO: change this to LOG.error() after more debugging
1142       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1143     }
1144 
1145     // Ready to go. Have list of files to compact.
1146     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1147         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1148         + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1149         + StringUtils.humanReadableInt(cr.getSize()));
1150 
1151     long compactionStartTime = EnvironmentEdgeManager.currentTime();
1152     List<StoreFile> sfs = null;
1153     try {
1154       // Commence the compaction.
1155       List<Path> newFiles = compaction.compact();
1156 
1157       // TODO: get rid of this!
1158       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1159         LOG.warn("hbase.hstore.compaction.complete is set to false");
1160         sfs = new ArrayList<StoreFile>(newFiles.size());
1161         for (Path newFile : newFiles) {
1162           // Create storefile around what we wrote with a reader on it.
1163           StoreFile sf = createStoreFileAndReader(newFile);
1164           sf.closeReader(true);
1165           sfs.add(sf);
1166         }
1167         return sfs;
1168       }
1169       // Do the steps necessary to complete the compaction.
1170       sfs = moveCompactedFilesIntoPlace(cr, newFiles);
1171       writeCompactionWalRecord(filesToCompact, sfs);
1172       replaceStoreFiles(filesToCompact, sfs);
1173       if (cr.isMajor()) {
1174         majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1175         majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1176       } else {
1177         compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1178         compactedCellsSize += getCompactionProgress().totalCompactedSize;
1179       }
1180       // At this point the store will use new files for all new scanners.
1181       completeCompaction(filesToCompact, true); // Archive old files & update store size.
1182     } finally {
1183       finishCompactionRequest(cr);
1184     }
1185     logCompactionEndMessage(cr, sfs, compactionStartTime);
1186     return sfs;
1187   }
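  // Illustrative caller sequence for compact() above (hypothetical sketch; per the
  // javadoc, the CompactionContext is obtained from requestCompaction()):
  //   CompactionContext ctx = store.requestCompaction();   // file selection, may be null
  //   if (ctx != null) {
  //     List<StoreFile> produced = store.compact(ctx);     // runs steps 1-4 described above
  //   }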
1188 
1189   private List<StoreFile> moveCompactedFilesIntoPlace(
1190       CompactionRequest cr, List<Path> newFiles) throws IOException {
1191     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1192     for (Path newFile : newFiles) {
1193       assert newFile != null;
1194       StoreFile sf = moveFileIntoPlace(newFile);
1195       if (this.getCoprocessorHost() != null) {
1196         this.getCoprocessorHost().postCompact(this, sf, cr);
1197       }
1198       assert sf != null;
1199       sfs.add(sf);
1200     }
1201     return sfs;
1202   }
1203 
1204   // Package-visible for tests
1205   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1206     validateStoreFile(newFile);
1207     // Move the file into the right spot
1208     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1209     return createStoreFileAndReader(destPath);
1210   }
1211 
1212   /**
1213    * Writes the compaction WAL record.
1214    * @param filesCompacted Files compacted (input).
1215    * @param newFiles Files from compaction.
1216    */
1217   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1218       Collection<StoreFile> newFiles) throws IOException {
1219     if (region.getWAL() == null) return;
1220     List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1221     for (StoreFile f : filesCompacted) {
1222       inputPaths.add(f.getPath());
1223     }
1224     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1225     for (StoreFile f : newFiles) {
1226       outputPaths.add(f.getPath());
1227     }
1228     HRegionInfo info = this.region.getRegionInfo();
1229     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1230         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1231     WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
1232         this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
1233   }
1234 
1235   @VisibleForTesting
1236   void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1237       final Collection<StoreFile> result) throws IOException {
1238     this.lock.writeLock().lock();
1239     try {
1240       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1241       filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
1242     } finally {
1243       this.lock.writeLock().unlock();
1244     }
1245   }
1246 
1247   /**
1248    * Log a very elaborate compaction completion message.
1249    * @param cr Request.
1250    * @param sfs Resulting files.
1251    * @param compactionStartTime Start time.
1252    */
1253   private void logCompactionEndMessage(
1254       CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1255     long now = EnvironmentEdgeManager.currentTime();
1256     StringBuilder message = new StringBuilder(
1257       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1258       + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1259       + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1260     if (sfs.isEmpty()) {
1261       message.append("none, ");
1262     } else {
1263       for (StoreFile sf: sfs) {
1264         message.append(sf.getPath().getName());
1265         message.append("(size=");
1266         message.append(StringUtils.humanReadableInt(sf.getReader().length()));
1267         message.append("), ");
1268       }
1269     }
1270     message.append("total size for store is ")
1271       .append(StringUtils.humanReadableInt(storeSize))
1272       .append(". This selection was in queue for ")
1273       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1274       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1275       .append(" to execute.");
1276     LOG.info(message.toString());
1277     if (LOG.isTraceEnabled()) {
1278       int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1279       long resultSize = 0;
1280       for (StoreFile sf : sfs) {
1281         resultSize += sf.getReader().length();
1282       }
1283       String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1284         + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1285           + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1286       LOG.trace(traceMessage);
1287     }
1288   }
1289 
1290   /**
1291    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
1292    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
1293    * See HBASE-2231.
1294    * @param compaction
1295    */
1296   @Override
1297   public void completeCompactionMarker(CompactionDescriptor compaction)
1298       throws IOException {
1299     LOG.debug("Completing compaction from the WAL marker");
1300     List<String> compactionInputs = compaction.getCompactionInputList();
1301 
1302     // The Compaction Marker is written after the compaction is completed,
1303     // and the files moved into the region/family folder.
1304     //
1305     // If we crash after the entry is written, we may not have removed the
1306     // input files, but the output file is present.
1307     // (The unremoved input files will be removed by this function)
1308     //
1309     // If we scan the directory and the file is not present, it can mean that:
1310     //   - The file was manually removed by the user
1311     //   - The file was removed as consequence of subsequent compaction
1312     // so, we can't do anything with the "compaction output list" because those
1313     // files have already been loaded when opening the region (by virtue of
1314     // being in the store's folder) or they may be missing due to a compaction.
1315 
1316     String familyName = this.getColumnFamilyName();
1317     List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
1318     for (String compactionInput : compactionInputs) {
1319       Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1320       inputPaths.add(inputPath);
1321     }
1322 
1323     // Some of the input files might already be deleted.
1324     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1325     for (StoreFile sf : this.getStorefiles()) {
1326       if (inputPaths.contains(sf.getQualifiedPath())) {
1327         inputStoreFiles.add(sf);
1328       }
1329     }
1330 
1331     this.replaceStoreFiles(inputStoreFiles, Collections.<StoreFile>emptyList());
1332     this.completeCompaction(inputStoreFiles);
1333   }
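
  /*
   * A minimal replay-side sketch for completeCompactionMarker() above, assuming the caller (for
   * example a WAL replay loop during region open) has already deserialized a CompactionDescriptor
   * for an edit belonging to this store; readCompactionMarker and walEdit are hypothetical names.
   *
   *   CompactionDescriptor compaction = readCompactionMarker(walEdit); // hypothetical helper
   *   if (store.getColumnFamilyName().equals(compaction.getFamilyName().toStringUtf8())) {
   *     // Removes any input files that the interrupted compaction left behind.
   *     store.completeCompactionMarker(compaction);
   *   }
   */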
1334 
1335   /**
1336    * This method tries to compact N recent files for testing.
1337    * Note that because compacting "recent" files only makes sense for some policies,
1338    * e.g. the default one, this method assumes the default policy is in use. It does not consult
1339    * the policy, but instead builds the compaction candidate list by itself.
1340    * @param N Number of files.
1341    */
1342   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1343     List<StoreFile> filesToCompact;
1344     boolean isMajor;
1345 
1346     this.lock.readLock().lock();
1347     try {
1348       synchronized (filesCompacting) {
1349         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1350         if (!filesCompacting.isEmpty()) {
1351           // Exclude all files older than the newest file we're currently
1352           // compacting. This allows us to preserve contiguity (HBASE-2856).
1353           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1354           int idx = filesToCompact.indexOf(last);
1355           Preconditions.checkArgument(idx != -1);
1356           filesToCompact.subList(0, idx + 1).clear();
1357         }
1358         int count = filesToCompact.size();
1359         if (N > count) {
1360           throw new RuntimeException("Not enough files");
1361         }
1362 
1363         filesToCompact = filesToCompact.subList(count - N, count);
1364         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1365         filesCompacting.addAll(filesToCompact);
1366         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1367       }
1368     } finally {
1369       this.lock.readLock().unlock();
1370     }
1371 
1372     try {
1373       // Ready to go. Have list of files to compact.
1374       List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1375           .compactForTesting(filesToCompact, isMajor);
1376       for (Path newFile: newFiles) {
1377         // Move the compaction into place.
1378         StoreFile sf = moveFileIntoPlace(newFile);
1379         if (this.getCoprocessorHost() != null) {
1380           this.getCoprocessorHost().postCompact(this, sf, null);
1381         }
1382         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1383         completeCompaction(filesToCompact, true);
1384       }
1385     } finally {
1386       synchronized (filesCompacting) {
1387         filesCompacting.removeAll(filesToCompact);
1388       }
1389     }
1390   }
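
  /*
   * A minimal sketch of how a test might drive compactRecentForTestingAssumingDefaultPolicy()
   * above, assuming the test has already flushed the store several times so that enough store
   * files exist; the store variable is hypothetical.
   *
   *   // Compact the three most recently flushed files, bypassing the compaction policy.
   *   store.compactRecentForTestingAssumingDefaultPolicy(3);
   */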
1391 
1392   @Override
1393   public boolean hasReferences() {
1394     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1395   }
1396 
1397   @Override
1398   public CompactionProgress getCompactionProgress() {
1399     return this.storeEngine.getCompactor().getProgress();
1400   }
1401 
1402   @Override
1403   public boolean isMajorCompaction() throws IOException {
1404     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1405       // TODO: what are these reader checks all over the place?
1406       if (sf.getReader() == null) {
1407         LOG.debug("StoreFile " + sf + " has null Reader");
1408         return false;
1409       }
1410     }
1411     return storeEngine.getCompactionPolicy().isMajorCompaction(
1412         this.storeEngine.getStoreFileManager().getStorefiles());
1413   }
1414 
1415   @Override
1416   public CompactionContext requestCompaction() throws IOException {
1417     return requestCompaction(Store.NO_PRIORITY, null);
1418   }
1419 
1420   @Override
1421   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1422       throws IOException {
1423     // don't even select for compaction if writes are disabled
1424     if (!this.areWritesEnabled()) {
1425       return null;
1426     }
1427 
1428     // Before we do compaction, try to get rid of unneeded files to simplify things.
1429     removeUnneededFiles();
1430 
1431     CompactionContext compaction = storeEngine.createCompaction();
1432     CompactionRequest request = null;
1433     this.lock.readLock().lock();
1434     try {
1435       synchronized (filesCompacting) {
1436         // First, see if coprocessor would want to override selection.
1437         if (this.getCoprocessorHost() != null) {
1438           List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1439           boolean override = this.getCoprocessorHost().preCompactSelection(
1440               this, candidatesForCoproc, baseRequest);
1441           if (override) {
1442             // Coprocessor is overriding normal file selection.
1443             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1444           }
1445         }
1446 
1447         // Normal case - coprocessor is not overriding file selection.
1448         if (!compaction.hasSelection()) {
1449           boolean isUserCompaction = priority == Store.PRIORITY_USER;
1450           boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1451               offPeakCompactionTracker.compareAndSet(false, true);
1452           try {
1453             compaction.select(this.filesCompacting, isUserCompaction,
1454               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1455           } catch (IOException e) {
1456             if (mayUseOffPeak) {
1457               offPeakCompactionTracker.set(false);
1458             }
1459             throw e;
1460           }
1461           assert compaction.hasSelection();
1462           if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1463             // Compaction policy doesn't want to take advantage of off-peak.
1464             offPeakCompactionTracker.set(false);
1465           }
1466         }
1467         if (this.getCoprocessorHost() != null) {
1468           this.getCoprocessorHost().postCompactSelection(
1469               this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1470         }
1471 
1472         // Selected files; see if we have a compaction with some custom base request.
1473         if (baseRequest != null) {
1474           // Update the request with what the system thinks the request should be;
1475           // it's up to the request whether it wants to listen.
1476           compaction.forceSelect(
1477               baseRequest.combineWith(compaction.getRequest()));
1478         }
1479         // Finally, we have the resulting files list. Check if we have any files at all.
1480         request = compaction.getRequest();
1481         final Collection<StoreFile> selectedFiles = request.getFiles();
1482         if (selectedFiles.isEmpty()) {
1483           return null;
1484         }
1485 
1486         addToCompactingFiles(selectedFiles);
1487 
1488         // If we're enqueuing a major, clear the force flag.
1489         this.forceMajor = this.forceMajor && !request.isMajor();
1490 
1491         // Set common request properties.
1492         // Set priority: either the override value supplied by the caller, or the store's own.
1493         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1494         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1495       }
1496     } finally {
1497       this.lock.readLock().unlock();
1498     }
1499 
1500     LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
1501         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1502         + (request.isAllFiles() ? " (all files)" : ""));
1503     this.region.reportCompactionRequestStart(request.isMajor());
1504     return compaction;
1505   }
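
  /*
   * A minimal sketch of the selection lifecycle around requestCompaction() above, assuming a
   * caller similar to a compaction thread; shouldDefer is a hypothetical scheduling decision.
   *
   *   CompactionContext context = store.requestCompaction(Store.PRIORITY_USER, null);
   *   if (context == null) {
   *     return; // nothing was selected, or writes are disabled
   *   }
   *   if (shouldDefer(context.getRequest())) {
   *     // Hand the selected files back so that a later selection can pick them up again.
   *     store.cancelRequestedCompaction(context);
   *   }
   */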
1506 
1507   /** Adds the files to compacting files. filesCompacting must be locked. */
1508   private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1509     if (filesToAdd == null) return;
1510     // Check that we do not try to compact the same StoreFile twice.
1511     if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1512       Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1513     }
1514     filesCompacting.addAll(filesToAdd);
1515     Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1516   }
1517 
1518   private void removeUnneededFiles() throws IOException {
1519     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1520     if (getFamily().getMinVersions() > 0) {
1521       LOG.debug("Skipping expired store file removal due to min version being " +
1522           getFamily().getMinVersions());
1523       return;
1524     }
1525     this.lock.readLock().lock();
1526     Collection<StoreFile> delSfs = null;
1527     try {
1528       synchronized (filesCompacting) {
1529         long cfTtl = getStoreFileTtl();
1530         if (cfTtl != Long.MAX_VALUE) {
1531           delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1532               EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1533           addToCompactingFiles(delSfs);
1534         }
1535       }
1536     } finally {
1537       this.lock.readLock().unlock();
1538     }
1539     if (delSfs == null || delSfs.isEmpty()) return;
1540 
1541     Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
1542     writeCompactionWalRecord(delSfs, newFiles);
1543     replaceStoreFiles(delSfs, newFiles);
1544     completeCompaction(delSfs);
1545     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1546         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1547         + "; total size for store is " + StringUtils.humanReadableInt(storeSize));
1548   }
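
  /*
   * A minimal configuration sketch for the expired-file cleanup in removeUnneededFiles() above:
   * the feature is enabled by default and keyed off the property read at the top of that method;
   * the Configuration instance here is only illustrative.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   // Keep expired store files around until a regular compaction rewrites them.
   *   conf.setBoolean("hbase.store.delete.expired.storefile", false);
   */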
1549 
1550   @Override
1551   public void cancelRequestedCompaction(CompactionContext compaction) {
1552     finishCompactionRequest(compaction.getRequest());
1553   }
1554 
1555   private void finishCompactionRequest(CompactionRequest cr) {
1556     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1557     if (cr.isOffPeak()) {
1558       offPeakCompactionTracker.set(false);
1559       cr.setOffPeak(false);
1560     }
1561     synchronized (filesCompacting) {
1562       filesCompacting.removeAll(cr.getFiles());
1563     }
1564   }
1565 
1566   /**
1567    * Validates a store file by opening and closing it. In HFileV2 this should
1568    * not be an expensive operation.
1569    *
1570    * @param path the path to the store file
1571    */
1572   private void validateStoreFile(Path path)
1573       throws IOException {
1574     StoreFile storeFile = null;
1575     try {
1576       storeFile = createStoreFileAndReader(path);
1577     } catch (IOException e) {
1578       LOG.error("Failed to open store file : " + path
1579           + ", keeping it in tmp location", e);
1580       throw e;
1581     } finally {
1582       if (storeFile != null) {
1583         storeFile.closeReader(false);
1584       }
1585     }
1586   }
1587 
1588   /**
1589    * <p>This method works by processing a compaction that has already been written to disk.
1590    *
1591    * <p>It is usually invoked at the end of a compaction, but might also be
1592    * invoked at HStore startup, if the prior execution died midway through.
1593    *
1594    * <p>Moving the compacted files into place means:
1595    * <pre>
1596    * 1) Unload all replaced StoreFiles, close them and collect the list to delete.
1597    * 2) Compute the new store size.
1598    * </pre>
1599    *
1600    * @param compactedFiles list of files that were compacted
1601    */
1602   @VisibleForTesting
1603   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1604     throws IOException {
1605     completeCompaction(compactedFiles, true);
1606   }
1607 
1608 
1609   /**
1610    * <p>This method works by processing a compaction that has already been written to disk.
1611    *
1612    * <p>It is usually invoked at the end of a compaction, but might also be
1613    * invoked at HStore startup, if the prior execution died midway through.
1614    *
1615    * <p>Moving the compacted files into place means:
1616    * <pre>
1617    * 1) Unload all replaced StoreFiles, close them and collect the list to delete.
1618    * 2) Compute the new store size.
1619    * </pre>
1620    *
1621    * @param compactedFiles list of files that were compacted
1622    */
1623   @VisibleForTesting
1624   protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
1625       throws IOException {
1626     try {
1627       // Do not delete old store files until we have sent out notification of
1628       // change in case old files are still being accessed by outstanding scanners.
1629       // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
1630       // scenario that could have happened if we continued to hold the lock.
1631       notifyChangedReadersObservers();
1632       // At this point the store will use new files for all scanners.
1633 
1634       // let the archive util decide if we should archive or delete the files
1635       LOG.debug("Removing store files after compaction...");
1636       for (StoreFile compactedFile : compactedFiles) {
1637         compactedFile.closeReader(true);
1638       }
1639       if (removeFiles) {
1640         this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1641       }
1642     } catch (IOException e) {
1643       e = e instanceof RemoteException ?
1644                 ((RemoteException)e).unwrapRemoteException() : e;
1645       LOG.error("Failed removing compacted files in " + this +
1646         ". Files we were trying to remove are " + compactedFiles.toString() +
1647         "; some of them may have been already removed", e);
1648     }
1649 
1650     // Compute new store size.
1651     this.storeSize = 0L;
1652     this.totalUncompressedBytes = 0L;
1653     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1654       StoreFile.Reader r = hsf.getReader();
1655       if (r == null) {
1656         LOG.warn("StoreFile " + hsf + " has a null Reader");
1657         continue;
1658       }
1659       this.storeSize += r.length();
1660       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1661     }
1662   }
1663 
1664   /*
1665    * @param wantedVersions How many versions were asked for.
1666    * @return wantedVersions, capped at this family's configured maximum number of versions.
1667    */
1668   int versionsToReturn(final int wantedVersions) {
1669     if (wantedVersions <= 0) {
1670       throw new IllegalArgumentException("Number of versions must be > 0");
1671     }
1672     // Make sure we do not return more than maximum versions for this store.
1673     int maxVersions = this.family.getMaxVersions();
1674     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1675   }
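
  /*
   * A small worked example for versionsToReturn() above: with a column family configured for a
   * maximum of 3 versions, an over-sized request is capped while a smaller one passes through.
   *
   *   // family.getMaxVersions() == 3
   *   versionsToReturn(10); // returns 3
   *   versionsToReturn(2);  // returns 2
   *   versionsToReturn(0);  // throws IllegalArgumentException
   */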
1676 
1677   static boolean isExpired(final Cell key, final long oldestTimestamp) {
1678     return key.getTimestamp() < oldestTimestamp;
1679   }
1680 
1681   @Override
1682   public Cell getRowKeyAtOrBefore(final byte[] row) throws IOException {
1683     // If minVersions is set, we will not ignore expired KVs.
1684     // As we're only looking for the latest matches, that should be OK.
1685     // With minVersions > 0 we guarantee that any KV that has any version
1686     // at all (expired or not) has at least one version that will not expire.
1687    // Note that this method used to take a KeyValue as an argument. A KeyValue
1688    // can be back-dated; a row key cannot.
1689     long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
1690 
1691     KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1692 
1693     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1694       this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
1695     this.lock.readLock().lock();
1696     try {
1697       // First go to the memstore.  Pick up deletes and candidates.
1698       this.memstore.getRowKeyAtOrBefore(state);
1699       // Check if match, if we got a candidate on the asked for 'kv' row.
1700       // Process each relevant store file. Run through from newest to oldest.
1701       Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
1702           .getCandidateFilesForRowKeyBefore(state.getTargetKey());
1703       while (sfIterator.hasNext()) {
1704         StoreFile sf = sfIterator.next();
1705         sfIterator.remove(); // Remove sf from iterator.
1706         boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
1707         Cell candidate = state.getCandidate();
1708         // we have an optimization here which stops the search if we find exact match.
1709         if (candidate != null && CellUtil.matchingRow(candidate, row)) {
1710           return candidate;
1711         }
1712         if (haveNewCandidate) {
1713           sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
1714               sfIterator, state.getTargetKey(), candidate);
1715         }
1716       }
1717       return state.getCandidate();
1718     } finally {
1719       this.lock.readLock().unlock();
1720     }
1721   }
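
  /*
   * A minimal caller-side sketch for getRowKeyAtOrBefore() above, assuming the store contains
   * rows such as "row-0010" and "row-0020"; the row literals are only illustrative.
   *
   *   Cell closest = store.getRowKeyAtOrBefore(Bytes.toBytes("row-0015"));
   *   // If non-null, closest comes from "row-0010", the latest row at or before the asked-for row.
   */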
1722 
1723   /*
1724    * Check an individual store file for the row at or before a given row.
1725    * @param f
1726    * @param state
1727    * @throws IOException
1728    * @return True iff the candidate has been updated in the state.
1729    */
1730   private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1731                                           final GetClosestRowBeforeTracker state)
1732       throws IOException {
1733     StoreFile.Reader r = f.getReader();
1734     if (r == null) {
1735       LOG.warn("StoreFile " + f + " has a null Reader");
1736       return false;
1737     }
1738     if (r.getEntries() == 0) {
1739       LOG.warn("StoreFile " + f + " is a empty store file");
1740       return false;
1741     }
1742     // TODO: Cache these keys rather than make each time?
1743     byte [] fk = r.getFirstKey();
1744     if (fk == null) return false;
1745     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1746     byte [] lk = r.getLastKey();
1747     KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1748     KeyValue firstOnRow = state.getTargetKey();
1749     if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1750       // If last key in file is not of the target table, no candidates in this
1751       // file.  Return.
1752       if (!state.isTargetTable(lastKV)) return false;
1753       // If the row we're looking for is past the end of file, set search key to
1754       // last key. TODO: Cache last and first key rather than make each time.
1755       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1756     }
1757     // Get a scanner that caches blocks and that uses pread.
1758     HFileScanner scanner = r.getScanner(true, true, false);
1759     // Seek scanner.  If can't seek it, return.
1760     if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1761     // If we found a candidate on firstOnRow, just return; in practice this rarely happens,
1762     // since it is unlikely that there'll be an instance of the actual first row in the table.
1763     if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1764     // If here, need to start backing up.
1765     while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1766        firstOnRow.getKeyLength())) {
1767       Cell kv = scanner.getKeyValue();
1768       if (!state.isTargetTable(kv)) break;
1769       if (!state.isBetterCandidate(kv)) break;
1770       // Make new first on row.
1771       firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1772       // Seek scanner.  If can't seek it, break.
1773       if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1774       // If we find something, break;
1775       if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1776     }
1777     return false;
1778   }
1779 
1780   /*
1781    * Seek the file scanner to firstOnRow or first entry in file.
1782    * @param scanner
1783    * @param firstOnRow
1784    * @param firstKV
1785    * @return True if we successfully seeked the scanner.
1786    * @throws IOException
1787    */
1788   private boolean seekToScanner(final HFileScanner scanner,
1789                                 final KeyValue firstOnRow,
1790                                 final KeyValue firstKV)
1791       throws IOException {
1792     KeyValue kv = firstOnRow;
1793     // If firstOnRow < firstKV, set to firstKV
1794     if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1795     int result = scanner.seekTo(kv);
1796     return result != -1;
1797   }
1798 
1799   /*
1800    * When we come in here, we are probably at the kv just before we break into
1801    * the row that firstOnRow is on. We usually need to increment once to get
1802    * onto the row we are interested in.
1803    * @param scanner
1804    * @param firstOnRow
1805    * @param state
1806    * @return True if we found a candidate.
1807    * @throws IOException
1808    */
1809   private boolean walkForwardInSingleRow(final HFileScanner scanner,
1810                                          final KeyValue firstOnRow,
1811                                          final GetClosestRowBeforeTracker state)
1812       throws IOException {
1813     boolean foundCandidate = false;
1814     do {
1815       Cell kv = scanner.getKeyValue();
1816       // If we are not in the row, skip.
1817       if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1818       // Did we go beyond the target row? If so break.
1819       if (state.isTooFar(kv, firstOnRow)) break;
1820       if (state.isExpired(kv)) {
1821         continue;
1822       }
1823       // If we added something, this row is a contender. break.
1824       if (state.handle(kv)) {
1825         foundCandidate = true;
1826         break;
1827       }
1828     } while(scanner.next());
1829     return foundCandidate;
1830   }
1831 
1832   @Override
1833   public boolean canSplit() {
1834     this.lock.readLock().lock();
1835     try {
1836       // Not splittable if we find a reference store file present in the store.
1837       boolean result = !hasReferences();
1838       if (!result && LOG.isDebugEnabled()) {
1839         LOG.debug("Cannot split region due to reference files being there");
1840       }
1841       return result;
1842     } finally {
1843       this.lock.readLock().unlock();
1844     }
1845   }
1846 
1847   @Override
1848   public byte[] getSplitPoint() {
1849     this.lock.readLock().lock();
1850     try {
1851       // Should already be enforced by the split policy!
1852       assert !this.getRegionInfo().isMetaRegion();
1853       // Not splittable if we find a reference store file present in the store.
1854       if (hasReferences()) {
1855         return null;
1856       }
1857       return this.storeEngine.getStoreFileManager().getSplitPoint();
1858     } catch(IOException e) {
1859       LOG.warn("Failed getting store size for " + this, e);
1860     } finally {
1861       this.lock.readLock().unlock();
1862     }
1863     return null;
1864   }
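
  /*
   * A minimal sketch combining canSplit() and getSplitPoint() above, roughly as a split policy
   * would use them; the splitRow variable is hypothetical.
   *
   *   byte[] splitRow = null;
   *   if (store.canSplit()) {             // false while reference store files are present
   *     splitRow = store.getSplitPoint(); // may still be null, e.g. if an IOException occurred
   *   }
   */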
1865 
1866   @Override
1867   public long getLastCompactSize() {
1868     return this.lastCompactSize;
1869   }
1870 
1871   @Override
1872   public long getSize() {
1873     return storeSize;
1874   }
1875 
1876   @Override
1877   public void triggerMajorCompaction() {
1878     this.forceMajor = true;
1879   }
1880 
1881 
1882   //////////////////////////////////////////////////////////////////////////////
1883   // File administration
1884   //////////////////////////////////////////////////////////////////////////////
1885 
1886   @Override
1887   public KeyValueScanner getScanner(Scan scan,
1888       final NavigableSet<byte []> targetCols, long readPt) throws IOException {
1889     lock.readLock().lock();
1890     try {
1891       KeyValueScanner scanner = null;
1892       if (this.getCoprocessorHost() != null) {
1893         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
1894       }
1895       if (scanner == null) {
1896         scanner = scan.isReversed() ? new ReversedStoreScanner(this,
1897             getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
1898             getScanInfo(), scan, targetCols, readPt);
1899       }
1900       return scanner;
1901     } finally {
1902       lock.readLock().unlock();
1903     }
1904   }
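
  /*
   * A minimal read-path sketch for getScanner() above, assuming the caller already knows its
   * read point; the qualifier, row literal and readPt value are hypothetical.
   *
   *   Scan scan = new Scan(Bytes.toBytes("row-0001"));
   *   NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
   *   cols.add(Bytes.toBytes("q1"));
   *   KeyValueScanner scanner = store.getScanner(scan, cols, readPt);
   */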
1905 
1906   @Override
1907   public String toString() {
1908     return this.getColumnFamilyName();
1909   }
1910 
1911   @Override
1912   // TODO: why is there this and also getNumberOfStorefiles?! Remove one.
1913   public int getStorefilesCount() {
1914     return this.storeEngine.getStoreFileManager().getStorefileCount();
1915   }
1916 
1917   @Override
1918   public long getStoreSizeUncompressed() {
1919     return this.totalUncompressedBytes;
1920   }
1921 
1922   @Override
1923   public long getStorefilesSize() {
1924     long size = 0;
1925     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1926       StoreFile.Reader r = s.getReader();
1927       if (r == null) {
1928         LOG.warn("StoreFile " + s + " has a null Reader");
1929         continue;
1930       }
1931       size += r.length();
1932     }
1933     return size;
1934   }
1935 
1936   @Override
1937   public long getStorefilesIndexSize() {
1938     long size = 0;
1939     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1940       StoreFile.Reader r = s.getReader();
1941       if (r == null) {
1942         LOG.warn("StoreFile " + s + " has a null Reader");
1943         continue;
1944       }
1945       size += r.indexSize();
1946     }
1947     return size;
1948   }
1949 
1950   @Override
1951   public long getTotalStaticIndexSize() {
1952     long size = 0;
1953     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1954       size += s.getReader().getUncompressedDataIndexSize();
1955     }
1956     return size;
1957   }
1958 
1959   @Override
1960   public long getTotalStaticBloomSize() {
1961     long size = 0;
1962     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1963       StoreFile.Reader r = s.getReader();
1964       size += r.getTotalBloomSize();
1965     }
1966     return size;
1967   }
1968 
1969   @Override
1970   public long getMemStoreSize() {
1971     return this.memstore.size();
1972   }
1973 
1974   @Override
1975   public int getCompactPriority() {
1976     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1977     if (priority == PRIORITY_USER) {
1978       LOG.warn("Compaction priority is USER despite there being no user compaction");
1979     }
1980     return priority;
1981   }
1982 
1983   @Override
1984   public boolean throttleCompaction(long compactionSize) {
1985     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1986   }
1987 
1988   public HRegion getHRegion() {
1989     return this.region;
1990   }
1991 
1992   @Override
1993   public RegionCoprocessorHost getCoprocessorHost() {
1994     return this.region.getCoprocessorHost();
1995   }
1996 
1997   @Override
1998   public HRegionInfo getRegionInfo() {
1999     return this.fs.getRegionInfo();
2000   }
2001 
2002   @Override
2003   public boolean areWritesEnabled() {
2004     return this.region.areWritesEnabled();
2005   }
2006 
2007   @Override
2008   public long getSmallestReadPoint() {
2009     return this.region.getSmallestReadPoint();
2010   }
2011 
2012   /**
2013    * Used in tests. TODO: Remove
2014    *
2015    * Updates the value for the given row/family/qualifier. This function will always be seen as
2016    * atomic by other readers because it only puts a single KV into the memstore. Thus no
2017    * read/write control is necessary.
2018    * @param row row to update
2019    * @param f family to update
2020    * @param qualifier qualifier to update
2021    * @param newValue the new value to set into memstore
2022    * @return memstore size delta
2023    * @throws IOException
2024    */
2025   public long updateColumnValue(byte [] row, byte [] f,
2026                                 byte [] qualifier, long newValue)
2027       throws IOException {
2028 
2029     this.lock.readLock().lock();
2030     try {
2031       long now = EnvironmentEdgeManager.currentTime();
2032 
2033       return this.memstore.updateColumnValue(row,
2034           f,
2035           qualifier,
2036           newValue,
2037           now);
2038 
2039     } finally {
2040       this.lock.readLock().unlock();
2041     }
2042   }
2043 
2044   @Override
2045   public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2046     this.lock.readLock().lock();
2047     try {
2048       return this.memstore.upsert(cells, readpoint);
2049     } finally {
2050       this.lock.readLock().unlock();
2051     }
2052   }
2053 
2054   @Override
2055   public StoreFlushContext createFlushContext(long cacheFlushId) {
2056     return new StoreFlusherImpl(cacheFlushId);
2057   }
2058 
2059   private class StoreFlusherImpl implements StoreFlushContext {
2060 
2061     private long cacheFlushSeqNum;
2062     private MemStoreSnapshot snapshot;
2063     private List<Path> tempFiles;
2064     private List<Path> committedFiles;
2065     private long cacheFlushCount;
2066     private long cacheFlushSize;
2067 
2068     private StoreFlusherImpl(long cacheFlushSeqNum) {
2069       this.cacheFlushSeqNum = cacheFlushSeqNum;
2070     }
2071 
2072     /**
2073      * This is not thread safe. The caller should have a lock on the region or the store.
2074      * If necessary, the lock can be added with the patch provided in HBASE-10087.
2075      */
2076     @Override
2077     public void prepare() {
2078       this.snapshot = memstore.snapshot();
2079       this.cacheFlushCount = snapshot.getCellsCount();
2080       this.cacheFlushSize = snapshot.getSize();
2081       committedFiles = new ArrayList<Path>(1);
2082     }
2083 
2084     @Override
2085     public void flushCache(MonitoredTask status) throws IOException {
2086       tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
2087     }
2088 
2089     @Override
2090     public boolean commit(MonitoredTask status) throws IOException {
2091       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2092         return false;
2093       }
2094       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2095       for (Path storeFilePath : tempFiles) {
2096         try {
2097           storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
2098         } catch (IOException ex) {
2099           LOG.error("Failed to commit store file " + storeFilePath, ex);
2100           // Try to delete the files we have committed before.
2101           for (StoreFile sf : storeFiles) {
2102             Path pathToDelete = sf.getPath();
2103             try {
2104               sf.deleteReader();
2105             } catch (IOException deleteEx) {
2106               LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete,
2106                   deleteEx);
2107               Runtime.getRuntime().halt(1);
2108             }
2109           }
2110           throw new IOException("Failed to commit the flush", ex);
2111         }
2112       }
2113 
2114       for (StoreFile sf : storeFiles) {
2115         if (HStore.this.getCoprocessorHost() != null) {
2116           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2117         }
2118         committedFiles.add(sf.getPath());
2119       }
2120 
2121       HStore.this.flushedCellsCount += cacheFlushCount;
2122       HStore.this.flushedCellsSize += cacheFlushSize;
2123 
2124       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
2125       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2126     }
2127 
2128     @Override
2129     public List<Path> getCommittedFiles() {
2130       return committedFiles;
2131     }
2132   }
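
  /*
   * A minimal sketch of the flush lifecycle implemented by StoreFlusherImpl above, in the order
   * the owning region drives it; flushSeqId and status are hypothetical.
   *
   *   StoreFlushContext flush = store.createFlushContext(flushSeqId);
   *   flush.prepare();          // snapshot the memstore (caller holds the region/store lock)
   *   flush.flushCache(status); // write the snapshot out to temporary files
   *   boolean compact = flush.commit(status); // move files into place and update the store file set
   */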
2133 
2134   @Override
2135   public boolean needsCompaction() {
2136     return this.storeEngine.needsCompaction(this.filesCompacting);
2137   }
2138 
2139   @Override
2140   public CacheConfig getCacheConfig() {
2141     return this.cacheConf;
2142   }
2143 
2144   public static final long FIXED_OVERHEAD =
2145       ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
2146               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2147 
2148   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2149       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2150       + ClassSize.CONCURRENT_SKIPLISTMAP
2151       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2152       + ScanInfo.FIXED_OVERHEAD);
2153 
2154   @Override
2155   public long heapSize() {
2156     return DEEP_OVERHEAD + this.memstore.heapSize();
2157   }
2158 
2159   @Override
2160   public KeyValue.KVComparator getComparator() {
2161     return comparator;
2162   }
2163 
2164   @Override
2165   public ScanInfo getScanInfo() {
2166     return scanInfo;
2167   }
2168 
2169   /**
2170    * Set scan info, used by test
2171    * @param scanInfo new scan info to use for test
2172    */
2173   void setScanInfo(ScanInfo scanInfo) {
2174     this.scanInfo = scanInfo;
2175   }
2176 
2177   @Override
2178   public boolean hasTooManyStoreFiles() {
2179     return getStorefilesCount() > this.blockingFileCount;
2180   }
2181 
2182   @Override
2183   public long getFlushedCellsCount() {
2184     return flushedCellsCount;
2185   }
2186 
2187   @Override
2188   public long getFlushedCellsSize() {
2189     return flushedCellsSize;
2190   }
2191 
2192   @Override
2193   public long getCompactedCellsCount() {
2194     return compactedCellsCount;
2195   }
2196 
2197   @Override
2198   public long getCompactedCellsSize() {
2199     return compactedCellsSize;
2200   }
2201 
2202   @Override
2203   public long getMajorCompactedCellsCount() {
2204     return majorCompactedCellsCount;
2205   }
2206 
2207   @Override
2208   public long getMajorCompactedCellsSize() {
2209     return majorCompactedCellsSize;
2210   }
2211 
2212   /**
2213    * Returns the StoreEngine that is backing this concrete implementation of Store.
2214    * @return the {@link StoreEngine} object used internally inside this HStore object.
2215    */
2216   protected StoreEngine<?, ?, ?, ?> getStoreEngine() {
2217     return this.storeEngine;
2218   }
2219 
2220   protected OffPeakHours getOffPeakHours() {
2221     return this.offPeakHours;
2222   }
2223 
2224   /**
2225    * {@inheritDoc}
2226    */
2227   @Override
2228   public void onConfigurationChange(Configuration conf) {
2229     this.conf = new CompoundConfiguration()
2230             .add(conf)
2231             .addBytesMap(family.getValues());
2232     this.storeEngine.compactionPolicy.setConf(conf);
2233     this.offPeakHours = OffPeakHours.getInstance(conf);
2234   }
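
  /*
   * A minimal sketch of how this observer is intended to be wired up, assuming a
   * ConfigurationManager that exposes registerObserver/notifyAllObservers; the manager variable
   * and the source of newConf are hypothetical.
   *
   *   manager.registerObserver(store);     // the store now participates in online config changes
   *   manager.notifyAllObservers(newConf); // eventually invokes store.onConfigurationChange(newConf)
   */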
2235 
2236   /**
2237    * {@inheritDoc}
2238    */
2239   @Override
2240   public void registerChildren(ConfigurationManager manager) {
2241     // No children to register
2242   }
2243 
2244   /**
2245    * {@inheritDoc}
2246    */
2247   @Override
2248   public void deregisterChildren(ConfigurationManager manager) {
2249     // No children to deregister
2250   }
2251 }