1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.lang.management.ManagementFactory;
23  import java.lang.management.RuntimeMXBean;
24  import java.rmi.UnexpectedException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.NavigableSet;
30  import java.util.SortedSet;
31  import java.util.concurrent.atomic.AtomicLong;
32  import java.util.concurrent.locks.ReentrantReadWriteLock;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.HBaseConfiguration;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValueUtil;
43  import org.apache.hadoop.hbase.client.Scan;
44  import org.apache.hadoop.hbase.io.HeapSize;
45  import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.ClassSize;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  
50  /**
51   * The MemStore holds in-memory modifications to the Store.  Modifications
52   * are {@link KeyValue}s.  When asked to flush, current memstore is moved
53   * to snapshot and is cleared.  We continue to serve edits out of new memstore
54   * and backing snapshot until flusher reports in that the flush succeeded. At
55   * this point we let the snapshot go.
56   * TODO: Adjust size of the memstore when we remove items because they have
57   * been deleted.
58   * TODO: With new KVSLS, need to make sure we update HeapSize with difference
59   * in KV size.
60   */
61  @InterfaceAudience.Private
62  public class MemStore implements HeapSize {
private static final Log LOG = LogFactory.getLog(MemStore.class);

// Configuration key consulted by the constructor to decide whether a
// MemStoreLAB allocator is created for this memstore.
static final String USEMSLAB_KEY =
  "hbase.hregion.memstore.mslab.enabled";
// Value used for USEMSLAB_KEY when the configuration does not set it.
private static final boolean USEMSLAB_DEFAULT = true;

// Retained so that snapshot() can construct a replacement MemStoreLAB.
private Configuration conf;

// MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
// better semantics.  The Map will overwrite if passed a key it already had
// whereas the Set will not add new KV if key is same though value might be
// different.  Value is not important -- just make sure always same
// reference passed.
volatile KeyValueSkipListSet kvset;

// Snapshot of memstore.  Made for flusher.
volatile KeyValueSkipListSet snapshot;

// snapshot() and clearSnapshot() swap the sets under the write lock; the
// methods that add to, remove from or read the sets take the read lock.
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

// Ordering used by both kvset and snapshot.
final KeyValue.KVComparator comparator;

// Used comparing versions -- same r/c and ts but different type.
final KeyValue.KVComparator comparatorIgnoreType;

// Used comparing versions -- same r/c and type but different timestamp.
final KeyValue.KVComparator comparatorIgnoreTimestamp;

// Used to track own heapSize
final AtomicLong size;

// Used to track when to flush.  Long.MAX_VALUE means no edit has been
// recorded since construction or since the last snapshot() that moved
// kvset aside.
volatile long timeOfOldestEdit = Long.MAX_VALUE;

// Timestamp ranges of the cells added to kvset and to snapshot respectively;
// snapshot() hands the former over to the latter.
TimeRangeTracker timeRangeTracker;
TimeRangeTracker snapshotTimeRangeTracker;

// chunkPool and allocator are both null when USEMSLAB_KEY is false.
// snapshotAllocator holds the allocator that was in use when the current
// snapshot was taken, until clearSnapshot() closes it.
MemStoreChunkPool chunkPool;
volatile MemStoreLAB allocator;
volatile MemStoreLAB snapshotAllocator;
103 
104 
105 
/**
 * Default constructor. Used for tests.
 * Delegates to {@link #MemStore(Configuration, KeyValue.KVComparator)} with
 * a configuration from {@link HBaseConfiguration#create()} and
 * {@link KeyValue#COMPARATOR}.
 */
public MemStore() {
  this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
}
112 
113   /**
114    * Constructor.
115    * @param c Comparator
116    */
117   public MemStore(final Configuration conf,
118                   final KeyValue.KVComparator c) {
119     this.conf = conf;
120     this.comparator = c;
121     this.comparatorIgnoreTimestamp =
122       this.comparator.getComparatorIgnoringTimestamps();
123     this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
124     this.kvset = new KeyValueSkipListSet(c);
125     this.snapshot = new KeyValueSkipListSet(c);
126     timeRangeTracker = new TimeRangeTracker();
127     snapshotTimeRangeTracker = new TimeRangeTracker();
128     this.size = new AtomicLong(DEEP_OVERHEAD);
129     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
130       this.chunkPool = MemStoreChunkPool.getPool(conf);
131       this.allocator = new MemStoreLAB(conf, chunkPool);
132     } else {
133       this.allocator = null;
134       this.chunkPool = null;
135     }
136   }
137 
138   void dump() {
139     for (KeyValue kv: this.kvset) {
140       LOG.info(kv);
141     }
142     for (KeyValue kv: this.snapshot) {
143       LOG.info(kv);
144     }
145   }
146 
147   /**
148    * Creates a snapshot of the current memstore.
149    * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet<KeyValue>)}
150    * To get the snapshot made by this method, use {@link #getSnapshot()}
151    */
152   void snapshot() {
153     this.lock.writeLock().lock();
154     try {
155       // If snapshot currently has entries, then flusher failed or didn't call
156       // cleanup.  Log a warning.
157       if (!this.snapshot.isEmpty()) {
158         LOG.warn("Snapshot called again without clearing previous. " +
159           "Doing nothing. Another ongoing flush or did we fail last attempt?");
160       } else {
161         if (!this.kvset.isEmpty()) {
162           this.snapshot = this.kvset;
163           this.kvset = new KeyValueSkipListSet(this.comparator);
164           this.snapshotTimeRangeTracker = this.timeRangeTracker;
165           this.timeRangeTracker = new TimeRangeTracker();
166           // Reset heap to not include any keys
167           this.size.set(DEEP_OVERHEAD);
168           this.snapshotAllocator = this.allocator;
169           // Reset allocator so we get a fresh buffer for the new memstore
170           if (allocator != null) {
171             this.allocator = new MemStoreLAB(conf, chunkPool);
172           }
173           timeOfOldestEdit = Long.MAX_VALUE;
174         }
175       }
176     } finally {
177       this.lock.writeLock().unlock();
178     }
179   }
180 
/**
 * Return the current snapshot.
 * Called by flusher to get current snapshot made by a previous
 * call to {@link #snapshot()}.  The field is returned as-is, without taking
 * the lock.
 * @return Return snapshot.
 * @see #snapshot()
 * @see #clearSnapshot(SortedSet)
 */
