1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.base.Preconditions;
23  import com.google.common.collect.ImmutableCollection;
24  import com.google.common.collect.ImmutableList;
25  import com.google.common.collect.Lists;
26  import com.google.common.collect.Sets;
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.net.InetSocketAddress;
31  import java.security.PrivilegedExceptionAction;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.HashSet;
37  import java.util.Iterator;
38  import java.util.List;
39  import java.util.NavigableSet;
40  import java.util.Set;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.CompletionService;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.ExecutorCompletionService;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellComparator;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.CompoundConfiguration;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.Tag;
66  import org.apache.hadoop.hbase.TagType;
67  import org.apache.hadoop.hbase.TagUtil;
68  import org.apache.hadoop.hbase.classification.InterfaceAudience;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.conf.ConfigurationManager;
71  import org.apache.hadoop.hbase.io.compress.Compression;
72  import org.apache.hadoop.hbase.io.crypto.Encryption;
73  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
74  import org.apache.hadoop.hbase.io.hfile.HFile;
75  import org.apache.hadoop.hbase.io.hfile.HFileContext;
76  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
77  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
78  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
79  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
80  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
81  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
82  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
83  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
84  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
85  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
86  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
87  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
88  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
89  import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
90  import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
91  import org.apache.hadoop.hbase.security.EncryptionUtil;
92  import org.apache.hadoop.hbase.security.User;
93  import org.apache.hadoop.hbase.util.Bytes;
94  import org.apache.hadoop.hbase.util.ChecksumType;
95  import org.apache.hadoop.hbase.util.ClassSize;
96  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
97  import org.apache.hadoop.hbase.util.ReflectionUtils;
98  import org.apache.hadoop.util.StringUtils;
99  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
100 
101 /**
102  * A Store holds a column family in a Region.  It is a memstore and a set of zero
103  * or more StoreFiles, which stretch backwards over time.
104  *
105  * <p>There's no reason to consider append-logging at this level; all logging
106  * and locking is handled at the HRegion level.  Store just provides
107  * services to manage sets of StoreFiles.  One of the most important of those
108  * services is compaction, where files are aggregated once they pass
109  * a configurable threshold.
110  *
111  * <p>Locking and transactions are handled at a higher level.  This API should
112  * not be called directly but by an HRegion manager.
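 *
 * <p>An illustrative sketch of how the owning HRegion interacts with a Store
 * (simplified; only HRegion should drive these calls, and the method names shown
 * are the ones defined in this class):
 * <pre>{@code
 *   long delta = store.add(cell);    // write path: the Cell goes into the memstore
 *   store.snapshot();                // flush path: snapshot the memstore before flushing
 *   List<KeyValueScanner> scanners = // read path: scanners over memstore and StoreFiles
 *       store.getScanners(true, false, false, false, null, startRow, stopRow, readPt);
 * }</pre>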
113  */
114 @InterfaceAudience.Private
115 public class HStore implements Store {
116   private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
117   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
118       "hbase.server.compactchecker.interval.multiplier";
119   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
120   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
121   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
122 
123   private static final Log LOG = LogFactory.getLog(HStore.class);
124 
125   protected final MemStore memstore;
126   // The region this store belongs to.
127   protected final HRegion region;
128   private final HColumnDescriptor family;
129   private final HRegionFileSystem fs;
130   protected Configuration conf;
131   protected CacheConfig cacheConf;
132   private long lastCompactSize = 0;
133   volatile boolean forceMajor = false;
134   /* how many bytes to write between status checks */
135   static int closeCheckInterval = 0;
136   private volatile long storeSize = 0L;
137   private volatile long totalUncompressedBytes = 0L;
138 
139   /**
140    * RWLock for store operations.
141    * Locked in shared mode when the list of component stores is looked at:
142    *   - all reads/writes to table data
143    *   - checking for split
144    * Locked in exclusive mode when the list of component stores is modified:
145    *   - closing
146    *   - completing a compaction
147    */
148   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
149   private final boolean verifyBulkLoads;
150 
151   private ScanInfo scanInfo;
152 
153   // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
154   final List<StoreFile> filesCompacting = Lists.newArrayList();
155 
156   // All access must be synchronized.
157   private final Set<ChangedReadersObserver> changedReaderObservers =
158     Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
159 
160   protected final int blocksize;
161   private HFileDataBlockEncoder dataBlockEncoder;
162 
163   /** Checksum configuration */
164   protected ChecksumType checksumType;
165   protected int bytesPerChecksum;
166 
167   // Comparing KeyValues
168   private final CellComparator comparator;
169 
170   final StoreEngine<?, ?, ?, ?> storeEngine;
171 
172   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
173   private volatile OffPeakHours offPeakHours;
174 
175   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
176   private int flushRetriesNumber;
177   private int pauseTime;
178 
179   private long blockingFileCount;
180   private int compactionCheckMultiplier;
181   protected Encryption.Context cryptoContext = Encryption.Context.NONE;
182 
183   private volatile long flushedCellsCount = 0;
184   private volatile long compactedCellsCount = 0;
185   private volatile long majorCompactedCellsCount = 0;
186   private volatile long flushedCellsSize = 0;
187   private volatile long flushedOutputFileSize = 0;
188   private volatile long compactedCellsSize = 0;
189   private volatile long majorCompactedCellsSize = 0;
190 
191   /**
192    * Constructor
193    * @param region the HRegion this store belongs to
194    * @param family HColumnDescriptor for this column family
195    * @param confParam configuration object; table and column family overrides
196    *   are layered on top of it
197    * @throws IOException
198    */
199   protected HStore(final HRegion region, final HColumnDescriptor family,
200       final Configuration confParam) throws IOException {
201 
202     this.fs = region.getRegionFileSystem();
203 
204     // Assemble the store's home directory and ensure it exists.
205     fs.createStoreDir(family.getNameAsString());
206     this.region = region;
207     this.family = family;
208     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
209     // CompoundConfiguration will look for keys in reverse order of addition, so we'd
210     // add global config first, then table and cf overrides, then cf metadata.
211     this.conf = new CompoundConfiguration()
212       .add(confParam)
213       .addStringMap(region.getTableDesc().getConfiguration())
214       .addStringMap(family.getConfiguration())
215       .addBytesMap(family.getValues());
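    // Illustrative example of the precedence described above: if a key such as
    // hbase.hstore.blockingStoreFiles is set both in the global configuration and on
    // the column family, the per-family value wins, because its map was added to the
    // CompoundConfiguration last.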
216     this.blocksize = family.getBlocksize();
217 
218     this.dataBlockEncoder =
219         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
220 
221     this.comparator = region.getCellCompartor();
222     // used by ScanQueryMatcher
223     long timeToPurgeDeletes =
224         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
225     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
226         "ms in store " + this);
227     // Get TTL
228     long ttl = determineTTLFromFamily(family);
229     // Why not just pass a HColumnDescriptor in here altogether?  Even if we have
230     // to clone it?
231     scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
232     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
233     this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
234         Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
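    // Illustrative only: a custom memstore implementation could be plugged in with
    //   conf.set(MEMSTORE_CLASS_NAME, MyMemStore.class.getName());
    // (MyMemStore being a hypothetical class) as long as it exposes the
    // (Configuration, CellComparator) constructor used by the reflective call above.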
235     this.offPeakHours = OffPeakHours.getInstance(conf);
236 
237     // Setting up cache configuration for this family
238     createCacheConf(family);
239 
240     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
241 
242     this.blockingFileCount =
243         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
244     this.compactionCheckMultiplier = conf.getInt(
245         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
246     if (this.compactionCheckMultiplier <= 0) {
247       LOG.error("Compaction check period multiplier must be positive, setting default: "
248           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
249       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
250     }
251 
252     if (HStore.closeCheckInterval == 0) {
253       HStore.closeCheckInterval = conf.getInt(
254           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
255     }
256 
257     this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
258     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
259 
260     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
261     this.checksumType = getChecksumType(conf);
262     // Initialize bytes per checksum
263     this.bytesPerChecksum = getBytesPerChecksum(conf);
264     flushRetriesNumber = conf.getInt(
265         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
266     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
267     if (flushRetriesNumber <= 0) {
268       throw new IllegalArgumentException(
269           "hbase.hstore.flush.retries.number must be > 0, not "
270               + flushRetriesNumber);
271     }
272     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
273   }
274 
275   /**
276    * Creates the cache config.
277    * @param family The current column family.
278    */
279   protected void createCacheConf(final HColumnDescriptor family) {
280     this.cacheConf = new CacheConfig(conf, family);
281   }
282 
283   /**
284    * Creates the store engine configured for the given Store.
285    * @param store The store. An unfortunate dependency needed due to it
286    *              being passed to coprocessors via the compactor.
287    * @param conf Store configuration.
288    * @param kvComparator CellComparator for storeFileManager.
289    * @return StoreEngine to use.
290    */
291   protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
292       CellComparator kvComparator) throws IOException {
293     return StoreEngine.create(store, conf, comparator);
294   }
295 
296   /**
297    * @param family the column family descriptor
298    * @return TTL in milliseconds of the specified family
299    */
300   public static long determineTTLFromFamily(final HColumnDescriptor family) {
301     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
302     long ttl = family.getTimeToLive();
303     if (ttl == HConstants.FOREVER) {
304       // Default is unlimited ttl.
305       ttl = Long.MAX_VALUE;
306     } else if (ttl == -1) {
307       ttl = Long.MAX_VALUE;
308     } else {
309       // Seconds -> milliseconds adjustment for user data
310       ttl *= 1000;
311     }
312     return ttl;
313   }
314 
315   @Override
316   public String getColumnFamilyName() {
317     return this.family.getNameAsString();
318   }
319 
320   @Override
321   public TableName getTableName() {
322     return this.getRegionInfo().getTable();
323   }
324 
325   @Override
326   public FileSystem getFileSystem() {
327     return this.fs.getFileSystem();
328   }
329 
330   public HRegionFileSystem getRegionFileSystem() {
331     return this.fs;
332   }
333 
334   /* Implementation of StoreConfigInformation */
335   @Override
336   public long getStoreFileTtl() {
337     // TTL only applies if there's no MIN_VERSIONs setting on the column.
338     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
339   }
340 
341   @Override
342   public long getMemstoreFlushSize() {
343     // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
344     return this.region.memstoreFlushSize;
345   }
346 
347   @Override
348   public long getFlushableSize() {
349     return this.memstore.getFlushableSize();
350   }
351 
352   @Override
353   public long getSnapshotSize() {
354     return this.memstore.getSnapshotSize();
355   }
356 
357   @Override
358   public long getCompactionCheckMultiplier() {
359     return this.compactionCheckMultiplier;
360   }
361 
362   @Override
363   public long getBlockingFileCount() {
364     return blockingFileCount;
365   }
366   /* End implementation of StoreConfigInformation */
367 
368   /**
369    * Returns the configured bytesPerChecksum value.
370    * @param conf The configuration
371    * @return The bytesPerChecksum that is set in the configuration
372    */
373   public static int getBytesPerChecksum(Configuration conf) {
374     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
375                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
376   }
377 
378   /**
379    * Returns the configured checksum algorithm.
380    * @param conf The configuration
381    * @return The checksum algorithm that is set in the configuration
382    */
383   public static ChecksumType getChecksumType(Configuration conf) {
384     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
385     if (checksumName == null) {
386       return ChecksumType.getDefaultChecksumType();
387     } else {
388       return ChecksumType.nameToType(checksumName);
389     }
390   }
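  // An illustrative sketch of wiring these two helpers together (the values shown are
  // examples only, not recommendations):
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.CHECKSUM_TYPE_NAME, "CRC32C");
  //   conf.setInt(HConstants.BYTES_PER_CHECKSUM, 16 * 1024);
  //   ChecksumType type = HStore.getChecksumType(conf);
  //   int bytesPerChecksum = HStore.getBytesPerChecksum(conf);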
391 
392   /**
393    * @return how many bytes to write between status checks
394    */
395   public static int getCloseCheckInterval() {
396     return closeCheckInterval;
397   }
398 
399   @Override
400   public HColumnDescriptor getFamily() {
401     return this.family;
402   }
403 
404   /**
405    * @return The maximum sequence id in all store files. Used for log replay.
406    */
407   @Override
408   public long getMaxSequenceId() {
409     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
410   }
411 
412   @Override
413   public long getMaxMemstoreTS() {
414     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
415   }
416 
417   /**
418    * @param tabledir {@link Path} to where the table is being stored
419    * @param hri {@link HRegionInfo} for the region.
420    * @param family the column family name as a byte array
421    * @return Path to family/Store home directory.
422    */
423   @Deprecated
424   public static Path getStoreHomedir(final Path tabledir,
425       final HRegionInfo hri, final byte[] family) {
426     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
427   }
428 
429   /**
430    * @param tabledir {@link Path} to where the table is being stored
431    * @param encodedName Encoded region name.
432    * @param family the column family name as a byte array
433    * @return Path to family/Store home directory.
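   *   (the result is {@code <tabledir>/<encodedRegionName>/<familyName>})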
434    */
435   @Deprecated
436   public static Path getStoreHomedir(final Path tabledir,
437       final String encodedName, final byte[] family) {
438     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
439   }
440 
441   @Override
442   public HFileDataBlockEncoder getDataBlockEncoder() {
443     return dataBlockEncoder;
444   }
445 
446   /**
447    * Should be used only in tests.
448    * @param blockEncoder the block delta encoder to use
449    */
450   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
451     this.dataBlockEncoder = blockEncoder;
452   }
453 
454   /**
455    * Creates an unsorted list of StoreFiles loaded in parallel
456    * from the given directory.
457    * @throws IOException
458    */
459   private List<StoreFile> loadStoreFiles() throws IOException {
460     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
461     return openStoreFiles(files);
462   }
463 
464   private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
465     if (files == null || files.size() == 0) {
466       return new ArrayList<StoreFile>();
467     }
468     // initialize the thread pool for opening store files in parallel.
469     ThreadPoolExecutor storeFileOpenerThreadPool =
470       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
471           this.getColumnFamilyName());
472     CompletionService<StoreFile> completionService =
473       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
474 
475     int totalValidStoreFile = 0;
476     for (final StoreFileInfo storeFileInfo: files) {
477       // open each store file in parallel
478       completionService.submit(new Callable<StoreFile>() {
479         @Override
480         public StoreFile call() throws IOException {
481           StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
482           return storeFile;
483         }
484       });
485       totalValidStoreFile++;
486     }
487 
488     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
489     IOException ioe = null;
490     try {
491       for (int i = 0; i < totalValidStoreFile; i++) {
492         try {
493           Future<StoreFile> future = completionService.take();
494           StoreFile storeFile = future.get();
495           if (storeFile != null) {
496             long length = storeFile.getReader().length();
497             this.storeSize += length;
498             this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
499             if (LOG.isDebugEnabled()) {
500               LOG.debug("loaded " + storeFile.toStringDetailed());
501             }
502             results.add(storeFile);
503           }
504         } catch (InterruptedException e) {
505           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
506         } catch (ExecutionException e) {
507           if (ioe == null) ioe = new IOException(e.getCause());
508         }
509       }
510     } finally {
511       storeFileOpenerThreadPool.shutdownNow();
512     }
513     if (ioe != null) {
514       // close StoreFile readers
515       boolean evictOnClose =
516           cacheConf != null? cacheConf.shouldEvictOnClose(): true;
517       for (StoreFile file : results) {
518         try {
519           if (file != null) file.closeReader(evictOnClose);
520         } catch (IOException e) {
521           LOG.warn(e.getMessage());
522         }
523       }
524       throw ioe;
525     }
526 
527     return results;
528   }
529 
530   /**
531    * Checks the underlying store files, opens any that have not yet
532    * been opened, and removes the store file readers for store files that are no longer
533    * available. Mainly used by secondary region replicas to keep up to date with
534    * the primary region files.
535    * @throws IOException
536    */
537   @Override
538   public void refreshStoreFiles() throws IOException {
539     Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
540     refreshStoreFilesInternal(newFiles);
541   }
542 
543   @Override
544   public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
545     List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
546     for (String file : newFiles) {
547       storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
548     }
549     refreshStoreFilesInternal(storeFiles);
550   }
551 
552   /**
553    * Checks the underlying store files, opens any that have not yet
554    * been opened, and removes the store file readers for store files that are no longer
555    * available. Mainly used by secondary region replicas to keep up to date with
556    * the primary region files.
557    * @throws IOException
558    */
559   private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
560     StoreFileManager sfm = storeEngine.getStoreFileManager();
561     Collection<StoreFile> currentFiles = sfm.getStorefiles();
562     if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
563 
564     if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
565 
566     HashMap<StoreFileInfo, StoreFile> currentFilesSet =
567         new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
568     for (StoreFile sf : currentFiles) {
569       currentFilesSet.put(sf.getFileInfo(), sf);
570     }
571     HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
572 
573     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
574     Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
575 
576     if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
577       return;
578     }
579 
580     LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
581       + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
582 
583     Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
584     for (StoreFileInfo sfi : toBeRemovedFiles) {
585       toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
586     }
587 
588     // try to open the files
589     List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
590 
591     // propagate the file changes to the underlying store file manager
592     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
593 
594     // Advance the memstore read point to be at least the new store files' seqIds so that
595     // readers might pick them up. This assumes that the store is not getting any writes (otherwise
596     // in-flight transactions might be made visible)
597     if (!toBeAddedFiles.isEmpty()) {
598       region.getMVCC().advanceTo(this.getMaxSequenceId());
599     }
600 
601     completeCompaction(toBeRemovedStoreFiles);
602   }
603 
604   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
605     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
606     return createStoreFileAndReader(info);
607   }
608 
609   private StoreFile createStoreFileAndReader(final StoreFileInfo info)
610       throws IOException {
611     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
612     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
613       this.family.getBloomFilterType());
614     StoreFileReader r = storeFile.createReader();
615     r.setReplicaStoreFile(isPrimaryReplicaStore());
616     return storeFile;
617   }
618 
619   @Override
620   public long add(final Cell cell) {
621     lock.readLock().lock();
622     try {
623        return this.memstore.add(cell);
624     } finally {
625       lock.readLock().unlock();
626     }
627   }
628 
629   @Override
630   public long timeOfOldestEdit() {
631     return memstore.timeOfOldestEdit();
632   }
633 
634   /**
635    * Adds a delete marker to the memstore
636    *
637    * @param kv the KeyValue (delete marker) to add
638    * @return memstore size delta
639    */
640   protected long delete(final KeyValue kv) {
641     lock.readLock().lock();
642     try {
643       return this.memstore.delete(kv);
644     } finally {
645       lock.readLock().unlock();
646     }
647   }
648 
649   /**
650    * @return All store files.
651    */
652   @Override
653   public Collection<StoreFile> getStorefiles() {
654     return this.storeEngine.getStoreFileManager().getStorefiles();
655   }
656 
657   @Override
658   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
659     HFile.Reader reader  = null;
660     try {
661       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
662           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
663       reader = HFile.createReader(srcPath.getFileSystem(conf),
664           srcPath, cacheConf, conf);
665       reader.loadFileInfo();
666 
667       byte[] firstKey = reader.getFirstRowKey();
668       Preconditions.checkState(firstKey != null, "First key can not be null");
669       Cell lk = reader.getLastKey();
670       Preconditions.checkState(lk != null, "Last key can not be null");
671       byte[] lastKey =  CellUtil.cloneRow(lk);
672 
673       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
674           " last=" + Bytes.toStringBinary(lastKey));
675       LOG.debug("Region bounds: first=" +
676           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
677           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
678 
679       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
680         throw new WrongRegionException(
681             "Bulk load file " + srcPath.toString() + " does not fit inside region "
682             + this.getRegionInfo().getRegionNameAsString());
683       }
684 
685       if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
686           HConstants.DEFAULT_MAX_FILE_SIZE)) {
687         LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
688             reader.length() + " bytes can be problematic as it may lead to oversplitting.");
689       }
690 
691       if (verifyBulkLoads) {
692         long verificationStartTime = EnvironmentEdgeManager.currentTime();
693         LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
694         Cell prevCell = null;
695         HFileScanner scanner = reader.getScanner(false, false, false);
696         scanner.seekTo();
697         do {
698           Cell cell = scanner.getCell();
699           if (prevCell != null) {
700             if (comparator.compareRows(prevCell, cell) > 0) {
701               throw new InvalidHFileException("Previous row is greater than"
702                   + " current row: path=" + srcPath + " previous="
703                   + CellUtil.getCellKeyAsString(prevCell) + " current="
704                   + CellUtil.getCellKeyAsString(cell));
705             }
706             if (CellComparator.compareFamilies(prevCell, cell) != 0) {
707               throw new InvalidHFileException("Previous key had different"
708                   + " family compared to current key: path=" + srcPath
709                   + " previous="
710                   + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
711                       prevCell.getFamilyLength())
712                   + " current="
713                   + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
714                       cell.getFamilyLength()));
715             }
716           }
717           prevCell = cell;
718         } while (scanner.next());
719         LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
720             + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
721             + " ms");
722       }
723     } finally {
724       if (reader != null) reader.close();
725     }
726   }
727 
728   @Override
729   public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
730     Path srcPath = new Path(srcPathStr);
731     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
732 
733     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
734         + dstPath + " - updating store file list.");
735 
736     StoreFile sf = createStoreFileAndReader(dstPath);
737     bulkLoadHFile(sf);
738 
739     LOG.info("Successfully loaded store file " + srcPath + " into store " + this
740         + " (new location: " + dstPath + ")");
741 
742     return dstPath;
743   }
744 
745   @Override
746   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
747     StoreFile sf = createStoreFileAndReader(fileInfo);
748     bulkLoadHFile(sf);
749   }
750 
751   private void bulkLoadHFile(StoreFile sf) throws IOException {
752     StoreFileReader r = sf.getReader();
753     this.storeSize += r.length();
754     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
755 
756     // Append the new storefile into the list
757     this.lock.writeLock().lock();
758     try {
759       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
760     } finally {
761       // We need the lock, as long as we are updating the storeFiles
762       // or changing the memstore. Let us release it before calling
763       // notifyChangeReadersObservers. See HBASE-4485 for a possible
764       // deadlock scenario that could have happened if continue to hold
765       // the lock.
766       this.lock.writeLock().unlock();
767     }
768     LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName() + "'");
769     if (LOG.isTraceEnabled()) {
770       String traceMessage = "BULK LOAD time,size,store size,store files ["
771           + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
772           + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
773       LOG.trace(traceMessage);
774     }
775   }
776 
777   @Override
778   public ImmutableCollection<StoreFile> close() throws IOException {
779     this.lock.writeLock().lock();
780     try {
781       // Clear so metrics don't find them.
782       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
783       Collection<StoreFile> compactedfiles =
784           storeEngine.getStoreFileManager().clearCompactedFiles();
785       // clear the compacted files
786       if (compactedfiles != null && !compactedfiles.isEmpty()) {
787         removeCompactedfiles(compactedfiles);
788       }
789       if (!result.isEmpty()) {
790         // initialize the thread pool for closing store files in parallel.
791         ThreadPoolExecutor storeFileCloserThreadPool = this.region
792             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
793                 + this.getColumnFamilyName());
794 
795         // close each store file in parallel
796         CompletionService<Void> completionService =
797           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
798         for (final StoreFile f : result) {
799           completionService.submit(new Callable<Void>() {
800             @Override
801             public Void call() throws IOException {
802               boolean evictOnClose =
803                   cacheConf != null? cacheConf.shouldEvictOnClose(): true;
804               f.closeReader(evictOnClose);
805               return null;
806             }
807           });
808         }
809 
810         IOException ioe = null;
811         try {
812           for (int i = 0; i < result.size(); i++) {
813             try {
814               Future<Void> future = completionService.take();
815               future.get();
816             } catch (InterruptedException e) {
817               if (ioe == null) {
818                 ioe = new InterruptedIOException();
819                 ioe.initCause(e);
820               }
821             } catch (ExecutionException e) {
822               if (ioe == null) ioe = new IOException(e.getCause());
823             }
824           }
825         } finally {
826           storeFileCloserThreadPool.shutdownNow();
827         }
828         if (ioe != null) throw ioe;
829       }
830       LOG.info("Closed " + this);
831       return result;
832     } finally {
833       this.lock.writeLock().unlock();
834     }
835   }
836 
837   /**
838    * Snapshot this store's memstore. Call before running
839    * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
840    *  so it has some work to do.
841    */
842   void snapshot() {
843     this.lock.writeLock().lock();
844     try {
845       this.memstore.snapshot();
846     } finally {
847       this.lock.writeLock().unlock();
848     }
849   }
850 
851   /**
852    * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
853    * @param logCacheFlushId flush sequence number
854    * @param snapshot
855    * @param status
856    * @param throughputController
857    * @return The path name of the tmp file to which the store was flushed
858    * @throws IOException if exception occurs during process
859    */
860   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
861       MonitoredTask status, ThroughputController throughputController) throws IOException {
862     // If an exception happens flushing, we let it out without clearing
863     // the memstore snapshot.  The old snapshot will be returned when we say
864     // 'snapshot', the next time flush comes around.
865     // Retry after catching an exception when flushing, otherwise the server will
866     // abort itself
867     StoreFlusher flusher = storeEngine.getStoreFlusher();
868     IOException lastException = null;
869     for (int i = 0; i < flushRetriesNumber; i++) {
870       try {
871         List<Path> pathNames =
872             flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
873         Path lastPathName = null;
874         try {
875           for (Path pathName : pathNames) {
876             lastPathName = pathName;
877             validateStoreFile(pathName);
878           }
879           return pathNames;
880         } catch (Exception e) {
881           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
882           if (e instanceof IOException) {
883             lastException = (IOException) e;
884           } else {
885             lastException = new IOException(e);
886           }
887         }
888       } catch (IOException e) {
889         LOG.warn("Failed flushing store file, retrying num=" + i, e);
890         lastException = e;
891       }
892       if (lastException != null && i < (flushRetriesNumber - 1)) {
893         try {
894           Thread.sleep(pauseTime);
895         } catch (InterruptedException e) {
896           IOException iie = new InterruptedIOException();
897           iie.initCause(e);
898           throw iie;
899         }
900       }
901     }
902     throw lastException;
903   }
904 
905   /*
906    * @param path The pathname of the tmp file into which the store was flushed
907    * @param logCacheFlushId
908    * @param status
909    * @return StoreFile created.
910    * @throws IOException
911    */
912   private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
913       throws IOException {
914     // Write-out finished successfully, move into the right spot
915     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
916 
917     status.setStatus("Flushing " + this + ": reopening flushed file");
918     StoreFile sf = createStoreFileAndReader(dstPath);
919 
920     StoreFileReader r = sf.getReader();
921     this.storeSize += r.length();
922     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
923 
924     if (LOG.isInfoEnabled()) {
925       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
926         ", sequenceid=" + logCacheFlushId +
927         ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
928     }
929     return sf;
930   }
931 
932   @Override
933   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
934                                             boolean isCompaction, boolean includeMVCCReadpoint,
935                                             boolean includesTag)
936       throws IOException {
937     return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
938         includesTag, false);
939   }
940 
941   /*
942    * @param maxKeyCount
943    * @param compression Compression algorithm to use
944    * @param isCompaction whether we are creating a new file in a compaction
945    * @param includeMVCCReadpoint whether to include the MVCC read point or not
946    * @param includesTag whether to include tags or not
947    * @return Writer for a new StoreFile in the tmp dir.
948    */
949   @Override
950   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
951       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
952       boolean shouldDropBehind)
953   throws IOException {
954     final CacheConfig writerCacheConf;
955     if (isCompaction) {
956       // Don't cache data on write on compactions.
957       writerCacheConf = new CacheConfig(cacheConf);
958       writerCacheConf.setCacheDataOnWrite(false);
959     } else {
960       writerCacheConf = cacheConf;
961     }
962     InetSocketAddress[] favoredNodes = null;
963     if (region.getRegionServerServices() != null) {
964       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
965           region.getRegionInfo().getEncodedName());
966     }
967     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
968       cryptoContext);
969     StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf,
970         this.getFileSystem())
971             .withFilePath(fs.createTempName())
972             .withComparator(comparator)
973             .withBloomType(family.getBloomFilterType())
974             .withMaxKeyCount(maxKeyCount)
975             .withFavoredNodes(favoredNodes)
976             .withFileContext(hFileContext)
977             .withShouldDropCacheBehind(shouldDropBehind)
978             .build();
979     return w;
980   }
981 
982   private HFileContext createFileContext(Compression.Algorithm compression,
983       boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
984     if (compression == null) {
985       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
986     }
987     HFileContext hFileContext = new HFileContextBuilder()
988                                 .withIncludesMvcc(includeMVCCReadpoint)
989                                 .withIncludesTags(includesTag)
990                                 .withCompression(compression)
991                                 .withCompressTags(family.isCompressTags())
992                                 .withChecksumType(checksumType)
993                                 .withBytesPerCheckSum(bytesPerChecksum)
994                                 .withBlockSize(blocksize)
995                                 .withHBaseCheckSum(true)
996                                 .withDataBlockEncoding(family.getDataBlockEncoding())
997                                 .withEncryptionContext(cryptoContext)
998                                 .withCreateTime(EnvironmentEdgeManager.currentTime())
999                                 .build();
1000     return hFileContext;
1001   }
1002 
1003 
1004   /*
1005  * Update the store files list, adding into place the store files produced by this new flush.
1006    * @param sfs Store files
1007    * @param snapshotId
1008    * @throws IOException
1009    * @return Whether compaction is required.
1010    */
1011   private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
1012       throws IOException {
1013     this.lock.writeLock().lock();
1014     try {
1015       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1016       if (snapshotId > 0) {
1017         this.memstore.clearSnapshot(snapshotId);
1018       }
1019     } finally {
1020       // We need the lock, as long as we are updating the storeFiles
1021       // or changing the memstore. Let us release it before calling
1022       // notifyChangeReadersObservers. See HBASE-4485 for a possible
1023       // deadlock scenario that could have happened if continue to hold
1024       // the lock.
1025       this.lock.writeLock().unlock();
1026     }
1027     // Notify observers here; this is only done in the case of flushes.
1028     notifyChangedReadersObservers(sfs);
1029     if (LOG.isTraceEnabled()) {
1030       long totalSize = 0;
1031       for (StoreFile sf : sfs) {
1032         totalSize += sf.getReader().length();
1033       }
1034       String traceMessage = "FLUSH time,count,size,store size,store files ["
1035           + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1036           + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1037       LOG.trace(traceMessage);
1038     }
1039     return needsCompaction();
1040   }
1041 
1042   /*
1043    * Notify all observers that set of Readers has changed.
1044    * @throws IOException
1045    */
1046   private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
1047     for (ChangedReadersObserver o : this.changedReaderObservers) {
1048       o.updateReaders(sfs);
1049     }
1050   }
1051 
1052   /**
1053    * Get all scanners with no filtering based on TTL (that happens further down
1054    * the line).
1055    * @return all scanners for this store
1056    */
1057   @Override
1058   public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1059       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1060       byte[] stopRow, long readPt) throws IOException {
1061     Collection<StoreFile> storeFilesToScan;
1062     List<KeyValueScanner> memStoreScanners;
1063     this.lock.readLock().lock();
1064     try {
1065       storeFilesToScan =
1066           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1067       memStoreScanners = this.memstore.getScanners(readPt);
1068     } finally {
1069       this.lock.readLock().unlock();
1070     }
1071 
1072     // First the store file scanners
1073 
1074     // TODO this used to get the store files in descending order,
1075     // but now we get them in ascending order, which I think is
1076    // actually more correct, since the memstore scanners get put at the end.
1077     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
1078         cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1079     List<KeyValueScanner> scanners =
1080       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1081     scanners.addAll(sfScanners);
1082     // Then the memstore scanners
1083     scanners.addAll(memStoreScanners);
1084     return scanners;
1085   }
1086 
1087   @Override
1088   public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
1089       boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1090       byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
1091     List<KeyValueScanner> memStoreScanners = null;
1092     if (includeMemstoreScanner) {
1093       this.lock.readLock().lock();
1094       try {
1095         memStoreScanners = this.memstore.getScanners(readPt);
1096       } finally {
1097         this.lock.readLock().unlock();
1098       }
1099     }
1100     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1101       cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1102     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
1103     scanners.addAll(sfScanners);
1104     // Then the memstore scanners
1105     if (memStoreScanners != null) {
1106       scanners.addAll(memStoreScanners);
1107     }
1108     return scanners;
1109   }
1110 
1111   @Override
1112   public void addChangedReaderObserver(ChangedReadersObserver o) {
1113     this.changedReaderObservers.add(o);
1114   }
1115 
1116   @Override
1117   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1118     // We don't check if observer present; it may not be (legitimately)
1119     this.changedReaderObservers.remove(o);
1120   }
1121 
1122   //////////////////////////////////////////////////////////////////////////////
1123   // Compaction
1124   //////////////////////////////////////////////////////////////////////////////
1125 
1126   /**
1127    * Compact the StoreFiles.  This method may take some time, so the calling
1128    * thread must be able to block for long periods.
1129    *
1130    * <p>During this time, the Store can work as usual, getting values from
1131    * StoreFiles and writing new StoreFiles from the memstore.
1132    *
1133    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1134    * completely written-out to disk.
1135    *
1136    * <p>The compactLock prevents multiple simultaneous compactions.
1137    * The structureLock prevents us from interfering with other write operations.
1138    *
1139    * <p>We don't want to hold the structureLock for the whole time, as a compact()
1140    * can be lengthy and we want to allow cache-flushes during this period.
1141    *
1142  * <p> A compaction event should be idempotent, since there is no IO fencing for
1143  * the region directory in HDFS. A region server might still try to complete the
1144    * compaction after it lost the region. That is why the following events are carefully
1145    * ordered for a compaction:
1146    *  1. Compaction writes new files under region/.tmp directory (compaction output)
1147    *  2. Compaction atomically moves the temporary file under region directory
1148    *  3. Compaction appends a WAL edit containing the compaction input and output files.
1149    *  Forces sync on WAL.
1150    *  4. Compaction deletes the input files from the region directory.
1151    *
1152    * Failure conditions are handled like this:
1153  *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
1154    *  the compaction later, it will only write the new data file to the region directory.
1155    *  Since we already have this data, this will be idempotent but we will have a redundant
1156    *  copy of the data.
1157    *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1158  *  RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
1159  *  - If RS fails after 3, the region server that opens the region will pick up the
1160  *  compaction marker from the WAL and replay it by removing the compaction input files.
1161  *  Failed RS can also attempt to delete those files, but the operation will be idempotent.
1162    *
1163    * See HBASE-2231 for details.
1164    *
1165    * @param compaction compaction details obtained from requestCompaction()
1166    * @throws IOException
1167  * @return the store files we compacted into, or null if we failed or opted out early.
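   *
   * <p>An illustrative sketch of how a caller might drive this method (the
   * CompactionContext is assumed to come from an earlier compaction-selection step
   * elsewhere in the region server; that step is not shown here):
   * <pre>{@code
   *   List<StoreFile> newFiles = store.compact(compaction, throughputController, user);
   *   if (newFiles == null) {
   *     // the compaction failed or was opted out of early
   *   }
   * }</pre>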
1168    */
1169   @Override
1170   public List<StoreFile> compact(CompactionContext compaction,
1171       ThroughputController throughputController) throws IOException {
1172     return compact(compaction, throughputController, null);
1173   }
1174 
1175   @Override
1176   public List<StoreFile> compact(CompactionContext compaction,
1177     ThroughputController throughputController, User user) throws IOException {
1178     assert compaction != null;
1179     List<StoreFile> sfs = null;
1180     CompactionRequest cr = compaction.getRequest();
1181     try {
1182       // Do all sanity checking in here if we have a valid CompactionRequest
1183       // because we need to clean up after it on the way out in a finally
1184       // block below
1185       long compactionStartTime = EnvironmentEdgeManager.currentTime();
1186       assert compaction.hasSelection();
1187       Collection<StoreFile> filesToCompact = cr.getFiles();
1188       assert !filesToCompact.isEmpty();
1189       synchronized (filesCompacting) {
1190         // sanity check: we're compacting files that this store knows about
1191         // TODO: change this to LOG.error() after more debugging
1192         Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1193       }
1194 
1195       // Ready to go. Have list of files to compact.
1196       LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1197           + this + " of " + this.getRegionInfo().getRegionNameAsString()
1198           + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1199           + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1200 
1201       // Commence the compaction.
1202       List<Path> newFiles = compaction.compact(throughputController, user);
1203 
1204       long outputBytes = 0L;
1205       // TODO: get rid of this!
1206       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1207         LOG.warn("hbase.hstore.compaction.complete is set to false");
1208         sfs = new ArrayList<StoreFile>(newFiles.size());
1209         final boolean evictOnClose =
1210             cacheConf != null? cacheConf.shouldEvictOnClose(): true;
1211         for (Path newFile : newFiles) {
1212           // Create storefile around what we wrote with a reader on it.
1213           StoreFile sf = createStoreFileAndReader(newFile);
1214           sf.closeReader(evictOnClose);
1215           sfs.add(sf);
1216         }
1217         return sfs;
1218       }
1219       // Do the steps necessary to complete the compaction.
1220       sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
1221       writeCompactionWalRecord(filesToCompact, sfs);
1222       replaceStoreFiles(filesToCompact, sfs);
1223       if (cr.isMajor()) {
1224         majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1225         majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1226       } else {
1227         compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1228         compactedCellsSize += getCompactionProgress().totalCompactedSize;
1229       }
1230 
1231       for (StoreFile sf : sfs) {
1232         outputBytes += sf.getReader().length();
1233       }
1234 
1235       // At this point the store will use new files for all new scanners.
1236       completeCompaction(filesToCompact); // update store size.
1237 
1238       long now = EnvironmentEdgeManager.currentTime();
1239       if (region.getRegionServerServices() != null
1240           && region.getRegionServerServices().getMetrics() != null) {
1241         region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
1242           now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1243           outputBytes);
1244       }
1245 
1246       logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1247       return sfs;
1248     } finally {
1249       finishCompactionRequest(cr);
1250     }
1251   }
1252 
1253   private List<StoreFile> moveCompatedFilesIntoPlace(
1254       final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
1255     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1256     for (Path newFile : newFiles) {
1257       assert newFile != null;
1258       final StoreFile sf = moveFileIntoPlace(newFile);
1259       if (this.getCoprocessorHost() != null) {
1260         final Store thisStore = this;
1261         if (user == null) {
1262           getCoprocessorHost().postCompact(thisStore, sf, cr);
1263         } else {
1264           try {
1265             user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1266               @Override
1267               public Void run() throws Exception {
1268                 getCoprocessorHost().postCompact(thisStore, sf, cr);
1269                 return null;
1270               }
1271             });
1272           } catch (InterruptedException ie) {
1273             InterruptedIOException iioe = new InterruptedIOException();
1274             iioe.initCause(ie);
1275             throw iioe;
1276           }
1277         }
1278       }
1279       assert sf != null;
1280       sfs.add(sf);
1281     }
1282     return sfs;
1283   }
1284 
1285   // Package-visible for tests
1286   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1287     validateStoreFile(newFile);
1288     // Move the file into the right spot
1289     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1290     return createStoreFileAndReader(destPath);
1291   }
1292 
1293   /**
1294    * Writes the compaction WAL record.
1295    * @param filesCompacted Files compacted (input).
1296    * @param newFiles Files from compaction.
1297    */
1298   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1299       Collection<StoreFile> newFiles) throws IOException {
1300     if (region.getWAL() == null) return;
1301     List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1302     for (StoreFile f : filesCompacted) {
1303       inputPaths.add(f.getPath());
1304     }
1305     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1306     for (StoreFile f : newFiles) {
1307       outputPaths.add(f.getPath());
1308     }
1309     HRegionInfo info = this.region.getRegionInfo();
1310     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1311         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1312     // Fix reaching into Region to get the maxWaitForSeqId.
1313     // Does this method belong in Region altogether given it is making so many references up there?
1314     // Could be Region#writeCompactionMarker(compactionDescriptor);
1315     WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1316         this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
1317   }
1318 
1319   @VisibleForTesting
1320   void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1321       final Collection<StoreFile> result) throws IOException {
1322     this.lock.writeLock().lock();
1323     try {
1324       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1325       filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
1326     } finally {
1327       this.lock.writeLock().unlock();
1328     }
1329   }
1330 
1331   /**
1332    * Log a very elaborate compaction completion message.
1333    * @param cr Request.
1334    * @param sfs Resulting files.
1335    * @param compactionStartTime Start time.
1336    */
1337   private void logCompactionEndMessage(
1338       CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) {
1339     StringBuilder message = new StringBuilder(
1340       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1341       + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1342       + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1343     if (sfs.isEmpty()) {
1344       message.append("none, ");
1345     } else {
1346       for (StoreFile sf: sfs) {
1347         message.append(sf.getPath().getName());
1348         message.append("(size=");
1349         message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1350         message.append("), ");
1351       }
1352     }
1353     message.append("total size for store is ")
1354       .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1355       .append(". This selection was in queue for ")
1356       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1357       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1358       .append(" to execute.");
1359     LOG.info(message.toString());
1360     if (LOG.isTraceEnabled()) {
1361       int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1362       long resultSize = 0;
1363       for (StoreFile sf : sfs) {
1364         resultSize += sf.getReader().length();
1365       }
1366       String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1367         + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1368           + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1369       LOG.trace(traceMessage);
1370     }
1371   }
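
  // For reference, the INFO message assembled above comes out roughly like the following
  // (file names, sizes and durations are illustrative placeholders, not real output):
  //
  //   Completed major compaction of 3 (all) file(s) in cf of TestTable,,1490000000000.abcd1234. into
  //   0a1b2c3d(size=12.3 M), 4e5f6a7b(size=8.1 M), total size for store is 20.4 M.
  //   This selection was in queue for 2sec, and took 35sec to execute.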
1372 
1373   /**
1374    * Call to complete a compaction. It is for the case where we find in the WAL a compaction
1375    * that was not finished. We could find one when recovering a WAL after a regionserver crash.
1376    * See HBASE-2231.
1377    * @param compaction the compaction descriptor recovered from the WAL marker
1378    */
1379   @Override
1380   public void replayCompactionMarker(CompactionDescriptor compaction,
1381       boolean pickCompactionFiles, boolean removeFiles)
1382       throws IOException {
1383     LOG.debug("Completing compaction from the WAL marker");
1384     List<String> compactionInputs = compaction.getCompactionInputList();
1385     List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1386 
1387     // The Compaction Marker is written after the compaction is completed,
1388     // and the files moved into the region/family folder.
1389     //
1390     // If we crash after the entry is written, we may not have removed the
1391     // input files, but the output file is present.
1392     // (The unremoved input files will be removed by this function)
1393     //
1394     // If we scan the directory and the file is not present, it can mean that:
1395     //   - The file was manually removed by the user
1396     //   - The file was removed as a consequence of a subsequent compaction
1397     // So we can't do anything with the "compaction output list" because those
1398     // files have already been loaded when opening the region (by virtue of
1399     // being in the store's folder), or they may be missing due to a later compaction.
1400 
1401     String familyName = this.getColumnFamilyName();
1402     List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
1403     for (String compactionInput : compactionInputs) {
1404       Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1405       inputFiles.add(inputPath.getName());
1406     }
1407 
1408     // Some of the input files might already be deleted.
1409     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1410     for (StoreFile sf : this.getStorefiles()) {
1411       if (inputFiles.contains(sf.getPath().getName())) {
1412         inputStoreFiles.add(sf);
1413       }
1414     }
1415 
1416     // check whether we need to pick up the new files
1417     List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1418 
1419     if (pickCompactionFiles) {
1420       for (StoreFile sf : this.getStorefiles()) {
1421         compactionOutputs.remove(sf.getPath().getName());
1422       }
1423       for (String compactionOutput : compactionOutputs) {
1424         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1425         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1426         outputStoreFiles.add(storeFile);
1427       }
1428     }
1429 
1430     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1431       LOG.info("Replaying compaction marker, replacing input files: " +
1432           inputStoreFiles + " with output files : " + outputStoreFiles);
1433       this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1434       this.completeCompaction(inputStoreFiles);
1435     }
1436   }
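
  // A minimal replay sketch, assuming the descriptor has been decoded from a WAL compaction
  // marker entry (the caller and the flag values are assumptions for illustration; the actual
  // call site is in the region's WAL replay path):
  //
  //   CompactionDescriptor desc = ...; // decoded from the WAL entry
  //   store.replayCompactionMarker(desc, true /* pickCompactionFiles */, true /* removeFiles */);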
1437 
1438   /**
1439    * This method tries to compact N recent files for testing.
1440    * Note that because compacting "recent" files only makes sense for some policies,
1441    * e.g. the default one, it assumes the default policy is used. It doesn't consult the
1442    * policy, but instead builds the compaction candidate list by itself.
1443    * @param N Number of files.
1444    */
1445   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1446     List<StoreFile> filesToCompact;
1447     boolean isMajor;
1448 
1449     this.lock.readLock().lock();
1450     try {
1451       synchronized (filesCompacting) {
1452         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1453         if (!filesCompacting.isEmpty()) {
1454           // exclude all files older than the newest file we're currently
1455           // compacting. this allows us to preserve contiguity (HBASE-2856)
1456           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1457           int idx = filesToCompact.indexOf(last);
1458           Preconditions.checkArgument(idx != -1);
1459           filesToCompact.subList(0, idx + 1).clear();
1460         }
1461         int count = filesToCompact.size();
1462         if (N > count) {
1463           throw new RuntimeException("Not enough files");
1464         }
1465 
1466         filesToCompact = filesToCompact.subList(count - N, count);
1467         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1468         filesCompacting.addAll(filesToCompact);
1469         Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
1470             .getStoreFileComparator());
1471       }
1472     } finally {
1473       this.lock.readLock().unlock();
1474     }
1475 
1476     try {
1477       // Ready to go. Have list of files to compact.
1478       List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1479           .compactForTesting(filesToCompact, isMajor);
1480       for (Path newFile: newFiles) {
1481         // Move the compaction into place.
1482         StoreFile sf = moveFileIntoPlace(newFile);
1483         if (this.getCoprocessorHost() != null) {
1484           this.getCoprocessorHost().postCompact(this, sf, null);
1485         }
1486         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1487         completeCompaction(filesToCompact);
1488       }
1489     } finally {
1490       synchronized (filesCompacting) {
1491         filesCompacting.removeAll(filesToCompact);
1492       }
1493     }
1494   }
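
  // Test-only usage sketch, assuming the store currently holds at least two flushed files and
  // the default compaction policy is in effect:
  //
  //   store.compactRecentForTestingAssumingDefaultPolicy(2); // compact the two newest files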
1495 
1496   @Override
1497   public boolean hasReferences() {
1498     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1499   }
1500 
1501   @Override
1502   public CompactionProgress getCompactionProgress() {
1503     return this.storeEngine.getCompactor().getProgress();
1504   }
1505 
1506   @Override
1507   public boolean isMajorCompaction() throws IOException {
1508     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1509       // TODO: what are these reader checks all over the place?
1510       if (sf.getReader() == null) {
1511         LOG.debug("StoreFile " + sf + " has null Reader");
1512         return false;
1513       }
1514     }
1515     return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
1516         this.storeEngine.getStoreFileManager().getStorefiles());
1517   }
1518 
1519   @Override
1520   public CompactionContext requestCompaction() throws IOException {
1521     return requestCompaction(Store.NO_PRIORITY, null);
1522   }
1523 
1524   @Override
1525   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1526       throws IOException {
1527     return requestCompaction(priority, baseRequest, null);
1528   }
1529   @Override
1530   public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
1531       User user) throws IOException {
1532     // don't even select for compaction if writes are disabled
1533     if (!this.areWritesEnabled()) {
1534       return null;
1535     }
1536 
1537     // Before we do compaction, try to get rid of unneeded files to simplify things.
1538     removeUnneededFiles();
1539 
1540     final CompactionContext compaction = storeEngine.createCompaction();
1541     CompactionRequest request = null;
1542     this.lock.readLock().lock();
1543     try {
1544       synchronized (filesCompacting) {
1545         final Store thisStore = this;
1546         // First, see if coprocessor would want to override selection.
1547         if (this.getCoprocessorHost() != null) {
1548           final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1549           boolean override = false;
1550           if (user == null) {
1551             override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
1552               baseRequest);
1553           } else {
1554             try {
1555               override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
1556                 @Override
1557                 public Boolean run() throws Exception {
1558                   return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
1559                     baseRequest);
1560                 }
1561               });
1562             } catch (InterruptedException ie) {
1563               InterruptedIOException iioe = new InterruptedIOException();
1564               iioe.initCause(ie);
1565               throw iioe;
1566             }
1567           }
1568           if (override) {
1569             // Coprocessor is overriding normal file selection.
1570             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1571           }
1572         }
1573 
1574         // Normal case - coprocessor is not overriding file selection.
1575         if (!compaction.hasSelection()) {
1576           boolean isUserCompaction = priority == Store.PRIORITY_USER;
1577           boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1578               offPeakCompactionTracker.compareAndSet(false, true);
1579           try {
1580             compaction.select(this.filesCompacting, isUserCompaction,
1581               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1582           } catch (IOException e) {
1583             if (mayUseOffPeak) {
1584               offPeakCompactionTracker.set(false);
1585             }
1586             throw e;
1587           }
1588           assert compaction.hasSelection();
1589           if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1590             // Compaction policy doesn't want to take advantage of off-peak.
1591             offPeakCompactionTracker.set(false);
1592           }
1593         }
1594         if (this.getCoprocessorHost() != null) {
1595           if (user == null) {
1596             this.getCoprocessorHost().postCompactSelection(
1597               this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1598           } else {
1599             try {
1600               user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1601                 @Override
1602                 public Void run() throws Exception {
1603                   getCoprocessorHost().postCompactSelection(
1604                     thisStore, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1605                   return null;
1606                 }
1607               });
1608             } catch (InterruptedException ie) {
1609               InterruptedIOException iioe = new InterruptedIOException();
1610               iioe.initCause(ie);
1611               throw iioe;
1612             }
1613           }
1614         }
1615 
1616         // Selected files; see if we have a compaction with some custom base request.
1617         if (baseRequest != null) {
1618           // Update the request with what the system thinks the request should be;
1619           // it's up to the request if it wants to listen.
1620           compaction.forceSelect(
1621               baseRequest.combineWith(compaction.getRequest()));
1622         }
1623         // Finally, we have the resulting files list. Check if we have any files at all.
1624         request = compaction.getRequest();
1625         final Collection<StoreFile> selectedFiles = request.getFiles();
1626         if (selectedFiles.isEmpty()) {
1627           return null;
1628         }
1629 
1630         addToCompactingFiles(selectedFiles);
1631 
1632         // If we're enqueuing a major, clear the force flag.
1633         this.forceMajor = this.forceMajor && !request.isMajor();
1634 
1635         // Set common request properties.
1636         // Set priority, either override value supplied by caller or from store.
1637         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1638         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1639       }
1640     } finally {
1641       this.lock.readLock().unlock();
1642     }
1643 
1644     LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
1645         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1646         + (request.isAllFiles() ? " (all files)" : ""));
1647     this.region.reportCompactionRequestStart(request.isMajor());
1648     return compaction;
1649   }
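
  // Rough shape of the selection handshake implemented above, from the point of view of a
  // caller such as a compaction thread (running the selected compaction itself is done through
  // the compact(...) overload defined earlier in this class):
  //
  //   CompactionContext ctx = store.requestCompaction(Store.PRIORITY_USER, null, null);
  //   if (ctx == null) {
  //     return; // nothing was selected, or writes are disabled
  //   }
  //   // ...either run the compaction with the selected context, or hand the selection back
  //   // via store.cancelRequestedCompaction(ctx) so the files become eligible again.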
1650 
1651   /** Adds the files to compacting files. filesCompacting must be locked. */
1652   private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1653     if (filesToAdd == null) return;
1654     // Check that we do not try to compact the same StoreFile twice.
1655     if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1656       Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1657     }
1658     filesCompacting.addAll(filesToAdd);
1659     Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1660   }
1661 
1662   private void removeUnneededFiles() throws IOException {
1663     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1664     if (getFamily().getMinVersions() > 0) {
1665       LOG.debug("Skipping expired store file removal due to min version being " +
1666           getFamily().getMinVersions());
1667       return;
1668     }
1669     this.lock.readLock().lock();
1670     Collection<StoreFile> delSfs = null;
1671     try {
1672       synchronized (filesCompacting) {
1673         long cfTtl = getStoreFileTtl();
1674         if (cfTtl != Long.MAX_VALUE) {
1675           delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1676               EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1677           addToCompactingFiles(delSfs);
1678         }
1679       }
1680     } finally {
1681       this.lock.readLock().unlock();
1682     }
1683     if (delSfs == null || delSfs.isEmpty()) return;
1684 
1685     Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
1686     writeCompactionWalRecord(delSfs, newFiles);
1687     replaceStoreFiles(delSfs, newFiles);
1688     completeCompaction(delSfs);
1689     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1690         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1691         + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
1692   }
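
  // The expired-file cleanup above is driven by configuration; a sketch of disabling it, using
  // the exact key read by removeUnneededFiles() (HBaseConfiguration.create() is just one way to
  // obtain a Configuration):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("hbase.store.delete.expired.storefile", false); // keep expired store files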
1693 
1694   @Override
1695   public void cancelRequestedCompaction(CompactionContext compaction) {
1696     finishCompactionRequest(compaction.getRequest());
1697   }
1698 
1699   private void finishCompactionRequest(CompactionRequest cr) {
1700     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1701     if (cr.isOffPeak()) {
1702       offPeakCompactionTracker.set(false);
1703       cr.setOffPeak(false);
1704     }
1705     synchronized (filesCompacting) {
1706       filesCompacting.removeAll(cr.getFiles());
1707     }
1708   }
1709 
1710   /**
1711    * Validates a store file by opening and closing it. In HFileV2 this should
1712    * not be an expensive operation.
1713    *
1714    * @param path the path to the store file
1715    */
1716   private void validateStoreFile(Path path)
1717       throws IOException {
1718     StoreFile storeFile = null;
1719     try {
1720       storeFile = createStoreFileAndReader(path);
1721     } catch (IOException e) {
1722       LOG.error("Failed to open store file : " + path
1723           + ", keeping it in tmp location", e);
1724       throw e;
1725     } finally {
1726       if (storeFile != null) {
1727         storeFile.closeReader(false);
1728       }
1729     }
1730   }
1731 
1732   /**
1733    * <p>Updates store bookkeeping after a compaction that has been written to disk.
1734    *
1735    * <p>It is usually invoked at the end of a compaction, but might also be
1736    * invoked at HStore startup, if the prior execution died midway through.
1737    *
1738    * <p>Completing the compaction here means:
1739    * <pre>
1740    * 1) Recompute the total store size.
1741    * 2) Recompute the total uncompressed bytes.
1742    * </pre>
1743    *
1744    * @param compactedFiles list of files that were compacted
1745    */
1746   @VisibleForTesting
1747   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1748     throws IOException {
1749     LOG.debug("Completing compaction...");
1750     this.storeSize = 0L;
1751     this.totalUncompressedBytes = 0L;
1752     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1753       StoreFileReader r = hsf.getReader();
1754       if (r == null) {
1755         LOG.warn("StoreFile " + hsf + " has a null Reader");
1756         continue;
1757       }
1758       this.storeSize += r.length();
1759       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1760     }
1761   }
1762 
1763   /*
1764    * @param wantedVersions How many versions were asked for.
1765    * @return wantedVersions or this family's maximum number of versions, whichever is smaller.
1766    */
1767   int versionsToReturn(final int wantedVersions) {
1768     if (wantedVersions <= 0) {
1769       throw new IllegalArgumentException("Number of versions must be > 0");
1770     }
1771     // Make sure we do not return more than maximum versions for this store.
1772     int maxVersions = this.family.getMaxVersions();
1773     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1774   }
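
  // Worked example for the clamp above: with HColumnDescriptor#setMaxVersions(3) on this family,
  // versionsToReturn(5) yields 3, versionsToReturn(2) yields 2, and versionsToReturn(0) throws
  // IllegalArgumentException.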
1775 
1776   /**
1777    * @param cell
1778    * @param oldestTimestamp
1779    * @return true if the cell is expired
1780    */
1781   static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
1782     // Look for a TTL tag first. Use it instead of the family setting if
1783     // found. If a cell has multiple TTLs, resolve the conflict by using the
1784     // first tag encountered.
1785     Iterator<Tag> i = CellUtil.tagsIterator(cell);
1786     while (i.hasNext()) {
1787       Tag t = i.next();
1788       if (TagType.TTL_TAG_TYPE == t.getType()) {
1789         // Unlike in the schema, cell TTLs are stored in milliseconds, so there is
1790         // no need to convert.
1791         long ts = cell.getTimestamp();
1792         assert t.getValueLength() == Bytes.SIZEOF_LONG;
1793         long ttl = TagUtil.getValueAsLong(t);
1794         if (ts + ttl < now) {
1795           return true;
1796         }
1797         // Per cell TTLs cannot extend lifetime beyond family settings, so
1798         // fall through to check that
1799         break;
1800       }
1801     }
1802     return false;
1803   }
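
  // Worked example for the check above: a cell written at timestamp ts = 1000 that carries a
  // TTL_TAG_TYPE tag encoding 5000 (milliseconds) is reported expired once now > 6000, since the
  // test is ts + ttl < now. A cell without a TTL tag always returns false here; only the
  // family-level TTL (enforced elsewhere) then applies.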
1804 
1805   @Override
1806   public boolean canSplit() {
1807     this.lock.readLock().lock();
1808     try {
1809       // Not split-able if we find a reference store file present in the store.
1810       boolean result = !hasReferences();
1811       if (!result) {
1812         if (LOG.isTraceEnabled()) {
1813           LOG.trace("Not splittable; has references: " + this);
1814         }
1815       }
1816       return result;
1817     } finally {
1818       this.lock.readLock().unlock();
1819     }
1820   }
1821 
1822   @Override
1823   public byte[] getSplitPoint() {
1824     this.lock.readLock().lock();
1825     try {
1826       // Should already be enforced by the split policy!
1827       assert !this.getRegionInfo().isMetaRegion();
1828       // Not split-able if we find a reference store file present in the store.
1829       if (hasReferences()) {
1830         if (LOG.isTraceEnabled()) {
1831           LOG.trace("Not splittable; has references: " + this);
1832         }
1833         return null;
1834       }
1835       return this.storeEngine.getStoreFileManager().getSplitPoint();
1836     } catch(IOException e) {
1837       LOG.warn("Failed getting store size for " + this, e);
1838     } finally {
1839       this.lock.readLock().unlock();
1840     }
1841     return null;
1842   }
1843 
1844   @Override
1845   public long getLastCompactSize() {
1846     return this.lastCompactSize;
1847   }
1848 
1849   @Override
1850   public long getSize() {
1851     return storeSize;
1852   }
1853 
1854   @Override
1855   public void triggerMajorCompaction() {
1856     this.forceMajor = true;
1857   }
1858 
1859 
1860   //////////////////////////////////////////////////////////////////////////////
1861   // File administration
1862   //////////////////////////////////////////////////////////////////////////////
1863 
1864   @Override
1865   public KeyValueScanner getScanner(Scan scan,
1866       final NavigableSet<byte []> targetCols, long readPt) throws IOException {
1867     lock.readLock().lock();
1868     try {
1869       KeyValueScanner scanner = null;
1870       if (this.getCoprocessorHost() != null) {
1871         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
1872       }
1873       scanner = createScanner(scan, targetCols, readPt, scanner);
1874       return scanner;
1875     } finally {
1876       lock.readLock().unlock();
1877     }
1878   }
1879 
1880   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
1881       long readPt, KeyValueScanner scanner) throws IOException {
1882     if (scanner == null) {
1883       scanner = scan.isReversed() ? new ReversedStoreScanner(this,
1884           getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
1885           getScanInfo(), scan, targetCols, readPt);
1886     }
1887     return scanner;
1888   }
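
  // A minimal read-path sketch using the two methods above (the column name and read point are
  // illustrative; a real caller takes the read point from the region's MVCC):
  //
  //   Scan scan = new Scan();
  //   NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  //   cols.add(Bytes.toBytes("q1"));
  //   KeyValueScanner scanner = store.getScanner(scan, cols, readPt);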
1889 
1890   @Override
1891   public String toString() {
1892     return this.getColumnFamilyName();
1893   }
1894 
1895   @Override
1896   public int getStorefilesCount() {
1897     return this.storeEngine.getStoreFileManager().getStorefileCount();
1898   }
1899 
1900   @Override
1901   public long getMaxStoreFileAge() {
1902     long earliestTS = Long.MAX_VALUE;
1903     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1904       StoreFileReader r = s.getReader();
1905       if (r == null) {
1906         LOG.warn("StoreFile " + s + " has a null Reader");
1907         continue;
1908       }
1909       if (!s.isHFile()) {
1910         continue;
1911       }
1912       long createdTS = s.getFileInfo().getCreatedTimestamp();
1913       earliestTS = (createdTS < earliestTS) ? createdTS : earliestTS;
1914     }
1915     long now = EnvironmentEdgeManager.currentTime();
1916     return now - earliestTS;
1917   }
1918 
1919   @Override
1920   public long getMinStoreFileAge() {
1921     long latestTS = 0;
1922     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1923       StoreFileReader r = s.getReader();
1924       if (r == null) {
1925         LOG.warn("StoreFile " + s + " has a null Reader");
1926         continue;
1927       }
1928       if (!s.isHFile()) {
1929         continue;
1930       }
1931       long createdTS = s.getFileInfo().getCreatedTimestamp();
1932       latestTS = (createdTS > latestTS) ? createdTS : latestTS;
1933     }
1934     long now = EnvironmentEdgeManager.currentTime();
1935     return now - latestTS;
1936   }
1937 
1938   @Override
1939   public long getAvgStoreFileAge() {
1940     long sum = 0, count = 0;
1941     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1942       StoreFileReader r = s.getReader();
1943       if (r == null) {
1944         LOG.warn("StoreFile " + s + " has a null Reader");
1945         continue;
1946       }
1947       if (!s.isHFile()) {
1948         continue;
1949       }
1950       sum += s.getFileInfo().getCreatedTimestamp();
1951       count++;
1952     }
1953     if (count == 0) {
1954       return 0;
1955     }
1956     long avgTS = sum / count;
1957     long now = EnvironmentEdgeManager.currentTime();
1958     return now - avgTS;
1959   }
1960 
1961   @Override
1962   public long getNumReferenceFiles() {
1963     long numRefFiles = 0;
1964     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1965       if (s.isReference()) {
1966         numRefFiles++;
1967       }
1968     }
1969     return numRefFiles;
1970   }
1971 
1972   @Override
1973   public long getNumHFiles() {
1974     long numHFiles = 0;
1975     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1976       if (s.isHFile()) {
1977         numHFiles++;
1978       }
1979     }
1980     return numHFiles;
1981   }
1982 
1983   @Override
1984   public long getStoreSizeUncompressed() {
1985     return this.totalUncompressedBytes;
1986   }
1987 
1988   @Override
1989   public long getStorefilesSize() {
1990     long size = 0;
1991     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1992       StoreFileReader r = s.getReader();
1993       if (r == null) {
1994         LOG.warn("StoreFile " + s + " has a null Reader");
1995         continue;
1996       }
1997       size += r.length();
1998     }
1999     return size;
2000   }
2001 
2002   @Override
2003   public long getStorefilesIndexSize() {
2004     long size = 0;
2005     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2006       StoreFileReader r = s.getReader();
2007       if (r == null) {
2008         LOG.warn("StoreFile " + s + " has a null Reader");
2009         continue;
2010       }
2011       size += r.indexSize();
2012     }
2013     return size;
2014   }
2015 
2016   @Override
2017   public long getTotalStaticIndexSize() {
2018     long size = 0;
2019     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2020       StoreFileReader r = s.getReader();
2021       if (r == null) {
2022         continue;
2023       }
2024       size += r.getUncompressedDataIndexSize();
2025     }
2026     return size;
2027   }
2028 
2029   @Override
2030   public long getTotalStaticBloomSize() {
2031     long size = 0;
2032     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2033       StoreFileReader r = s.getReader();
2034       if (r == null) {
2035         continue;
2036       }
2037       size += r.getTotalBloomSize();
2038     }
2039     return size;
2040   }
2041 
2042   @Override
2043   public long getMemStoreSize() {
2044     return this.memstore.size();
2045   }
2046 
2047   @Override
2048   public int getCompactPriority() {
2049     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
2050     if (priority == PRIORITY_USER) {
2051       LOG.warn("Compaction priority is USER despite there being no user compaction");
2052     }
2053     return priority;
2054   }
2055 
2056   @Override
2057   public boolean throttleCompaction(long compactionSize) {
2058     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2059   }
2060 
2061   public HRegion getHRegion() {
2062     return this.region;
2063   }
2064 
2065   @Override
2066   public RegionCoprocessorHost getCoprocessorHost() {
2067     return this.region.getCoprocessorHost();
2068   }
2069 
2070   @Override
2071   public HRegionInfo getRegionInfo() {
2072     return this.fs.getRegionInfo();
2073   }
2074 
2075   @Override
2076   public boolean areWritesEnabled() {
2077     return this.region.areWritesEnabled();
2078   }
2079 
2080   @Override
2081   public long getSmallestReadPoint() {
2082     return this.region.getSmallestReadPoint();
2083   }
2084 
2085   /**
2086    * Updates the value for the given row/family/qualifier. This function will always be seen as
2087    * atomic by other readers because it only puts a single KV to the memstore. Thus no read/write
2088    * control is necessary.
2089    * @param row row to update
2090    * @param f family to update
2091    * @param qualifier qualifier to update
2092    * @param newValue the new value to set into memstore
2093    * @return memstore size delta
2094    * @throws IOException
2095    */
2096   @VisibleForTesting
2097   public long updateColumnValue(byte [] row, byte [] f,
2098                                 byte [] qualifier, long newValue)
2099       throws IOException {
2100 
2101     this.lock.readLock().lock();
2102     try {
2103       long now = EnvironmentEdgeManager.currentTime();
2104 
2105       return this.memstore.updateColumnValue(row,
2106           f,
2107           qualifier,
2108           newValue,
2109           now);
2110 
2111     } finally {
2112       this.lock.readLock().unlock();
2113     }
2114   }
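
  // Test-only usage sketch for the helper above (row, family and qualifier are placeholders):
  //
  //   long delta = store.updateColumnValue(Bytes.toBytes("r1"), Bytes.toBytes("cf"),
  //       Bytes.toBytes("counter"), 42L);
  //   // delta is the memstore size change caused by the single KeyValue that was written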
2115 
2116   @Override
2117   public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2118     this.lock.readLock().lock();
2119     try {
2120       return this.memstore.upsert(cells, readpoint);
2121     } finally {
2122       this.lock.readLock().unlock();
2123     }
2124   }
2125 
2126   @Override
2127   public StoreFlushContext createFlushContext(long cacheFlushId) {
2128     return new StoreFlusherImpl(cacheFlushId);
2129   }
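
  // The flush lifecycle, expressed against the StoreFlushContext returned above (a sketch;
  // 'flushSeqId' and the MonitoredTask 'status' are supplied by the calling region):
  //
  //   StoreFlushContext fctx = store.createFlushContext(flushSeqId);
  //   fctx.prepare();                      // snapshot the memstore (caller holds the lock)
  //   fctx.flushCache(status);             // write the snapshot out to temp files
  //   boolean added = fctx.commit(status); // move files into place and clear the snapshot
  //   // if something fails before commit, fctx.abort() drops the prepared snapshot instead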
2130 
2131   private final class StoreFlusherImpl implements StoreFlushContext {
2132 
2133     private long cacheFlushSeqNum;
2134     private MemStoreSnapshot snapshot;
2135     private List<Path> tempFiles;
2136     private List<Path> committedFiles;
2137     private long cacheFlushCount;
2138     private long cacheFlushSize;
2139     private long outputFileSize;
2140 
2141     private StoreFlusherImpl(long cacheFlushSeqNum) {
2142       this.cacheFlushSeqNum = cacheFlushSeqNum;
2143     }
2144 
2145     /**
2146      * This is not thread-safe. The caller should hold a lock on the region or the store.
2147      * If necessary, the lock can be added with the patch provided in HBASE-10087.
2148      */
2149     @Override
2150     public void prepare() {
2151       // passing the current sequence number of the wal - to allow bookkeeping in the memstore
2152       this.snapshot = memstore.snapshot(cacheFlushSeqNum);
2153       this.cacheFlushCount = snapshot.getCellsCount();
2154       this.cacheFlushSize = snapshot.getSize();
2155       committedFiles = new ArrayList<Path>(1);
2156     }
2157 
2158     @Override
2159     public void flushCache(MonitoredTask status) throws IOException {
2160       RegionServerServices rsService = region.getRegionServerServices();
2161       ThroughputController throughputController =
2162           rsService == null ? null : rsService.getFlushThroughputController();
2163       tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
2164     }
2165 
2166     @Override
2167     public boolean commit(MonitoredTask status) throws IOException {
2168       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2169         return false;
2170       }
2171       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2172       for (Path storeFilePath : tempFiles) {
2173         try {
2174           StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
2175           outputFileSize += sf.getReader().length();
2176           storeFiles.add(sf);
2177         } catch (IOException ex) {
2178           LOG.error("Failed to commit store file " + storeFilePath, ex);
2179           // Try to delete the files we have committed before.
2180           for (StoreFile sf : storeFiles) {
2181             Path pathToDelete = sf.getPath();
2182             try {
2183               sf.deleteReader();
2184             } catch (IOException deleteEx) {
2185               LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, deleteEx);
2186               Runtime.getRuntime().halt(1);
2187             }
2188           }
2189           throw new IOException("Failed to commit the flush", ex);
2190         }
2191       }
2192 
2193       for (StoreFile sf : storeFiles) {
2194         if (HStore.this.getCoprocessorHost() != null) {
2195           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2196         }
2197         committedFiles.add(sf.getPath());
2198       }
2199 
2200       HStore.this.flushedCellsCount += cacheFlushCount;
2201       HStore.this.flushedCellsSize += cacheFlushSize;
2202       HStore.this.flushedOutputFileSize += outputFileSize;
2203 
2204       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
2205       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2206     }
2207 
2208     @Override
2209     public long getOutputFileSize() {
2210       return outputFileSize;
2211     }
2212 
2213     @Override
2214     public List<Path> getCommittedFiles() {
2215       return committedFiles;
2216     }
2217 
2218     /**
2219      * Similar to commit, but called in secondary region replicas for replaying the
2220      * flush cache from the primary region. Adds the new files to the store, and drops the
2221      * snapshot depending on the dropMemstoreSnapshot argument.
2222      * @param fileNames names of the flushed files
2223      * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2224      * @throws IOException
2225      */
2226     @Override
2227     public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2228         throws IOException {
2229       List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2230       for (String file : fileNames) {
2231         // open the file as a store file (hfile link, etc)
2232         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2233         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2234         storeFiles.add(storeFile);
2235         HStore.this.storeSize += storeFile.getReader().length();
2236         HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2237         if (LOG.isInfoEnabled()) {
2238           LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2239             " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2240             ", sequenceid=" +  + storeFile.getReader().getSequenceID() +
2241             ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2242         }
2243       }
2244 
2245       long snapshotId = -1; // -1 means do not drop
2246       if (dropMemstoreSnapshot && snapshot != null) {
2247         snapshotId = snapshot.getId();
2248       }
2249       HStore.this.updateStorefiles(storeFiles, snapshotId);
2250     }
2251 
2252     /**
2253      * Abort the snapshot preparation. Drops the snapshot if any.
2254      * @throws IOException
2255      */
2256     @Override
2257     public void abort() throws IOException {
2258       if (snapshot == null) {
2259         return;
2260       }
2261       HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2262     }
2263   }
2264 
2265   @Override
2266   public boolean needsCompaction() {
2267     return this.storeEngine.needsCompaction(this.filesCompacting);
2268   }
2269 
2270   @Override
2271   public CacheConfig getCacheConfig() {
2272     return this.cacheConf;
2273   }
2274 
2275   public static final long FIXED_OVERHEAD =
2276       ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)
2277               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2278 
2279   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2280       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2281       + ClassSize.CONCURRENT_SKIPLISTMAP
2282       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2283       + ScanInfo.FIXED_OVERHEAD);
2284 
2285   @Override
2286   public long heapSize() {
2287     return DEEP_OVERHEAD + this.memstore.heapSize();
2288   }
2289 
2290   @Override
2291   public CellComparator getComparator() {
2292     return comparator;
2293   }
2294 
2295   @Override
2296   public ScanInfo getScanInfo() {
2297     return scanInfo;
2298   }
2299 
2300   /**
2301    * Set scan info, used by test
2302    * @param scanInfo new scan info to use for test
2303    */
2304   void setScanInfo(ScanInfo scanInfo) {
2305     this.scanInfo = scanInfo;
2306   }
2307 
2308   @Override
2309   public boolean hasTooManyStoreFiles() {
2310     return getStorefilesCount() > this.blockingFileCount;
2311   }
2312 
2313   @Override
2314   public long getFlushedCellsCount() {
2315     return flushedCellsCount;
2316   }
2317 
2318   @Override
2319   public long getFlushedCellsSize() {
2320     return flushedCellsSize;
2321   }
2322 
2323   @Override
2324   public long getFlushedOutputFileSize() {
2325     return flushedOutputFileSize;
2326   }
2327 
2328   @Override
2329   public long getCompactedCellsCount() {
2330     return compactedCellsCount;
2331   }
2332 
2333   @Override
2334   public long getCompactedCellsSize() {
2335     return compactedCellsSize;
2336   }
2337 
2338   @Override
2339   public long getMajorCompactedCellsCount() {
2340     return majorCompactedCellsCount;
2341   }
2342 
2343   @Override
2344   public long getMajorCompactedCellsSize() {
2345     return majorCompactedCellsSize;
2346   }
2347 
2348   /**
2349    * Returns the StoreEngine that is backing this concrete implementation of Store.
2350    * @return the {@link StoreEngine} object used internally inside this HStore object.
2351    */
2352   @VisibleForTesting
2353   public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2354     return this.storeEngine;
2355   }
2356 
2357   protected OffPeakHours getOffPeakHours() {
2358     return this.offPeakHours;
2359   }
2360 
2361   /**
2362    * {@inheritDoc}
2363    */
2364   @Override
2365   public void onConfigurationChange(Configuration conf) {
2366     this.conf = new CompoundConfiguration()
2367             .add(conf)
2368             .addBytesMap(family.getValues());
2369     this.storeEngine.compactionPolicy.setConf(conf);
2370     this.offPeakHours = OffPeakHours.getInstance(conf);
2371   }
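
  // Sketch of how a configuration change reaches the method above. The store is registered as a
  // configuration observer by its owner; the ConfigurationManager method names below are
  // assumptions and may differ slightly in this version:
  //
  //   ConfigurationManager manager = ...;
  //   manager.registerObserver(store);      // the store observes configuration changes
  //   manager.notifyAllObservers(newConf);  // ends up invoking onConfigurationChange(newConf)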
2372 
2373   /**
2374    * {@inheritDoc}
2375    */
2376   @Override
2377   public void registerChildren(ConfigurationManager manager) {
2378     // No children to register
2379   }
2380 
2381   /**
2382    * {@inheritDoc}
2383    */
2384   @Override
2385   public void deregisterChildren(ConfigurationManager manager) {
2386     // No children to deregister
2387   }
2388 
2389   @Override
2390   public double getCompactionPressure() {
2391     return storeEngine.getStoreFileManager().getCompactionPressure();
2392   }
2393 
2394   @Override
2395   public boolean isPrimaryReplicaStore() {
2396     return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
2397   }
2398 
2399   @Override
2400   public void closeAndArchiveCompactedFiles() throws IOException {
2401     lock.readLock().lock();
2402     Collection<StoreFile> copyCompactedfiles = null;
2403     try {
2404       Collection<StoreFile> compactedfiles =
2405           this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2406       if (compactedfiles != null && compactedfiles.size() != 0) {
2407         // Do a copy under read lock
2408         copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
2409       } else {
2410         if (LOG.isTraceEnabled()) {
2411           LOG.trace("No compacted files to archive");
2412         }
2413         return;
2414       }
2415     } finally {
2416       lock.readLock().unlock();
2417     }
2418     if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
2419       removeCompactedfiles(copyCompactedfiles);
2420     }
2421   }
2422 
2423   /**
2424    * Archives and removes the compacted files
2425    * @param compactedfiles The compacted files in this store that are not active in reads
2426    * @throws IOException
2427    */
2428   private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
2429       throws IOException {
2430     final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
2431     for (final StoreFile file : compactedfiles) {
2432       synchronized (file) {
2433         try {
2434           StoreFileReader r = file.getReader();
2435           if (r == null) {
2436             if (LOG.isDebugEnabled()) {
2437               LOG.debug("The file " + file + " was closed but still not archived.");
2438             }
2439             filesToRemove.add(file);
2440           }
2441           if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
2442             // Even if deleting fails we need not bother, because any new scanners won't be
2443             // able to use the compacted file; its status is already compactedAway.
2444             if (LOG.isTraceEnabled()) {
2445               LOG.trace("Closing and archiving the file " + file.getPath());
2446             }
2447             r.close(true);
2448             // Just close the reader; the file is archived and cleared from the manager below.
2449             filesToRemove.add(file);
2450           }
2451         } catch (Exception e) {
2452           LOG.error(
2453             "Exception while trying to close the compacted store file " + file.getPath().getName(), e);
2454         }
2455       }
2456     }
2457     if (this.isPrimaryReplicaStore()) {
2458       // Only the primary region is allowed to move the file to archive.
2459       // The secondary region does not move the files to archive. Any active reads from
2460       // the secondary region will still work because the file as such has active readers on it.
2461       if (!filesToRemove.isEmpty()) {
2462         if (LOG.isDebugEnabled()) {
2463           LOG.debug("Moving the files " + filesToRemove + " to archive");
2464         }
2465         // Only if this succeeds do we go on to clear the files from the store file manager below.
2466         this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
2467       }
2468     }
2469     if (!filesToRemove.isEmpty()) {
2470       // Clear the compactedfiles from the store file manager
2471       clearCompactedfiles(filesToRemove);
2472     }
2473   }
2474 
2475   @Override public void finalizeFlush() {
2476     memstore.finalizeFlush();
2477   }
2478 
2479   private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
2480     if (LOG.isTraceEnabled()) {
2481       LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
2482     }
2483     try {
2484       lock.writeLock().lock();
2485       this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
2486     } finally {
2487       lock.writeLock().unlock();
2488     }
2489   }
2490 }