View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.lang.management.ManagementFactory;
23  import java.lang.management.RuntimeMXBean;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.NavigableSet;
29  import java.util.SortedSet;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.KeyValueUtil;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.util.ByteRange;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.hbase.util.ReflectionUtils;
50  import org.apache.htrace.Trace;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  
54  /**
55   * The MemStore holds in-memory modifications to the Store.  Modifications
56   * are {@link Cell}s.  When asked to flush, current memstore is moved
57   * to snapshot and is cleared.  We continue to serve edits out of new memstore
58   * and backing snapshot until flusher reports in that the flush succeeded. At
59   * this point we let the snapshot go.
60   *  <p>
61   * The MemStore functions should not be called in parallel. Callers should hold
62   *  write and read locks. This is done in {@link HStore}.
63   *  </p>
64   *
65   * TODO: Adjust size of the memstore when we remove items because they have
66   * been deleted.
67   * TODO: With new KVSLS, need to make sure we update HeapSize with difference
68   * in KV size.
69   */
70  @InterfaceAudience.Private
71  public class DefaultMemStore implements MemStore {
72    private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
73    static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
74    private static final boolean USEMSLAB_DEFAULT = true;
75    static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
76  
77    private Configuration conf;
78  
79    // MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
80    // better semantics.  The Map will overwrite if passed a key it already had
81    // whereas the Set will not add new Cell if key is same though value might be
82    // different.  Value is not important -- just make sure always same
83    // reference passed.
84    volatile CellSkipListSet cellSet;
85  
86    // Snapshot of memstore.  Made for flusher.
87    volatile CellSkipListSet snapshot;
88  
89    final KeyValue.KVComparator comparator;
90  
91    // Used to track own heapSize
92    final AtomicLong size;
93    private volatile long snapshotSize;
94  
95    // Used to track when to flush
96    volatile long timeOfOldestEdit = Long.MAX_VALUE;
97  
98    TimeRangeTracker timeRangeTracker;
99    TimeRangeTracker snapshotTimeRangeTracker;
100 
101   volatile MemStoreLAB allocator;
102   volatile MemStoreLAB snapshotAllocator;
103   volatile long snapshotId;
104 
105   /**
106    * Default constructor. Used for tests.
107    */
108   public DefaultMemStore() {
109     this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
110   }
111 
112   /**
113    * Constructor.
114    * @param c Comparator
115    */
116   public DefaultMemStore(final Configuration conf,
117                   final KeyValue.KVComparator c) {
118     this.conf = conf;
119     this.comparator = c;
120     this.cellSet = new CellSkipListSet(c);
121     this.snapshot = new CellSkipListSet(c);
122     timeRangeTracker = new TimeRangeTracker();
123     snapshotTimeRangeTracker = new TimeRangeTracker();
124     this.size = new AtomicLong(DEEP_OVERHEAD);
125     this.snapshotSize = 0;
126     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
127       String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
128       this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
129           new Class[] { Configuration.class }, new Object[] { conf });
130     } else {
131       this.allocator = null;
132     }
133   }
134 
135   void dump() {
136     for (Cell cell: this.cellSet) {
137       LOG.info(cell);
138     }
139     for (Cell cell: this.snapshot) {
140       LOG.info(cell);
141     }
142   }
143 
144   /**
145    * Creates a snapshot of the current memstore.
146    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
147    */
148   @Override
149   public MemStoreSnapshot snapshot() {
150     // If snapshot currently has entries, then flusher failed or didn't call
151     // cleanup.  Log a warning.
152     if (!this.snapshot.isEmpty()) {
153       LOG.warn("Snapshot called again without clearing previous. " +
154           "Doing nothing. Another ongoing flush or did we fail last attempt?");
155     } else {
156       this.snapshotId = EnvironmentEdgeManager.currentTime();
157       this.snapshotSize = keySize();
158       if (!this.cellSet.isEmpty()) {
159         this.snapshot = this.cellSet;
160         this.cellSet = new CellSkipListSet(this.comparator);
161         this.snapshotTimeRangeTracker = this.timeRangeTracker;
162         this.timeRangeTracker = new TimeRangeTracker();
163         // Reset heap to not include any keys
164         this.size.set(DEEP_OVERHEAD);
165         this.snapshotAllocator = this.allocator;
166         // Reset allocator so we get a fresh buffer for the new memstore
167         if (allocator != null) {
168           String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
169           this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
170               new Class[] { Configuration.class }, new Object[] { conf });
171         }
172         timeOfOldestEdit = Long.MAX_VALUE;
173       }
174     }
175     return new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
176         this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator));
177   }
178 
179   /**
180    * The passed snapshot was successfully persisted; it can be let go.
181    * @param id Id of the snapshot to clean out.
182    * @throws UnexpectedStateException
183    * @see #snapshot()
184    */
185   @Override
186   public void clearSnapshot(long id) throws UnexpectedStateException {
187     MemStoreLAB tmpAllocator = null;
188     if (this.snapshotId == -1) return;  // already cleared
189     if (this.snapshotId != id) {
190       throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
191           + id);
192     }
193     // OK. Passed in snapshot is same as current snapshot. If not-empty,
194     // create a new snapshot and let the old one go.
195     if (!this.snapshot.isEmpty()) {
196       this.snapshot = new CellSkipListSet(this.comparator);
197       this.snapshotTimeRangeTracker = new TimeRangeTracker();
198     }
199     this.snapshotSize = 0;
200     this.snapshotId = -1;
201     if (this.snapshotAllocator != null) {
202       tmpAllocator = this.snapshotAllocator;
203       this.snapshotAllocator = null;
204     }
205     if (tmpAllocator != null) {
206       tmpAllocator.close();
207     }
208   }
209 
210   @Override
211   public long getFlushableSize() {
212     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
213   }
214 
215   @Override
216   public long getSnapshotSize() {
217     return this.snapshotSize;
218   }
219 
220   /**
221    * Write an update
222    * @param cell
223    * @return approximate size of the passed KV & newly added KV which maybe different than the
224    *         passed-in KV
225    */
226   @Override
227   public Pair<Long, Cell> add(Cell cell) {
228     Cell toAdd = maybeCloneWithAllocator(cell);
229     boolean mslabUsed = (toAdd != cell);
230     return new Pair<Long, Cell>(internalAdd(toAdd, mslabUsed), toAdd);
231   }
232 
233   @Override
234   public long timeOfOldestEdit() {
235     return timeOfOldestEdit;
236   }
237 
238   private boolean addToCellSet(Cell e) {
239     boolean b = this.cellSet.add(e);
240     setOldestEditTimeToNow();
241     return b;
242   }
243 
244   private boolean removeFromCellSet(Cell e) {
245     boolean b = this.cellSet.remove(e);
246     setOldestEditTimeToNow();
247     return b;
248   }
249 
250   void setOldestEditTimeToNow() {
251     if (timeOfOldestEdit == Long.MAX_VALUE) {
252       timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
253     }
254   }
255 
256   /**
257    * Internal version of add() that doesn't clone Cells with the
258    * allocator, and doesn't take the lock.
259    *
260    * Callers should ensure they already have the read lock taken
261    * @param toAdd the cell to add
262    * @param mslabUsed whether using MSLAB
263    * @return the heap size change in bytes
264    */
265   private long internalAdd(final Cell toAdd, boolean mslabUsed) {
266     boolean notPresent = addToCellSet(toAdd);
267     long s = heapSizeChange(toAdd, notPresent);
268     // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
269     // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
270     // than the counted number)
271     if (!notPresent && mslabUsed) {
272       s += getCellLength(toAdd);
273     }
274     timeRangeTracker.includeTimestamp(toAdd);
275     this.size.addAndGet(s);
276     return s;
277   }
278 
279   /**
280    * Get cell length after serialized in {@link KeyValue}
281    */
282   @VisibleForTesting
283   int getCellLength(Cell cell) {
284     return KeyValueUtil.length(cell);
285   }
286 
287   private Cell maybeCloneWithAllocator(Cell cell) {
288     if (allocator == null) {
289       return cell;
290     }
291 
292     int len = getCellLength(cell);
293     ByteRange alloc = allocator.allocateBytes(len);
294     if (alloc == null) {
295       // The allocation was too large, allocator decided
296       // not to do anything with it.
297       return cell;
298     }
299     assert alloc.getBytes() != null;
300     KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
301     KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
302     newKv.setSequenceId(cell.getSequenceId());
303     return newKv;
304   }
305 
306   /**
307    * Remove n key from the memstore. Only cells that have the same key and the
308    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
309    * in this call. It is possible that we can optimize this method by using
310    * tailMap/iterator, but since this method is called rarely (only for
311    * error recovery), we can leave those optimization for the future.
312    * @param cell
313    */
314   @Override
315   public void rollback(Cell cell) {
316     // If the key is in the snapshot, delete it. We should not update
317     // this.size, because that tracks the size of only the memstore and
318     // not the snapshot. The flush of this snapshot to disk has not
319     // yet started because Store.flush() waits for all rwcc transactions to
320     // commit before starting the flush to disk.
321     Cell found = this.snapshot.get(cell);
322     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
323       this.snapshot.remove(cell);
324       long sz = heapSizeChange(cell, true);
325       this.snapshotSize -= sz;
326     }
327     // If the key is in the memstore, delete it. Update this.size.
328     found = this.cellSet.get(cell);
329     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
330       removeFromCellSet(cell);
331       long s = heapSizeChange(cell, true);
332       this.size.addAndGet(-s);
333     }
334   }
335 
336   /**
337    * Write a delete
338    * @param deleteCell
339    * @return approximate size of the passed key and value.
340    */
341   @Override
342   public long delete(Cell deleteCell) {
343     Cell toAdd = maybeCloneWithAllocator(deleteCell);
344     boolean mslabUsed = (toAdd != deleteCell);
345     return internalAdd(toAdd, mslabUsed);
346   }
347 
348   /**
349    * @param cell Find the row that comes after this one.  If null, we return the
350    * first.
351    * @return Next row or null if none found.
352    */
353   Cell getNextRow(final Cell cell) {
354     return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
355   }
356 
357   /*
358    * @param a
359    * @param b
360    * @return Return lowest of a or b or null if both a and b are null
361    */
362   private Cell getLowest(final Cell a, final Cell b) {
363     if (a == null) {
364       return b;
365     }
366     if (b == null) {
367       return a;
368     }
369     return comparator.compareRows(a, b) <= 0? a: b;
370   }
371 
372   /*
373    * @param key Find row that follows this one.  If null, return first.
374    * @param map Set to look in for a row beyond <code>row</code>.
375    * @return Next row or null if none found.  If one found, will be a new
376    * KeyValue -- can be destroyed by subsequent calls to this method.
377    */
378   private Cell getNextRow(final Cell key,
379       final NavigableSet<Cell> set) {
380     Cell result = null;
381     SortedSet<Cell> tail = key == null? set: set.tailSet(key);
382     // Iterate until we fall into the next row; i.e. move off current row
383     for (Cell cell: tail) {
384       if (comparator.compareRows(cell, key) <= 0)
385         continue;
386       // Note: Not suppressing deletes or expired cells.  Needs to be handled
387       // by higher up functions.
388       result = cell;
389       break;
390     }
391     return result;
392   }
393 
394   /**
395    * @param state column/delete tracking state
396    */
397   @Override
398   public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
399     getRowKeyAtOrBefore(cellSet, state);
400     getRowKeyAtOrBefore(snapshot, state);
401   }
402 
403   /*
404    * @param set
405    * @param state Accumulates deletes and candidates.
406    */
407   private void getRowKeyAtOrBefore(final NavigableSet<Cell> set,
408       final GetClosestRowBeforeTracker state) {
409     if (set.isEmpty()) {
410       return;
411     }
412     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
413       // Found nothing in row.  Try backing up.
414       getRowKeyBefore(set, state);
415     }
416   }
417 
418   /*
419    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
420    * we have been passed the first possible key on a row.  As we walk forward
421    * we accumulate deletes until we hit a candidate on the row at which point
422    * we return.
423    * @param set
424    * @param firstOnRow First possible key on this row.
425    * @param state
426    * @return True if we found a candidate walking this row.
427    */
428   private boolean walkForwardInSingleRow(final SortedSet<Cell> set,
429       final Cell firstOnRow, final GetClosestRowBeforeTracker state) {
430     boolean foundCandidate = false;
431     SortedSet<Cell> tail = set.tailSet(firstOnRow);
432     if (tail.isEmpty()) return foundCandidate;
433     for (Iterator<Cell> i = tail.iterator(); i.hasNext();) {
434       Cell kv = i.next();
435       // Did we go beyond the target row? If so break.
436       if (state.isTooFar(kv, firstOnRow)) break;
437       if (state.isExpired(kv)) {
438         i.remove();
439         continue;
440       }
441       // If we added something, this row is a contender. break.
442       if (state.handle(kv)) {
443         foundCandidate = true;
444         break;
445       }
446     }
447     return foundCandidate;
448   }
449 
450   /*
451    * Walk backwards through the passed set a row at a time until we run out of
452    * set or until we get a candidate.
453    * @param set
454    * @param state
455    */
456   private void getRowKeyBefore(NavigableSet<Cell> set,
457       final GetClosestRowBeforeTracker state) {
458     Cell firstOnRow = state.getTargetKey();
459     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
460         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
461       // Make sure we don't fall out of our table.
462       if (!state.isTargetTable(p.cell)) break;
463       // Stop looking if we've exited the better candidate range.
464       if (!state.isBetterCandidate(p.cell)) break;
465       // Make into firstOnRow
466       firstOnRow = new KeyValue(p.cell.getRowArray(), p.cell.getRowOffset(), p.cell.getRowLength(),
467           HConstants.LATEST_TIMESTAMP);
468       // If we find something, break;
469       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
470     }
471   }
472 
473   /**
474    * Only used by tests. TODO: Remove
475    *
476    * Given the specs of a column, update it, first by inserting a new record,
477    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
478    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
479    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
480    * get the new value, or the old value and all readers will eventually only see the new
481    * value after the old was removed.
482    *
483    * @param row
484    * @param family
485    * @param qualifier
486    * @param newValue
487    * @param now
488    * @return  Timestamp
489    */
490   @Override
491   public long updateColumnValue(byte[] row,
492                                 byte[] family,
493                                 byte[] qualifier,
494                                 long newValue,
495                                 long now) {
496     Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
497     // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
498     SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
499     if (!snSs.isEmpty()) {
500       Cell snc = snSs.first();
501       // is there a matching Cell in the snapshot?
502       if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
503         if (snc.getTimestamp() == now) {
504           // poop,
505           now += 1;
506         }
507       }
508     }
509 
510     // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
511     // But the timestamp should also be max(now, mostRecentTsInMemstore)
512 
513     // so we cant add the new Cell w/o knowing what's there already, but we also
514     // want to take this chance to delete some cells. So two loops (sad)
515 
516     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
517     for (Cell cell : ss) {
518       // if this isnt the row we are interested in, then bail:
519       if (!CellUtil.matchingColumn(cell, family, qualifier)
520           || !CellUtil.matchingRow(cell, firstCell)) {
521         break; // rows dont match, bail.
522       }
523 
524       // if the qualifier matches and it's a put, just RM it out of the cellSet.
525       if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
526           cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
527         now = cell.getTimestamp();
528       }
529     }
530 
531     // create or update (upsert) a new Cell with
532     // 'now' and a 0 memstoreTS == immediately visible
533     List<Cell> cells = new ArrayList<Cell>(1);
534     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
535     return upsert(cells, 1L);
536   }
537 
538   /**
539    * Update or insert the specified KeyValues.
540    * <p>
541    * For each KeyValue, insert into MemStore.  This will atomically upsert the
542    * value for that row/family/qualifier.  If a KeyValue did already exist,
543    * it will then be removed.
544    * <p>
545    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
546    * be immediately visible.  May want to change this so it is atomic across
547    * all KeyValues.
548    * <p>
549    * This is called under row lock, so Get operations will still see updates
550    * atomically.  Scans will only see each KeyValue update as atomic.
551    *
552    * @param cells
553    * @param readpoint readpoint below which we can safely remove duplicate KVs
554    * @return change in memstore size
555    */
556   @Override
557   public long upsert(Iterable<Cell> cells, long readpoint) {
558     long size = 0;
559     for (Cell cell : cells) {
560       size += upsert(cell, readpoint);
561     }
562     return size;
563   }
564 
565   /**
566    * Inserts the specified KeyValue into MemStore and deletes any existing
567    * versions of the same row/family/qualifier as the specified KeyValue.
568    * <p>
569    * First, the specified KeyValue is inserted into the Memstore.
570    * <p>
571    * If there are any existing KeyValues in this MemStore with the same row,
572    * family, and qualifier, they are removed.
573    * <p>
574    * Callers must hold the read lock.
575    *
576    * @param cell
577    * @return change in size of MemStore
578    */
579   private long upsert(Cell cell, long readpoint) {
580     // Add the Cell to the MemStore
581     // Use the internalAdd method here since we (a) already have a lock
582     // and (b) cannot safely use the MSLAB here without potentially
583     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
584     // test that triggers the pathological case if we don't avoid MSLAB
585     // here.
586     long addedSize = internalAdd(cell, false);
587 
588     // Get the Cells for the row/family/qualifier regardless of timestamp.
589     // For this case we want to clean up any other puts
590     Cell firstCell = KeyValueUtil.createFirstOnRow(
591         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
592         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
593         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
594     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
595     Iterator<Cell> it = ss.iterator();
596     // versions visible to oldest scanner
597     int versionsVisible = 0;
598     while ( it.hasNext() ) {
599       Cell cur = it.next();
600 
601       if (cell == cur) {
602         // ignore the one just put in
603         continue;
604       }
605       // check that this is the row and column we are interested in, otherwise bail
606       if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
607         // only remove Puts that concurrent scanners cannot possibly see
608         if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
609             cur.getSequenceId() <= readpoint) {
610           if (versionsVisible >= 1) {
611             // if we get here we have seen at least one version visible to the oldest scanner,
612             // which means we can prove that no scanner will see this version
613 
614             // false means there was a change, so give us the size.
615             long delta = heapSizeChange(cur, true);
616             addedSize -= delta;
617             this.size.addAndGet(-delta);
618             it.remove();
619             setOldestEditTimeToNow();
620           } else {
621             versionsVisible++;
622           }
623         }
624       } else {
625         // past the row or column, done
626         break;
627       }
628     }
629     return addedSize;
630   }
631 
632   /*
633    * Immutable data structure to hold member found in set and the set it was
634    * found in. Include set because it is carrying context.
635    */
636   private static class Member {
637     final Cell cell;
638     final NavigableSet<Cell> set;
639     Member(final NavigableSet<Cell> s, final Cell kv) {
640       this.cell = kv;
641       this.set = s;
642     }
643   }
644 
645   /*
646    * @param set Set to walk back in.  Pass a first in row or we'll return
647    * same row (loop).
648    * @param state Utility and context.
649    * @param firstOnRow First item on the row after the one we want to find a
650    * member in.
651    * @return Null or member of row previous to <code>firstOnRow</code>
652    */
653   private Member memberOfPreviousRow(NavigableSet<Cell> set,
654       final GetClosestRowBeforeTracker state, final Cell firstOnRow) {
655     NavigableSet<Cell> head = set.headSet(firstOnRow, false);
656     if (head.isEmpty()) return null;
657     for (Iterator<Cell> i = head.descendingIterator(); i.hasNext();) {
658       Cell found = i.next();
659       if (state.isExpired(found)) {
660         i.remove();
661         continue;
662       }
663       return new Member(head, found);
664     }
665     return null;
666   }
667 
668   /**
669    * @return scanner on memstore and snapshot in this order.
670    */
671   @Override
672   public List<KeyValueScanner> getScanners(long readPt) {
673     return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
674   }
675 
676   /**
677    * Check if this memstore may contain the required keys
678    * @param scan
679    * @return False if the key definitely does not exist in this Memstore
680    */
681   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
682     return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
683         snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
684         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
685                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
686             oldestUnexpiredTS);
687   }
688 
689   /*
690    * MemStoreScanner implements the KeyValueScanner.
691    * It lets the caller scan the contents of a memstore -- both current
692    * map and snapshot.
693    * This behaves as if it were a real scanner but does not maintain position.
694    */
695   protected class MemStoreScanner extends NonLazyKeyValueScanner {
696     // Next row information for either cellSet or snapshot
697     private Cell cellSetNextRow = null;
698     private Cell snapshotNextRow = null;
699 
700     // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
701     private Cell cellSetItRow = null;
702     private Cell snapshotItRow = null;
703     
704     // iterator based scanning.
705     private Iterator<Cell> cellSetIt;
706     private Iterator<Cell> snapshotIt;
707 
708     // The cellSet and snapshot at the time of creating this scanner
709     private CellSkipListSet cellSetAtCreation;
710     private CellSkipListSet snapshotAtCreation;
711 
712     // the pre-calculated Cell to be returned by peek() or next()
713     private Cell theNext;
714 
715     // The allocator and snapshot allocator at the time of creating this scanner
716     volatile MemStoreLAB allocatorAtCreation;
717     volatile MemStoreLAB snapshotAllocatorAtCreation;
718     
719     // A flag represents whether could stop skipping Cells for MVCC
720     // if have encountered the next row. Only used for reversed scan
721     private boolean stopSkippingCellsIfNextRow = false;
722 
723     private long readPoint;
724 
725     /*
726     Some notes...
727 
728      So memstorescanner is fixed at creation time. this includes pointers/iterators into
729     existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
730     snapshot is moved.  since kvset is null there is no point on reseeking on both,
731       we can save us the trouble. During the snapshot->hfile transition, the memstore
732       scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
733       potentially do something smarter by adjusting the existing memstore scanner.
734 
735       But there is a greater problem here, that being once a scanner has progressed
736       during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
737       if a scan lasts a little while, there is a chance for new entries in kvset to
738       become available but we will never see them.  This needs to be handled at the
739       StoreScanner level with coordination with MemStoreScanner.
740 
741       Currently, this problem is only partly managed: during the small amount of time
742       when the StoreScanner has not yet created a new MemStoreScanner, we will miss
743       the adds to kvset in the MemStoreScanner.
744     */
745 
746     MemStoreScanner(long readPoint) {
747       super();
748 
749       this.readPoint = readPoint;
750       cellSetAtCreation = cellSet;
751       snapshotAtCreation = snapshot;
752       if (allocator != null) {
753         this.allocatorAtCreation = allocator;
754         this.allocatorAtCreation.incScannerCount();
755       }
756       if (snapshotAllocator != null) {
757         this.snapshotAllocatorAtCreation = snapshotAllocator;
758         this.snapshotAllocatorAtCreation.incScannerCount();
759       }
760       if (Trace.isTracing() && Trace.currentSpan() != null) {
761         Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
762       }
763     }
764 
765     /**
766      * Lock on 'this' must be held by caller.
767      * @param it
768      * @return Next Cell
769      */
770     private Cell getNext(Iterator<Cell> it) {
771       Cell startCell = theNext;
772       Cell v = null;
773       try {
774         while (it.hasNext()) {
775           v = it.next();
776           if (v.getSequenceId() <= this.readPoint) {
777             return v;
778           }
779           if (stopSkippingCellsIfNextRow && startCell != null
780               && comparator.compareRows(v, startCell) > 0) {
781             return null;
782           }
783         }
784 
785         return null;
786       } finally {
787         if (v != null) {
788           // in all cases, remember the last Cell iterated to
789           if (it == snapshotIt) {
790             snapshotItRow = v;
791           } else {
792             cellSetItRow = v;
793           }
794         }
795       }
796     }
797 
798     /**
799      *  Set the scanner at the seek key.
800      *  Must be called only once: there is no thread safety between the scanner
801      *   and the memStore.
802      * @param key seek value
803      * @return false if the key is null or if there is no data
804      */
805     @Override
806     public synchronized boolean seek(Cell key) {
807       if (key == null) {
808         close();
809         return false;
810       }
811       // kvset and snapshot will never be null.
812       // if tailSet can't find anything, SortedSet is empty (not null).
813       cellSetIt = cellSetAtCreation.tailSet(key).iterator();
814       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
815       cellSetItRow = null;
816       snapshotItRow = null;
817 
818       return seekInSubLists(key);
819     }
820 
821 
822     /**
823      * (Re)initialize the iterators after a seek or a reseek.
824      */
825     private synchronized boolean seekInSubLists(Cell key){
826       cellSetNextRow = getNext(cellSetIt);
827       snapshotNextRow = getNext(snapshotIt);
828 
829       // Calculate the next value
830       theNext = getLowest(cellSetNextRow, snapshotNextRow);
831 
832       // has data
833       return (theNext != null);
834     }
835 
836 
837     /**
838      * Move forward on the sub-lists set previously by seek.
839      * @param key seek value (should be non-null)
840      * @return true if there is at least one KV to read, false otherwise
841      */
842     @Override
843     public synchronized boolean reseek(Cell key) {
844       /*
845       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
846       This code is executed concurrently with flush and puts, without locks.
847       Two points must be known when working on this code:
848       1) It's not possible to use the 'kvTail' and 'snapshot'
849        variables, as they are modified during a flush.
850       2) The ideal implementation for performance would use the sub skip list
851        implicitly pointed by the iterators 'kvsetIt' and
852        'snapshotIt'. Unfortunately the Java API does not offer a method to
853        get it. So we remember the last keys we iterated to and restore
854        the reseeked set to at least that point.
855        */
856       cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
857       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
858 
859       return seekInSubLists(key);
860     }
861 
862 
863     @Override
864     public synchronized Cell peek() {
865       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
866       return theNext;
867     }
868 
869     @Override
870     public synchronized Cell next() {
871       if (theNext == null) {
872           return null;
873       }
874 
875       final Cell ret = theNext;
876 
877       // Advance one of the iterators
878       if (theNext == cellSetNextRow) {
879         cellSetNextRow = getNext(cellSetIt);
880       } else {
881         snapshotNextRow = getNext(snapshotIt);
882       }
883 
884       // Calculate the next value
885       theNext = getLowest(cellSetNextRow, snapshotNextRow);
886 
887       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
888       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
889       //    getLowest() + " threadpoint=" + readpoint);
890       return ret;
891     }
892 
893     /*
894      * Returns the lower of the two key values, or null if they are both null.
895      * This uses comparator.compare() to compare the KeyValue using the memstore
896      * comparator.
897      */
898     private Cell getLowest(Cell first, Cell second) {
899       if (first == null && second == null) {
900         return null;
901       }
902       if (first != null && second != null) {
903         int compare = comparator.compare(first, second);
904         return (compare <= 0 ? first : second);
905       }
906       return (first != null ? first : second);
907     }
908 
909     /*
910      * Returns the higher of the two cells, or null if they are both null.
911      * This uses comparator.compare() to compare the Cell using the memstore
912      * comparator.
913      */
914     private Cell getHighest(Cell first, Cell second) {
915       if (first == null && second == null) {
916         return null;
917       }
918       if (first != null && second != null) {
919         int compare = comparator.compare(first, second);
920         return (compare > 0 ? first : second);
921       }
922       return (first != null ? first : second);
923     }
924 
925     public synchronized void close() {
926       this.cellSetNextRow = null;
927       this.snapshotNextRow = null;
928 
929       this.cellSetIt = null;
930       this.snapshotIt = null;
931       
932       if (allocatorAtCreation != null) {
933         this.allocatorAtCreation.decScannerCount();
934         this.allocatorAtCreation = null;
935       }
936       if (snapshotAllocatorAtCreation != null) {
937         this.snapshotAllocatorAtCreation.decScannerCount();
938         this.snapshotAllocatorAtCreation = null;
939       }
940 
941       this.cellSetItRow = null;
942       this.snapshotItRow = null;
943     }
944 
945     /**
946      * MemStoreScanner returns max value as sequence id because it will
947      * always have the latest data among all files.
948      */
949     @Override
950     public long getSequenceID() {
951       return Long.MAX_VALUE;
952     }
953 
954     @Override
955     public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
956         long oldestUnexpiredTS) {
957       return shouldSeek(scan, oldestUnexpiredTS);
958     }
959 
960     /**
961      * Seek scanner to the given key first. If it returns false(means
962      * peek()==null) or scanner's peek row is bigger than row of given key, seek
963      * the scanner to the previous row of given key
964      */
965     @Override
966     public synchronized boolean backwardSeek(Cell key) {
967       seek(key);
968       if (peek() == null || comparator.compareRows(peek(), key) > 0) {
969         return seekToPreviousRow(key);
970       }
971       return true;
972     }
973 
974     /**
975      * Separately get the KeyValue before the specified key from kvset and
976      * snapshotset, and use the row of higher one as the previous row of
977      * specified key, then seek to the first KeyValue of previous row
978      */
979     @Override
980     public synchronized boolean seekToPreviousRow(Cell key) {
981       Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
982           key.getRowLength());
983       SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
984       Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
985       SortedSet<Cell> snapshotHead = snapshotAtCreation
986           .headSet(firstKeyOnRow);
987       Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
988           .last();
989       Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
990       if (lastCellBeforeRow == null) {
991         theNext = null;
992         return false;
993       }
994       Cell firstKeyOnPreviousRow = KeyValueUtil.createFirstOnRow(lastCellBeforeRow.getRowArray(),
995           lastCellBeforeRow.getRowOffset(), lastCellBeforeRow.getRowLength());
996       this.stopSkippingCellsIfNextRow = true;
997       seek(firstKeyOnPreviousRow);
998       this.stopSkippingCellsIfNextRow = false;
999       if (peek() == null
1000           || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
1001         return seekToPreviousRow(lastCellBeforeRow);
1002       }
1003       return true;
1004     }
1005 
1006     @Override
1007     public synchronized boolean seekToLastRow() {
1008       Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
1009           .last();
1010       Cell second = snapshotAtCreation.isEmpty() ? null
1011           : snapshotAtCreation.last();
1012       Cell higherCell = getHighest(first, second);
1013       if (higherCell == null) {
1014         return false;
1015       }
1016       Cell firstCellOnLastRow = KeyValueUtil.createFirstOnRow(higherCell.getRowArray(),
1017           higherCell.getRowOffset(), higherCell.getRowLength());
1018       if (seek(firstCellOnLastRow)) {
1019         return true;
1020       } else {
1021         return seekToPreviousRow(higherCell);
1022       }
1023 
1024     }
1025   }
1026 
1027   public final static long FIXED_OVERHEAD = ClassSize.align(
1028       ClassSize.OBJECT + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));
1029 
1030   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
1031       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
1032       (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
1033 
1034   /*
1035    * Calculate how the MemStore size has changed.  Includes overhead of the
1036    * backing Map.
1037    * @param cell
1038    * @param notpresent True if the cell was NOT present in the set.
1039    * @return Size
1040    */
1041   static long heapSizeChange(final Cell cell, final boolean notpresent) {
1042     return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
1043         + CellUtil.estimatedHeapSizeOf(cell)) : 0;
1044   }
1045 
1046   private long keySize() {
1047     return heapSize() - DEEP_OVERHEAD;
1048   }
1049 
1050   /**
1051    * Get the entire heap usage for this MemStore not including keys in the
1052    * snapshot.
1053    */
1054   @Override
1055   public long heapSize() {
1056     return size.get();
1057   }
1058 
1059   @Override
1060   public long size() {
1061     return heapSize();
1062   }
1063 
1064   /**
1065    * Code to help figure if our approximation of object heap sizes is close
1066    * enough.  See hbase-900.  Fills memstores then waits so user can heap
1067    * dump and bring up resultant hprof in something like jprofiler which
1068    * allows you get 'deep size' on objects.
1069    * @param args main args
1070    */
1071   public static void main(String [] args) {
1072     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
1073     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
1074       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
1075     LOG.info("vmInputArguments=" + runtime.getInputArguments());
1076     DefaultMemStore memstore1 = new DefaultMemStore();
1077     // TODO: x32 vs x64
1078     long size = 0;
1079     final int count = 10000;
1080     byte [] fam = Bytes.toBytes("col");
1081     byte [] qf = Bytes.toBytes("umn");
1082     byte [] empty = new byte[0];
1083     for (int i = 0; i < count; i++) {
1084       // Give each its own ts
1085       Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1086       size += ret.getFirst();
1087     }
1088     LOG.info("memstore1 estimated size=" + size);
1089     for (int i = 0; i < count; i++) {
1090       Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1091       size += ret.getFirst();
1092     }
1093     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
1094     // Make a variably sized memstore.
1095     DefaultMemStore memstore2 = new DefaultMemStore();
1096     for (int i = 0; i < count; i++) {
1097       Pair<Long, Cell> ret = memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
1098         new byte[i]));
1099       size += ret.getFirst();
1100     }
1101     LOG.info("memstore2 estimated size=" + size);
1102     final int seconds = 30;
1103     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
1104     for (int i = 0; i < seconds; i++) {
1105       // Thread.sleep(1000);
1106     }
1107     LOG.info("Exiting.");
1108   }
1109 
1110 }