KeyValueSkipListSet getSnapshot() {
  return this.snapshot;
}
192 
193   /**
194    * The passed snapshot was successfully persisted; it can be let go.
195    * @param ss The snapshot to clean out.
196    * @throws UnexpectedException
197    * @see {@link #snapshot()}
198    */
199   void clearSnapshot(final SortedSet<KeyValue> ss)
200   throws UnexpectedException {
201     MemStoreLAB tmpAllocator = null;
202     this.lock.writeLock().lock();
203     try {
204       if (this.snapshot != ss) {
205         throw new UnexpectedException("Current snapshot is " +
206           this.snapshot + ", was passed " + ss);
207       }
208       // OK. Passed in snapshot is same as current snapshot.  If not-empty,
209       // create a new snapshot and let the old one go.
210       if (!ss.isEmpty()) {
211         this.snapshot = new KeyValueSkipListSet(this.comparator);
212         this.snapshotTimeRangeTracker = new TimeRangeTracker();
213       }
214       if (this.snapshotAllocator != null) {
215         tmpAllocator = this.snapshotAllocator;
216         this.snapshotAllocator = null;
217       }
218     } finally {
219       this.lock.writeLock().unlock();
220     }
221     if (tmpAllocator != null) {
222       tmpAllocator.close();
223     }
224   }
225 
226   /**
227    * Write an update
228    * @param kv
229    * @return approximate size of the passed key and value.
230    */
231   long add(final KeyValue kv) {
232     this.lock.readLock().lock();
233     try {
234       KeyValue toAdd = maybeCloneWithAllocator(kv);
235       return internalAdd(toAdd);
236     } finally {
237       this.lock.readLock().unlock();
238     }
239   }
240 
/**
 * @return the time recorded for the oldest edit in the current memstore, or
 *   {@link Long#MAX_VALUE} when none has been recorded
 */
long timeOfOldestEdit() {
  return timeOfOldestEdit;
}
244 
245   private boolean addToKVSet(KeyValue e) {
246     boolean b = this.kvset.add(e);
247     setOldestEditTimeToNow();
248     return b;
249   }
250 
251   private boolean removeFromKVSet(KeyValue e) {
252     boolean b = this.kvset.remove(e);
253     setOldestEditTimeToNow();
254     return b;
255   }
256 
257   void setOldestEditTimeToNow() {
258     if (timeOfOldestEdit == Long.MAX_VALUE) {
259       timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
260     }
261   }
262 
263   /**
264    * Internal version of add() that doesn't clone KVs with the
265    * allocator, and doesn't take the lock.
266    *
267    * Callers should ensure they already have the read lock taken
268    */
269   private long internalAdd(final KeyValue toAdd) {
270     long s = heapSizeChange(toAdd, addToKVSet(toAdd));
271     timeRangeTracker.includeTimestamp(toAdd);
272     this.size.addAndGet(s);
273     return s;
274   }
275 
276   private KeyValue maybeCloneWithAllocator(KeyValue kv) {
277     if (allocator == null) {
278       return kv;
279     }
280 
281     int len = kv.getLength();
282     Allocation alloc = allocator.allocateBytes(len);
283     if (alloc == null) {
284       // The allocation was too large, allocator decided
285       // not to do anything with it.
286       return kv;
287     }
288     assert alloc != null && alloc.getData() != null;
289     System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
290     KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
291     newKv.setMemstoreTS(kv.getMemstoreTS());
292     return newKv;
293   }
294 
295   /**
296    * Remove n key from the memstore. Only kvs that have the same key and the
297    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
298    * in this call. It is possible that we can optimize this method by using
299    * tailMap/iterator, but since this method is called rarely (only for
300    * error recovery), we can leave those optimization for the future.
301    * @param kv
302    */
303   void rollback(final KeyValue kv) {
304     this.lock.readLock().lock();
305     try {
306       // If the key is in the snapshot, delete it. We should not update
307       // this.size, because that tracks the size of only the memstore and
308       // not the snapshot. The flush of this snapshot to disk has not
309       // yet started because Store.flush() waits for all rwcc transactions to
310       // commit before starting the flush to disk.
311       KeyValue found = this.snapshot.get(kv);
312       if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
313         this.snapshot.remove(kv);
314       }
315       // If the key is in the memstore, delete it. Update this.size.
316       found = this.kvset.get(kv);
317       if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
318         removeFromKVSet(kv);
319         long s = heapSizeChange(kv, true);
320         this.size.addAndGet(-s);
321       }
322     } finally {
323       this.lock.readLock().unlock();
324     }
325   }
326 
327   /**
328    * Write a delete
329    * @param delete
330    * @return approximate size of the passed key and value.
331    */
332   long delete(final KeyValue delete) {
333     long s = 0;
334     this.lock.readLock().lock();
335     try {
336       KeyValue toAdd = maybeCloneWithAllocator(delete);
337       s += heapSizeChange(toAdd, addToKVSet(toAdd));
338       timeRangeTracker.includeTimestamp(toAdd);
339     } finally {
340       this.lock.readLock().unlock();
341     }
342     this.size.addAndGet(s);
343     return s;
344   }
345 
346   /**
347    * @param kv Find the row that comes after this one.  If null, we return the
348    * first.
349    * @return Next row or null if none found.
350    */
351   KeyValue getNextRow(final KeyValue kv) {
352     this.lock.readLock().lock();
353     try {
354       return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
355     } finally {
356       this.lock.readLock().unlock();
357     }
358   }
359 
360   /*
361    * @param a
362    * @param b
363    * @return Return lowest of a or b or null if both a and b are null
364    */
365   private KeyValue getLowest(final KeyValue a, final KeyValue b) {
366     if (a == null) {
367       return b;
368     }
369     if (b == null) {
370       return a;
371     }
372     return comparator.compareRows(a, b) <= 0? a: b;
373   }
374 
375   /*
376    * @param key Find row that follows this one.  If null, return first.
377    * @param map Set to look in for a row beyond <code>row</code>.
378    * @return Next row or null if none found.  If one found, will be a new
379    * KeyValue -- can be destroyed by subsequent calls to this method.
380    */
381   private KeyValue getNextRow(final KeyValue key,
382       final NavigableSet<KeyValue> set) {
383     KeyValue result = null;
384     SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
385     // Iterate until we fall into the next row; i.e. move off current row
386     for (KeyValue kv: tail) {
387       if (comparator.compareRows(kv, key) <= 0)
388         continue;
389       // Note: Not suppressing deletes or expired cells.  Needs to be handled
390       // by higher up functions.
391       result = kv;
392       break;
393     }
394     return result;
395   }
396 
397   /**
398    * @param state column/delete tracking state
399    */
400   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
401     this.lock.readLock().lock();
402     try {
403       getRowKeyAtOrBefore(kvset, state);
404       getRowKeyAtOrBefore(snapshot, state);
405     } finally {
406       this.lock.readLock().unlock();
407     }
408   }
409 
410   /*
411    * @param set
412    * @param state Accumulates deletes and candidates.
413    */
414   private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
415       final GetClosestRowBeforeTracker state) {
416     if (set.isEmpty()) {
417       return;
418     }
419     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
420       // Found nothing in row.  Try backing up.
421       getRowKeyBefore(set, state);
422     }
423   }
424 
425   /*
426    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
427    * we have been passed the first possible key on a row.  As we walk forward
428    * we accumulate deletes until we hit a candidate on the row at which point
429    * we return.
430    * @param set
431    * @param firstOnRow First possible key on this row.
432    * @param state
433    * @return True if we found a candidate walking this row.
434    */
435   private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
436       final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
437     boolean foundCandidate = false;
438     SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
439     if (tail.isEmpty()) return foundCandidate;
440     for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
441       KeyValue kv = i.next();
442       // Did we go beyond the target row? If so break.
443       if (state.isTooFar(kv, firstOnRow)) break;
444       if (state.isExpired(kv)) {
445         i.remove();
446         continue;
447       }
448       // If we added something, this row is a contender. break.
449       if (state.handle(kv)) {
450         foundCandidate = true;
451         break;
452       }
453     }
454     return foundCandidate;
455   }
456 
457   /*
458    * Walk backwards through the passed set a row at a time until we run out of
459    * set or until we get a candidate.
460    * @param set
461    * @param state
462    */
463   private void getRowKeyBefore(NavigableSet<KeyValue> set,
464       final GetClosestRowBeforeTracker state) {
465     KeyValue firstOnRow = state.getTargetKey();
466     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
467         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
468       // Make sure we don't fall out of our table.
469       if (!state.isTargetTable(p.kv)) break;
470       // Stop looking if we've exited the better candidate range.
471       if (!state.isBetterCandidate(p.kv)) break;
472       // Make into firstOnRow
473       firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
474       // If we find something, break;
475       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
476     }
477   }
478 
479   /**
480    * Only used by tests. TODO: Remove
481    *
482    * Given the specs of a column, update it, first by inserting a new record,
483    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
484    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
485    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
486    * get the new value, or the old value and all readers will eventually only see the new
487    * value after the old was removed.
488    *
489    * @param row
490    * @param family
491    * @param qualifier
492    * @param newValue
493    * @param now
494    * @return  Timestamp
495    */
496   long updateColumnValue(byte[] row,
497                                 byte[] family,
498                                 byte[] qualifier,
499                                 long newValue,
500                                 long now) {
501    this.lock.readLock().lock();
502     try {
503       KeyValue firstKv = KeyValue.createFirstOnRow(
504           row, family, qualifier);
505       // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
506       SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
507       if (!snSs.isEmpty()) {
508         KeyValue snKv = snSs.first();
509         // is there a matching KV in the snapshot?
510         if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
511           if (snKv.getTimestamp() == now) {
512             // poop,
513             now += 1;
514           }
515         }
516       }
517 
518       // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
519       // But the timestamp should also be max(now, mostRecentTsInMemstore)
520 
521       // so we cant add the new KV w/o knowing what's there already, but we also
522       // want to take this chance to delete some kvs. So two loops (sad)
523 
524       SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
525       Iterator<KeyValue> it = ss.iterator();
526       while ( it.hasNext() ) {
527         KeyValue kv = it.next();
528 
529         // if this isnt the row we are interested in, then bail:
530         if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
531           break; // rows dont match, bail.
532         }
533 
534         // if the qualifier matches and it's a put, just RM it out of the kvset.
535         if (kv.getType() == KeyValue.Type.Put.getCode() &&
536             kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
537           now = kv.getTimestamp();
538         }
539       }
540 
541       // create or update (upsert) a new KeyValue with
542       // 'now' and a 0 memstoreTS == immediately visible
543       List<Cell> cells = new ArrayList<Cell>(1);
544       cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
545       return upsert(cells, 1L);
546     } finally {
547       this.lock.readLock().unlock();
548     }
549   }
550 
551   /**
552    * Update or insert the specified KeyValues.
553    * <p>
554    * For each KeyValue, insert into MemStore.  This will atomically upsert the
555    * value for that row/family/qualifier.  If a KeyValue did already exist,
556    * it will then be removed.
557    * <p>
558    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
559    * be immediately visible.  May want to change this so it is atomic across
560    * all KeyValues.
561    * <p>
562    * This is called under row lock, so Get operations will still see updates
563    * atomically.  Scans will only see each KeyValue update as atomic.
564    *
565    * @param cells
566    * @param readpoint readpoint below which we can safely remove duplicate KVs 
567    * @return change in memstore size
568    */
569   public long upsert(Iterable<? extends Cell> cells, long readpoint) {
570    this.lock.readLock().lock();
571     try {
572       long size = 0;
573       for (Cell cell : cells) {
574         size += upsert(cell, readpoint);
575       }
576       return size;
577     } finally {
578       this.lock.readLock().unlock();
579     }
580   }
581 
/**
 * Inserts the specified KeyValue into MemStore and prunes older Puts of the
 * same row and qualifier.
 * <p>
 * First, the specified KeyValue is inserted into the Memstore.
 * <p>
 * Then the entries following the first possible key for the cell's
 * row/family/qualifier are walked.  Among the Puts whose memstoreTS is at or
 * below <code>readpoint</code>, the first two encountered are only counted;
 * every further one is removed and its size subtracted.
 * <p>
 * Callers must hold the read lock.
 *
 * @param cell the cell to insert
 * @param readpoint Puts with a memstoreTS above this value are never removed
 * @return change in size of MemStore
 */
private long upsert(Cell cell, long readpoint) {
  // Add the KeyValue to the MemStore
  // Use the internalAdd method here since we (a) already have a lock
  // and (b) cannot safely use the MSLAB here without potentially
  // hitting OOME - see TestMemStore.testUpsertMSLAB for a
  // test that triggers the pathological case if we don't avoid MSLAB
  // here.
  KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
  long addedSize = internalAdd(kv);

  // Get the KeyValues for the row/family/qualifier regardless of timestamp.
  // For this case we want to clean up any other puts
  KeyValue firstKv = KeyValue.createFirstOnRow(
      kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
      kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
      kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
  SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
  Iterator<KeyValue> it = ss.iterator();
  // number of eligible Puts seen so far (the just-inserted cell is skipped
  // and therefore not counted)
  int versionsVisible = 0;
  while ( it.hasNext() ) {
    KeyValue cur = it.next();

    if (kv == cur) {
      // ignore the one just put in
      continue;
    }
    // check that this is the row and column we are interested in, otherwise bail
    if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
      // only remove Puts that concurrent scanners cannot possibly see
      if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMemstoreTS() <= readpoint) {
        if (versionsVisible > 1) {
          // if we get here we have already counted two versions visible to the
          // oldest scanner, so no scanner will see this version.
          // NOTE(review): the original comment spoke of "at least one" version;
          // the threshold in the code is two -- confirm which is intended.

          // passing true asks for the size of cur, which is then given back
          long delta = heapSizeChange(cur, true);
          addedSize -= delta;
          this.size.addAndGet(-delta);
          it.remove();
          setOldestEditTimeToNow();
        } else {
          versionsVisible++;
        }
      }
    } else {
      // past the row or column, done
      break;
    }
  }
  return addedSize;
}
648 
649   /*
650    * Immutable data structure to hold member found in set and the set it was
651    * found in.  Include set because it is carrying context.
652    */
653   private static class Member {
654     final KeyValue kv;
655     final NavigableSet<KeyValue> set;
656     Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
657       this.kv = kv;
658       this.set = s;
659     }
660   }
661 
662   /*
663    * @param set Set to walk back in.  Pass a first in row or we'll return
664    * same row (loop).
665    * @param state Utility and context.
666    * @param firstOnRow First item on the row after the one we want to find a
667    * member in.
668    * @return Null or member of row previous to <code>firstOnRow</code>
669    */
670   private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
671       final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
672     NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
673     if (head.isEmpty()) return null;
674     for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
675       KeyValue found = i.next();
676       if (state.isExpired(found)) {
677         i.remove();
678         continue;
679       }
680       return new Member(head, found);
681     }
682     return null;
683   }
684 
685   /**
686    * @return scanner on memstore and snapshot in this order.
687    */
688   List<KeyValueScanner> getScanners() {
689     this.lock.readLock().lock();
690     try {
691       return Collections.<KeyValueScanner>singletonList(
692           new MemStoreScanner());
693     } finally {
694       this.lock.readLock().unlock();
695     }
696   }
697 
698   /**
699    * Check if this memstore may contain the required keys
700    * @param scan
701    * @return False if the key definitely does not exist in this Memstore
702    */
703   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
704     return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
705         snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
706         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
707                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
708             oldestUnexpiredTS);
709   }
710 
/**
 * @return the time range tracker currently associated with the snapshot
 */
public TimeRangeTracker getSnapshotTimeRangeTracker() {
  return this.snapshotTimeRangeTracker;
}
714 
715   /*
716    * MemStoreScanner implements the KeyValueScanner.
717    * It lets the caller scan the contents of a memstore -- both current
718    * map and snapshot.
719    * This behaves as if it were a real scanner but does not maintain position.
720    */
721   protected class MemStoreScanner extends NonLazyKeyValueScanner {
// Next visible KeyValue pulled from the kvset iterator and from the
// snapshot iterator respectively; null when that side is exhausted
private KeyValue kvsetNextRow = null;
private KeyValue snapshotNextRow = null;

// last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
private KeyValue kvsetItRow = null;
private KeyValue snapshotItRow = null;

// iterator based scanning.
private Iterator<KeyValue> kvsetIt;
private Iterator<KeyValue> snapshotIt;

// The kvset and snapshot at the time of creating this scanner
private KeyValueSkipListSet kvsetAtCreation;
private KeyValueSkipListSet snapshotAtCreation;

// the pre-calculated KeyValue to be returned by peek() or next()
private KeyValue theNext;

// The allocator and snapshot allocator at the time of creating this scanner;
// each stays null if the corresponding memstore field was null then
volatile MemStoreLAB allocatorAtCreation;
volatile MemStoreLAB snapshotAllocatorAtCreation;
744 
/*
Some notes...

  A MemStoreScanner is fixed at creation time; this includes its
  pointers/iterators into the existing kvset and snapshot.  During snapshot
  creation the kvset is null and the snapshot is moved; since the kvset is
  null there is no point in reseeking on both, so we can save ourselves the
  trouble.  During the snapshot->hfile transition, the memstore scanner is
  re-created by StoreScanner#updateReaders().  StoreScanner should
  potentially do something smarter by adjusting the existing memstore
  scanner.

  But there is a greater problem here: once a scanner has progressed during
  a snapshot scenario, we currently iterate past the kvset and then 'finish'
  up.  If a scan lasts a little while, there is a chance for new entries in
  the kvset to become available, but we will never see them.  This needs to
  be handled at the StoreScanner level, in coordination with MemStoreScanner.

  Currently, this problem is only partly managed: during the small amount of
  time when the StoreScanner has not yet created a new MemStoreScanner, we
  will miss the adds to the kvset in the MemStoreScanner.
*/
765 
766     MemStoreScanner() {
767       super();
768 
769       kvsetAtCreation = kvset;
770       snapshotAtCreation = snapshot;
771       if (allocator != null) {
772         this.allocatorAtCreation = allocator;
773         this.allocatorAtCreation.incScannerCount();
774       }
775       if (snapshotAllocator != null) {
776         this.snapshotAllocatorAtCreation = snapshotAllocator;
777         this.snapshotAllocatorAtCreation.incScannerCount();
778       }
779     }
780 
781     private KeyValue getNext(Iterator<KeyValue> it) {
782       long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
783 
784       KeyValue v = null;
785       try {
786         while (it.hasNext()) {
787           v = it.next();
788           if (v.getMemstoreTS() <= readPoint) {
789             return v;
790           }
791         }
792 
793         return null;
794       } finally {
795         if (v != null) {
796           // in all cases, remember the last KV iterated to
797           if (it == snapshotIt) {
798             snapshotItRow = v;
799           } else {
800             kvsetItRow = v;
801           }
802         }
803       }
804     }
805 
806     /**
807      *  Set the scanner at the seek key.
808      *  Must be called only once: there is no thread safety between the scanner
809      *   and the memStore.
810      * @param key seek value
811      * @return false if the key is null or if there is no data
812      */
813     @Override
814     public synchronized boolean seek(KeyValue key) {
815       if (key == null) {
816         close();
817         return false;
818       }
819 
820       // kvset and snapshot will never be null.
821       // if tailSet can't find anything, SortedSet is empty (not null).
822       kvsetIt = kvsetAtCreation.tailSet(key).iterator();
823       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
824       kvsetItRow = null;
825       snapshotItRow = null;
826 
827       return seekInSubLists(key);
828     }
829 
830 
831     /**
832      * (Re)initialize the iterators after a seek or a reseek.
833      */
834     private synchronized boolean seekInSubLists(KeyValue key){
835       kvsetNextRow = getNext(kvsetIt);
836       snapshotNextRow = getNext(snapshotIt);
837 
838       // Calculate the next value
839       theNext = getLowest(kvsetNextRow, snapshotNextRow);
840 
841       // has data
842       return (theNext != null);
843     }
844 
845 
846     /**
847      * Move forward on the sub-lists set previously by seek.
848      * @param key seek value (should be non-null)
849      * @return true if there is at least one KV to read, false otherwise
850      */
851     @Override
852     public synchronized boolean reseek(KeyValue key) {
853       /*
854       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
855       This code is executed concurrently with flush and puts, without locks.
856       Two points must be known when working on this code:
857       1) It's not possible to use the 'kvTail' and 'snapshot'
858        variables, as they are modified during a flush.
859       2) The ideal implementation for performance would use the sub skip list
860        implicitly pointed by the iterators 'kvsetIt' and
861        'snapshotIt'. Unfortunately the Java API does not offer a method to
862        get it. So we remember the last keys we iterated to and restore
863        the reseeked set to at least that point.
864        */
865 
866       kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
867       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
868 
869       return seekInSubLists(key);
870     }
871 
872 
873     @Override
874     public synchronized KeyValue peek() {
875       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
876       return theNext;
877     }
878 
879     @Override
880     public synchronized KeyValue next() {
881       if (theNext == null) {
882           return null;
883       }
884 
885       final KeyValue ret = theNext;
886 
887       // Advance one of the iterators
888       if (theNext == kvsetNextRow) {
889         kvsetNextRow = getNext(kvsetIt);
890       } else {
891         snapshotNextRow = getNext(snapshotIt);
892       }
893 
894       // Calculate the next value
895       theNext = getLowest(kvsetNextRow, snapshotNextRow);
896 
897       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
898       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
899       //    getLowest() + " threadpoint=" + readpoint);
900       return ret;
901     }
902 
903     /*
904      * Returns the lower of the two key values, or null if they are both null.
905      * This uses comparator.compare() to compare the KeyValue using the memstore
906      * comparator.
907      */
908     private KeyValue getLowest(KeyValue first, KeyValue second) {
909       if (first == null && second == null) {
910         return null;
911       }
912       if (first != null && second != null) {
913         int compare = comparator.compare(first, second);
914         return (compare <= 0 ? first : second);
915       }
916       return (first != null ? first : second);
917     }
918 
919     /*
920      * Returns the higher of the two key values, or null if they are both null.
921      * This uses comparator.compare() to compare the KeyValue using the memstore
922      * comparator.
923      */
924     private KeyValue getHighest(KeyValue first, KeyValue second) {
925       if (first == null && second == null) {
926         return null;
927       }
928       if (first != null && second != null) {
929         int compare = comparator.compare(first, second);
930         return (compare > 0 ? first : second);
931       }
932       return (first != null ? first : second);
933     }
934 
935     public synchronized void close() {
936       this.kvsetNextRow = null;
937       this.snapshotNextRow = null;
938 
939       this.kvsetIt = null;
940       this.snapshotIt = null;
941       
942       if (allocatorAtCreation != null) {
943         this.allocatorAtCreation.decScannerCount();
944         this.allocatorAtCreation = null;
945       }
946       if (snapshotAllocatorAtCreation != null) {
947         this.snapshotAllocatorAtCreation.decScannerCount();
948         this.snapshotAllocatorAtCreation = null;
949       }
950 
951       this.kvsetItRow = null;
952       this.snapshotItRow = null;
953     }
954 
955     /**
956      * MemStoreScanner returns max value as sequence id because it will
957      * always have the latest data among all files.
958      */
959     @Override
960     public long getSequenceID() {
961       return Long.MAX_VALUE;
962     }
963 
964     @Override
965     public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
966         long oldestUnexpiredTS) {
967       return shouldSeek(scan, oldestUnexpiredTS);
968     }
969   }
970 
971   public final static long FIXED_OVERHEAD = ClassSize.align(
972       ClassSize.OBJECT + (13 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
973 
974   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
975       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
976       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
977       (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
978 
979   /** Used for readability when we don't store memstore timestamp in HFile */
980   public static final boolean NO_PERSISTENT_TS = false;
981 
982   /*
983    * Calculate how the MemStore size has changed.  Includes overhead of the
984    * backing Map.
985    * @param kv
986    * @param notpresent True if the kv was NOT present in the set.
987    * @return Size
988    */
989   static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
990     return notpresent ?
991         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
992         0;
993   }
994 
995   /**
996    * Get the entire heap usage for this MemStore not including keys in the
997    * snapshot.
998    */
999   @Override
1000   public long heapSize() {
1001     return size.get();
1002   }
1003 
1004   /**
1005    * Get the heap usage of KVs in this MemStore.
1006    */
1007   public long keySize() {
1008     return heapSize() - DEEP_OVERHEAD;
1009   }
1010 
1011   /**
1012    * Code to help figure if our approximation of object heap sizes is close
1013    * enough.  See hbase-900.  Fills memstores then waits so user can heap
1014    * dump and bring up resultant hprof in something like jprofiler which
1015    * allows you get 'deep size' on objects.
1016    * @param args main args
1017    */
1018   public static void main(String [] args) {
1019     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
1020     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
1021       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
1022     LOG.info("vmInputArguments=" + runtime.getInputArguments());
1023     MemStore memstore1 = new MemStore();
1024     // TODO: x32 vs x64
1025     long size = 0;
1026     final int count = 10000;
1027     byte [] fam = Bytes.toBytes("col");
1028     byte [] qf = Bytes.toBytes("umn");
1029     byte [] empty = new byte[0];
1030     for (int i = 0; i < count; i++) {
1031       // Give each its own ts
1032       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1033     }
1034     LOG.info("memstore1 estimated size=" + size);
1035     for (int i = 0; i < count; i++) {
1036       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1037     }
1038     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
1039     // Make a variably sized memstore.
1040     MemStore memstore2 = new MemStore();
1041     for (int i = 0; i < count; i++) {
1042       size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
1043         new byte[i]));
1044     }
1045     LOG.info("memstore2 estimated size=" + size);
1046     final int seconds = 30;
1047     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
1048     for (int i = 0; i < seconds; i++) {
1049       // Thread.sleep(1000);
1050     }
1051     LOG.info("Exiting.");
1052   }
1053 }