View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.lang.management.ManagementFactory;
23  import java.lang.management.RuntimeMXBean;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.NavigableSet;
29  import java.util.SortedSet;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.KeyValueUtil;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.io.TimeRange;
44  import org.apache.hadoop.hbase.util.ByteRange;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  import org.apache.hadoop.hbase.util.ReflectionUtils;
50  import org.apache.htrace.Trace;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  
54  /**
55   * The MemStore holds in-memory modifications to the Store.  Modifications
56   * are {@link Cell}s.  When asked to flush, current memstore is moved
57   * to snapshot and is cleared.  We continue to serve edits out of new memstore
58   * and backing snapshot until flusher reports in that the flush succeeded. At
59   * this point we let the snapshot go.
60   *  <p>
61   * The MemStore functions should not be called in parallel. Callers should hold
62   *  write and read locks. This is done in {@link HStore}.
63   *  </p>
64   *
65   * TODO: Adjust size of the memstore when we remove items because they have
66   * been deleted.
67   * TODO: With new KVSLS, need to make sure we update HeapSize with difference
68   * in KV size.
69   */
70  @InterfaceAudience.Private
71  public class DefaultMemStore implements MemStore {
  /** Logger for this class. */
  private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
  /** Configuration key that switches use of a {@link MemStoreLAB} allocator on or off. */
  static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
  /** Value assumed for {@link #USEMSLAB_KEY} when the configuration does not set it. */
  private static final boolean USEMSLAB_DEFAULT = true;
  /** Configuration key naming the {@link MemStoreLAB} implementation class to instantiate. */
  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";

  // Configuration handed to the constructor; consulted again whenever a fresh
  // allocator has to be created for a new active set.
  private Configuration conf;

  // MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
  // better semantics.  The Map will overwrite if passed a key it already had
  // whereas the Set will not add new Cell if key is same though value might be
  // different.  Value is not important -- just make sure always same
  // reference passed.
  volatile CellSkipListSet cellSet;

  // Snapshot of memstore.  Made for flusher.
  volatile CellSkipListSet snapshot;

  // Comparator used to build the cell sets and to compare rows.
  final KeyValue.KVComparator comparator;

  // Used to track own heapSize
  final AtomicLong size;
  // Size recorded for the current snapshot; 0 when there is no snapshot.
  private volatile long snapshotSize;

  // Used to track when to flush
  volatile long timeOfOldestEdit = Long.MAX_VALUE;

  // Timestamp range of the cells in the active set and in the snapshot respectively.
  TimeRangeTracker timeRangeTracker;
  TimeRangeTracker snapshotTimeRangeTracker;

  // Allocator backing the active set (null when MSLAB is disabled) and the one
  // that backed the set currently held as snapshot.
  volatile MemStoreLAB allocator;
  volatile MemStoreLAB snapshotAllocator;
  // Id of the current snapshot; set to -1 once the snapshot has been cleared.
  volatile long snapshotId;
  // True once a cell with a non-zero tags length has been added since the last snapshot() call.
  volatile boolean tagsPresent;
105 
  /**
   * Default constructor. Used for tests.
   * <p>
   * Delegates to {@link #DefaultMemStore(Configuration, KeyValue.KVComparator)} with a
   * configuration obtained from {@link HBaseConfiguration#create()} and
   * {@link KeyValue#COMPARATOR}.
   */
  public DefaultMemStore() {
    this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
  }
112 
113   /**
114    * Constructor.
115    * @param c Comparator
116    */
117   public DefaultMemStore(final Configuration conf,
118                   final KeyValue.KVComparator c) {
119     this.conf = conf;
120     this.comparator = c;
121     this.cellSet = new CellSkipListSet(c);
122     this.snapshot = new CellSkipListSet(c);
123     timeRangeTracker = new TimeRangeTracker();
124     snapshotTimeRangeTracker = new TimeRangeTracker();
125     this.size = new AtomicLong(DEEP_OVERHEAD);
126     this.snapshotSize = 0;
127     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
128       String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
129       this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
130           new Class[] { Configuration.class }, new Object[] { conf });
131     } else {
132       this.allocator = null;
133     }
134   }
135 
136   void dump() {
137     for (Cell cell: this.cellSet) {
138       LOG.info(cell);
139     }
140     for (Cell cell: this.snapshot) {
141       LOG.info(cell);
142     }
143   }
144 
145   /**
146    * Creates a snapshot of the current memstore.
147    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
148    */
149   @Override
150   public MemStoreSnapshot snapshot() {
151     // If snapshot currently has entries, then flusher failed or didn't call
152     // cleanup.  Log a warning.
153     if (!this.snapshot.isEmpty()) {
154       LOG.warn("Snapshot called again without clearing previous. " +
155           "Doing nothing. Another ongoing flush or did we fail last attempt?");
156     } else {
157       this.snapshotId = EnvironmentEdgeManager.currentTime();
158       this.snapshotSize = keySize();
159       if (!this.cellSet.isEmpty()) {
160         this.snapshot = this.cellSet;
161         this.cellSet = new CellSkipListSet(this.comparator);
162         this.snapshotTimeRangeTracker = this.timeRangeTracker;
163         this.timeRangeTracker = new TimeRangeTracker();
164         // Reset heap to not include any keys
165         this.size.set(DEEP_OVERHEAD);
166         this.snapshotAllocator = this.allocator;
167         // Reset allocator so we get a fresh buffer for the new memstore
168         if (allocator != null) {
169           String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
170           this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
171               new Class[] { Configuration.class }, new Object[] { conf });
172         }
173         timeOfOldestEdit = Long.MAX_VALUE;
174       }
175     }
176     MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
177         this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
178         this.tagsPresent);
179     this.tagsPresent = false;
180     return memStoreSnapshot;
181   }
182 
183   /**
184    * The passed snapshot was successfully persisted; it can be let go.
185    * @param id Id of the snapshot to clean out.
186    * @throws UnexpectedStateException
187    * @see #snapshot()
188    */
189   @Override
190   public void clearSnapshot(long id) throws UnexpectedStateException {
191     MemStoreLAB tmpAllocator = null;
192     if (this.snapshotId == -1) return;  // already cleared
193     if (this.snapshotId != id) {
194       throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
195           + id);
196     }
197     // OK. Passed in snapshot is same as current snapshot. If not-empty,
198     // create a new snapshot and let the old one go.
199     if (!this.snapshot.isEmpty()) {
200       this.snapshot = new CellSkipListSet(this.comparator);
201       this.snapshotTimeRangeTracker = new TimeRangeTracker();
202     }
203     this.snapshotSize = 0;
204     this.snapshotId = -1;
205     if (this.snapshotAllocator != null) {
206       tmpAllocator = this.snapshotAllocator;
207       this.snapshotAllocator = null;
208     }
209     if (tmpAllocator != null) {
210       tmpAllocator.close();
211     }
212   }
213 
214   @Override
215   public long getFlushableSize() {
216     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
217   }
218 
219   @Override
220   public long getSnapshotSize() {
221     return this.snapshotSize;
222   }
223 
224   /**
225    * Write an update
226    * @param cell
227    * @return approximate size of the passed cell.
228    */
229   @Override
230   public long add(Cell cell) {
231     Cell toAdd = maybeCloneWithAllocator(cell);
232     boolean mslabUsed = (toAdd != cell);
233     return internalAdd(toAdd, mslabUsed);
234   }
235 
236   @Override
237   public long timeOfOldestEdit() {
238     return timeOfOldestEdit;
239   }
240 
241   private boolean addToCellSet(Cell e) {
242     boolean b = this.cellSet.add(e);
243     // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
244     // When we use ACL CP or Visibility CP which deals with Tags during
245     // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
246     // parse the byte[] to identify the tags length.
247     if(e.getTagsLength() > 0) {
248       tagsPresent = true;
249     }
250     setOldestEditTimeToNow();
251     return b;
252   }
253 
254   private boolean removeFromCellSet(Cell e) {
255     boolean b = this.cellSet.remove(e);
256     setOldestEditTimeToNow();
257     return b;
258   }
259 
260   void setOldestEditTimeToNow() {
261     if (timeOfOldestEdit == Long.MAX_VALUE) {
262       timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
263     }
264   }
265 
266   /**
267    * Internal version of add() that doesn't clone Cells with the
268    * allocator, and doesn't take the lock.
269    *
270    * Callers should ensure they already have the read lock taken
271    * @param toAdd the cell to add
272    * @param mslabUsed whether using MSLAB
273    * @return the heap size change in bytes
274    */
275   private long internalAdd(final Cell toAdd, boolean mslabUsed) {
276     boolean notPresent = addToCellSet(toAdd);
277     long s = heapSizeChange(toAdd, notPresent);
278     // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
279     // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
280     // than the counted number)
281     if (!notPresent && mslabUsed) {
282       s += getCellLength(toAdd);
283     }
284     timeRangeTracker.includeTimestamp(toAdd);
285     this.size.addAndGet(s);
286     return s;
287   }
288 
289   /**
290    * Get cell length after serialized in {@link KeyValue}
291    */
292   @VisibleForTesting
293   int getCellLength(Cell cell) {
294     return KeyValueUtil.length(cell);
295   }
296 
297   private Cell maybeCloneWithAllocator(Cell cell) {
298     if (allocator == null) {
299       return cell;
300     }
301 
302     int len = getCellLength(cell);
303     ByteRange alloc = allocator.allocateBytes(len);
304     if (alloc == null) {
305       // The allocation was too large, allocator decided
306       // not to do anything with it.
307       return cell;
308     }
309     assert alloc.getBytes() != null;
310     KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
311     KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
312     newKv.setSequenceId(cell.getSequenceId());
313     return newKv;
314   }
315 
316   /**
317    * Remove n key from the memstore. Only cells that have the same key and the
318    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
319    * in this call. It is possible that we can optimize this method by using
320    * tailMap/iterator, but since this method is called rarely (only for
321    * error recovery), we can leave those optimization for the future.
322    * @param cell
323    */
324   @Override
325   public void rollback(Cell cell) {
326     // If the key is in the snapshot, delete it. We should not update
327     // this.size, because that tracks the size of only the memstore and
328     // not the snapshot. The flush of this snapshot to disk has not
329     // yet started because Store.flush() waits for all rwcc transactions to
330     // commit before starting the flush to disk.
331     Cell found = this.snapshot.get(cell);
332     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
333       this.snapshot.remove(cell);
334       long sz = heapSizeChange(cell, true);
335       this.snapshotSize -= sz;
336     }
337     // If the key is in the memstore, delete it. Update this.size.
338     found = this.cellSet.get(cell);
339     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
340       removeFromCellSet(cell);
341       long s = heapSizeChange(cell, true);
342       this.size.addAndGet(-s);
343     }
344   }
345 
346   /**
347    * Write a delete
348    * @param deleteCell
349    * @return approximate size of the passed key and value.
350    */
351   @Override
352   public long delete(Cell deleteCell) {
353     Cell toAdd = maybeCloneWithAllocator(deleteCell);
354     boolean mslabUsed = (toAdd != deleteCell);
355     return internalAdd(toAdd, mslabUsed);
356   }
357 
358   /**
359    * @param cell Find the row that comes after this one.  If null, we return the
360    * first.
361    * @return Next row or null if none found.
362    */
363   Cell getNextRow(final Cell cell) {
364     return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
365   }
366 
367   /*
368    * @param a
369    * @param b
370    * @return Return lowest of a or b or null if both a and b are null
371    */
372   private Cell getLowest(final Cell a, final Cell b) {
373     if (a == null) {
374       return b;
375     }
376     if (b == null) {
377       return a;
378     }
379     return comparator.compareRows(a, b) <= 0? a: b;
380   }
381 
382   /*
383    * @param key Find row that follows this one.  If null, return first.
384    * @param map Set to look in for a row beyond <code>row</code>.
385    * @return Next row or null if none found.  If one found, will be a new
386    * KeyValue -- can be destroyed by subsequent calls to this method.
387    */
388   private Cell getNextRow(final Cell key,
389       final NavigableSet<Cell> set) {
390     Cell result = null;
391     SortedSet<Cell> tail = key == null? set: set.tailSet(key);
392     // Iterate until we fall into the next row; i.e. move off current row
393     for (Cell cell: tail) {
394       if (comparator.compareRows(cell, key) <= 0)
395         continue;
396       // Note: Not suppressing deletes or expired cells.  Needs to be handled
397       // by higher up functions.
398       result = cell;
399       break;
400     }
401     return result;
402   }
403 
404   /**
405    * @param state column/delete tracking state
406    */
407   @Override
408   public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
409     getRowKeyAtOrBefore(cellSet, state);
410     getRowKeyAtOrBefore(snapshot, state);
411   }
412 
413   /*
414    * @param set
415    * @param state Accumulates deletes and candidates.
416    */
417   private void getRowKeyAtOrBefore(final NavigableSet<Cell> set,
418       final GetClosestRowBeforeTracker state) {
419     if (set.isEmpty()) {
420       return;
421     }
422     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
423       // Found nothing in row.  Try backing up.
424       getRowKeyBefore(set, state);
425     }
426   }
427 
428   /*
429    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
430    * we have been passed the first possible key on a row.  As we walk forward
431    * we accumulate deletes until we hit a candidate on the row at which point
432    * we return.
433    * @param set
434    * @param firstOnRow First possible key on this row.
435    * @param state
436    * @return True if we found a candidate walking this row.
437    */
438   private boolean walkForwardInSingleRow(final SortedSet<Cell> set,
439       final Cell firstOnRow, final GetClosestRowBeforeTracker state) {
440     boolean foundCandidate = false;
441     SortedSet<Cell> tail = set.tailSet(firstOnRow);
442     if (tail.isEmpty()) return foundCandidate;
443     for (Iterator<Cell> i = tail.iterator(); i.hasNext();) {
444       Cell kv = i.next();
445       // Did we go beyond the target row? If so break.
446       if (state.isTooFar(kv, firstOnRow)) break;
447       if (state.isExpired(kv)) {
448         i.remove();
449         continue;
450       }
451       // If we added something, this row is a contender. break.
452       if (state.handle(kv)) {
453         foundCandidate = true;
454         break;
455       }
456     }
457     return foundCandidate;
458   }
459 
460   /*
461    * Walk backwards through the passed set a row at a time until we run out of
462    * set or until we get a candidate.
463    * @param set
464    * @param state
465    */
466   private void getRowKeyBefore(NavigableSet<Cell> set,
467       final GetClosestRowBeforeTracker state) {
468     Cell firstOnRow = state.getTargetKey();
469     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
470         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
471       // Make sure we don't fall out of our table.
472       if (!state.isTargetTable(p.cell)) break;
473       // Stop looking if we've exited the better candidate range.
474       if (!state.isBetterCandidate(p.cell)) break;
475       // Make into firstOnRow
476       firstOnRow = new KeyValue(p.cell.getRowArray(), p.cell.getRowOffset(), p.cell.getRowLength(),
477           HConstants.LATEST_TIMESTAMP);
478       // If we find something, break;
479       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
480     }
481   }
482 
483   /**
484    * Only used by tests. TODO: Remove
485    *
486    * Given the specs of a column, update it, first by inserting a new record,
487    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
488    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
489    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
490    * get the new value, or the old value and all readers will eventually only see the new
491    * value after the old was removed.
492    *
493    * @param row
494    * @param family
495    * @param qualifier
496    * @param newValue
497    * @param now
498    * @return  Timestamp
499    */
500   @Override
501   public long updateColumnValue(byte[] row,
502                                 byte[] family,
503                                 byte[] qualifier,
504                                 long newValue,
505                                 long now) {
506     Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
507     // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
508     SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
509     if (!snSs.isEmpty()) {
510       Cell snc = snSs.first();
511       // is there a matching Cell in the snapshot?
512       if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
513         if (snc.getTimestamp() == now) {
514           // poop,
515           now += 1;
516         }
517       }
518     }
519 
520     // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
521     // But the timestamp should also be max(now, mostRecentTsInMemstore)
522 
523     // so we cant add the new Cell w/o knowing what's there already, but we also
524     // want to take this chance to delete some cells. So two loops (sad)
525 
526     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
527     for (Cell cell : ss) {
528       // if this isnt the row we are interested in, then bail:
529       if (!CellUtil.matchingColumn(cell, family, qualifier)
530           || !CellUtil.matchingRow(cell, firstCell)) {
531         break; // rows dont match, bail.
532       }
533 
534       // if the qualifier matches and it's a put, just RM it out of the cellSet.
535       if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
536           cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
537         now = cell.getTimestamp();
538       }
539     }
540 
541     // create or update (upsert) a new Cell with
542     // 'now' and a 0 memstoreTS == immediately visible
543     List<Cell> cells = new ArrayList<Cell>(1);
544     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
545     return upsert(cells, 1L);
546   }
547 
548   /**
549    * Update or insert the specified KeyValues.
550    * <p>
551    * For each KeyValue, insert into MemStore.  This will atomically upsert the
552    * value for that row/family/qualifier.  If a KeyValue did already exist,
553    * it will then be removed.
554    * <p>
555    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
556    * be immediately visible.  May want to change this so it is atomic across
557    * all KeyValues.
558    * <p>
559    * This is called under row lock, so Get operations will still see updates
560    * atomically.  Scans will only see each KeyValue update as atomic.
561    *
562    * @param cells
563    * @param readpoint readpoint below which we can safely remove duplicate KVs
564    * @return change in memstore size
565    */
566   @Override
567   public long upsert(Iterable<Cell> cells, long readpoint) {
568     long size = 0;
569     for (Cell cell : cells) {
570       size += upsert(cell, readpoint);
571     }
572     return size;
573   }
574 
575   /**
576    * Inserts the specified KeyValue into MemStore and deletes any existing
577    * versions of the same row/family/qualifier as the specified KeyValue.
578    * <p>
579    * First, the specified KeyValue is inserted into the Memstore.
580    * <p>
581    * If there are any existing KeyValues in this MemStore with the same row,
582    * family, and qualifier, they are removed.
583    * <p>
584    * Callers must hold the read lock.
585    *
586    * @param cell
587    * @return change in size of MemStore
588    */
589   private long upsert(Cell cell, long readpoint) {
590     // Add the Cell to the MemStore
591     // Use the internalAdd method here since we (a) already have a lock
592     // and (b) cannot safely use the MSLAB here without potentially
593     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
594     // test that triggers the pathological case if we don't avoid MSLAB
595     // here.
596     long addedSize = internalAdd(cell, false);
597 
598     // Get the Cells for the row/family/qualifier regardless of timestamp.
599     // For this case we want to clean up any other puts
600     Cell firstCell = KeyValueUtil.createFirstOnRow(
601         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
602         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
603         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
604     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
605     Iterator<Cell> it = ss.iterator();
606     // versions visible to oldest scanner
607     int versionsVisible = 0;
608     while ( it.hasNext() ) {
609       Cell cur = it.next();
610 
611       if (cell == cur) {
612         // ignore the one just put in
613         continue;
614       }
615       // check that this is the row and column we are interested in, otherwise bail
616       if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
617         // only remove Puts that concurrent scanners cannot possibly see
618         if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
619             cur.getSequenceId() <= readpoint) {
620           if (versionsVisible >= 1) {
621             // if we get here we have seen at least one version visible to the oldest scanner,
622             // which means we can prove that no scanner will see this version
623 
624             // false means there was a change, so give us the size.
625             long delta = heapSizeChange(cur, true);
626             addedSize -= delta;
627             this.size.addAndGet(-delta);
628             it.remove();
629             setOldestEditTimeToNow();
630           } else {
631             versionsVisible++;
632           }
633         }
634       } else {
635         // past the row or column, done
636         break;
637       }
638     }
639     return addedSize;
640   }
641 
642   /*
643    * Immutable data structure to hold member found in set and the set it was
644    * found in. Include set because it is carrying context.
645    */
646   private static class Member {
647     final Cell cell;
648     final NavigableSet<Cell> set;
649     Member(final NavigableSet<Cell> s, final Cell kv) {
650       this.cell = kv;
651       this.set = s;
652     }
653   }
654 
655   /*
656    * @param set Set to walk back in.  Pass a first in row or we'll return
657    * same row (loop).
658    * @param state Utility and context.
659    * @param firstOnRow First item on the row after the one we want to find a
660    * member in.
661    * @return Null or member of row previous to <code>firstOnRow</code>
662    */
663   private Member memberOfPreviousRow(NavigableSet<Cell> set,
664       final GetClosestRowBeforeTracker state, final Cell firstOnRow) {
665     NavigableSet<Cell> head = set.headSet(firstOnRow, false);
666     if (head.isEmpty()) return null;
667     for (Iterator<Cell> i = head.descendingIterator(); i.hasNext();) {
668       Cell found = i.next();
669       if (state.isExpired(found)) {
670         i.remove();
671         continue;
672       }
673       return new Member(head, found);
674     }
675     return null;
676   }
677 
678   /**
679    * @return scanner on memstore and snapshot in this order.
680    */
681   @Override
682   public List<KeyValueScanner> getScanners(long readPt) {
683     MemStoreScanner scanner = new MemStoreScanner(readPt);
684     scanner.seek(CellUtil.createCell(HConstants.EMPTY_START_ROW));
685     if (scanner.peek() == null) {
686       scanner.close();
687       return null;
688     }
689     return Collections.<KeyValueScanner> singletonList(scanner);
690   }
691 
692   /**
693    * Check if this memstore may contain the required keys
694    * @param scan scan
695    * @param store holds reference to cf
696    * @param oldestUnexpiredTS
697    * @return False if the key definitely does not exist in this Memstore
698    */
699   public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
700     byte[] cf = store.getFamily().getName();
701     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
702     if (timeRange == null) {
703       timeRange = scan.getTimeRange();
704     }
705     return (timeRangeTracker.includesTimeRange(timeRange) ||
706         snapshotTimeRangeTracker.includesTimeRange(timeRange))
707         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
708                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
709             oldestUnexpiredTS);
710   }
711 
712   /*
713    * MemStoreScanner implements the KeyValueScanner.
714    * It lets the caller scan the contents of a memstore -- both current
715    * map and snapshot.
716    * This behaves as if it were a real scanner but does not maintain position.
717    */
718   protected class MemStoreScanner extends NonLazyKeyValueScanner {
    // Next row information for either cellSet or snapshot
    private Cell cellSetNextRow = null;
    private Cell snapshotNextRow = null;

    // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
    private Cell cellSetItRow = null;
    private Cell snapshotItRow = null;

    // iterator based scanning.
    private Iterator<Cell> cellSetIt;
    private Iterator<Cell> snapshotIt;

    // The cellSet and snapshot at the time of creating this scanner
    private CellSkipListSet cellSetAtCreation;
    private CellSkipListSet snapshotAtCreation;

    // the pre-calculated Cell to be returned by peek() or next()
    private Cell theNext;

    // The allocator and snapshot allocator at the time of creating this scanner.
    // Each stays null when the memstore had no such allocator at that time.
    volatile MemStoreLAB allocatorAtCreation;
    volatile MemStoreLAB snapshotAllocatorAtCreation;

    // A flag represents whether could stop skipping Cells for MVCC
    // if have encountered the next row. Only used for reversed scan
    private boolean stopSkippingCellsIfNextRow = false;
    // Stop skipping KeyValues for MVCC if finish this row. Only used for reversed scan
    private Cell stopSkippingKVsRow;

    // Cells with a sequence id above this value are skipped while iterating.
    private long readPoint;
749 
750     /*
751     Some notes...
752 
753      So memstorescanner is fixed at creation time. this includes pointers/iterators into
754     existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
755     snapshot is moved.  since kvset is null there is no point on reseeking on both,
756       we can save us the trouble. During the snapshot->hfile transition, the memstore
757       scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
758       potentially do something smarter by adjusting the existing memstore scanner.
759 
760       But there is a greater problem here, that being once a scanner has progressed
761       during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
762       if a scan lasts a little while, there is a chance for new entries in kvset to
763       become available but we will never see them.  This needs to be handled at the
764       StoreScanner level with coordination with MemStoreScanner.
765 
766       Currently, this problem is only partly managed: during the small amount of time
767       when the StoreScanner has not yet created a new MemStoreScanner, we will miss
768       the adds to kvset in the MemStoreScanner.
769     */
770 
771     MemStoreScanner(long readPoint) {
772       super();
773 
774       this.readPoint = readPoint;
775       cellSetAtCreation = cellSet;
776       snapshotAtCreation = snapshot;
777       if (allocator != null) {
778         this.allocatorAtCreation = allocator;
779         this.allocatorAtCreation.incScannerCount();
780       }
781       if (snapshotAllocator != null) {
782         this.snapshotAllocatorAtCreation = snapshotAllocator;
783         this.snapshotAllocatorAtCreation.incScannerCount();
784       }
785       if (Trace.isTracing() && Trace.currentSpan() != null) {
786         Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
787       }
788     }
789 
790     /**
791      * Lock on 'this' must be held by caller.
792      * @param it
793      * @return Next Cell
794      */
795     private Cell getNext(Iterator<Cell> it) {
796       Cell v = null;
797       try {
798         while (it.hasNext()) {
799           v = it.next();
800           if (v.getSequenceId() <= this.readPoint) {
801             return v;
802           }
803           if (stopSkippingCellsIfNextRow && stopSkippingKVsRow != null
804               && comparator.compareRows(v, stopSkippingKVsRow) > 0) {
805             return null;
806           }
807         }
808 
809         return null;
810       } finally {
811         if (v != null) {
812           // in all cases, remember the last Cell iterated to
813           if (it == snapshotIt) {
814             snapshotItRow = v;
815           } else {
816             cellSetItRow = v;
817           }
818         }
819       }
820     }
821 
822     /**
823      *  Set the scanner at the seek key.
824      *  Must be called only once: there is no thread safety between the scanner
825      *   and the memStore.
826      * @param key seek value
827      * @return false if the key is null or if there is no data
828      */
829     @Override
830     public synchronized boolean seek(Cell key) {
831       if (key == null) {
832         close();
833         return false;
834       }
835       // kvset and snapshot will never be null.
836       // if tailSet can't find anything, SortedSet is empty (not null).
837       cellSetIt = cellSetAtCreation.tailSet(key).iterator();
838       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
839       cellSetItRow = null;
840       snapshotItRow = null;
841 
842       return seekInSubLists(key);
843     }
844 
845 
846     /**
847      * (Re)initialize the iterators after a seek or a reseek.
848      */
849     private synchronized boolean seekInSubLists(Cell key){
850       cellSetNextRow = getNext(cellSetIt);
851       snapshotNextRow = getNext(snapshotIt);
852 
853       // Calculate the next value
854       theNext = getLowest(cellSetNextRow, snapshotNextRow);
855 
856       // has data
857       return (theNext != null);
858     }
859 
860 
861     /**
862      * Move forward on the sub-lists set previously by seek.
863      * @param key seek value (should be non-null)
864      * @return true if there is at least one KV to read, false otherwise
865      */
866     @Override
867     public synchronized boolean reseek(Cell key) {
868       /*
869       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
870       This code is executed concurrently with flush and puts, without locks.
871       Two points must be known when working on this code:
872       1) It's not possible to use the 'kvTail' and 'snapshot'
873        variables, as they are modified during a flush.
874       2) The ideal implementation for performance would use the sub skip list
875        implicitly pointed by the iterators 'kvsetIt' and
876        'snapshotIt'. Unfortunately the Java API does not offer a method to
877        get it. So we remember the last keys we iterated to and restore
878        the reseeked set to at least that point.
879        */
880       cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
881       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
882 
883       return seekInSubLists(key);
884     }
885 
886 
887     @Override
888     public synchronized Cell peek() {
889       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
890       return theNext;
891     }
892 
893     @Override
894     public synchronized Cell next() {
895       if (theNext == null) {
896           return null;
897       }
898 
899       final Cell ret = theNext;
900 
901       // Advance one of the iterators
902       if (theNext == cellSetNextRow) {
903         cellSetNextRow = getNext(cellSetIt);
904       } else {
905         snapshotNextRow = getNext(snapshotIt);
906       }
907 
908       // Calculate the next value
909       theNext = getLowest(cellSetNextRow, snapshotNextRow);
910 
911       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
912       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
913       //    getLowest() + " threadpoint=" + readpoint);
914       return ret;
915     }
916 
917     /*
918      * Returns the lower of the two key values, or null if they are both null.
919      * This uses comparator.compare() to compare the KeyValue using the memstore
920      * comparator.
921      */
922     private Cell getLowest(Cell first, Cell second) {
923       if (first == null && second == null) {
924         return null;
925       }
926       if (first != null && second != null) {
927         int compare = comparator.compare(first, second);
928         return (compare <= 0 ? first : second);
929       }
930       return (first != null ? first : second);
931     }
932 
933     /*
934      * Returns the higher of the two cells, or null if they are both null.
935      * This uses comparator.compare() to compare the Cell using the memstore
936      * comparator.
937      */
938     private Cell getHighest(Cell first, Cell second) {
939       if (first == null && second == null) {
940         return null;
941       }
942       if (first != null && second != null) {
943         int compare = comparator.compare(first, second);
944         return (compare > 0 ? first : second);
945       }
946       return (first != null ? first : second);
947     }
948 
949     public synchronized void close() {
950       this.cellSetNextRow = null;
951       this.snapshotNextRow = null;
952 
953       this.cellSetIt = null;
954       this.snapshotIt = null;
955       
956       if (allocatorAtCreation != null) {
957         this.allocatorAtCreation.decScannerCount();
958         this.allocatorAtCreation = null;
959       }
960       if (snapshotAllocatorAtCreation != null) {
961         this.snapshotAllocatorAtCreation.decScannerCount();
962         this.snapshotAllocatorAtCreation = null;
963       }
964 
965       this.cellSetItRow = null;
966       this.snapshotItRow = null;
967     }
968 
969     /**
970      * MemStoreScanner returns max value as sequence id because it will
971      * always have the latest data among all files.
972      */
973     @Override
974     public long getSequenceID() {
975       return Long.MAX_VALUE;
976     }
977 
978     @Override
979     public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
980       return shouldSeek(scan, store, oldestUnexpiredTS);
981     }
982 
983     /**
984      * Seek scanner to the given key first. If it returns false(means
985      * peek()==null) or scanner's peek row is bigger than row of given key, seek
986      * the scanner to the previous row of given key
987      */
988     @Override
989     public synchronized boolean backwardSeek(Cell key) {
990       seek(key);
991       if (peek() == null || comparator.compareRows(peek(), key) > 0) {
992         return seekToPreviousRow(key);
993       }
994       return true;
995     }
996 
997     /**
998      * Separately get the KeyValue before the specified key from kvset and
999      * snapshotset, and use the row of higher one as the previous row of
1000      * specified key, then seek to the first KeyValue of previous row
1001      */
1002     @Override
1003     public synchronized boolean seekToPreviousRow(Cell originalKey) {
1004       boolean keepSeeking = false;
1005       Cell key = originalKey;
1006       do {
1007         Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
1008             key.getRowLength());
1009         SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
1010         Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
1011         SortedSet<Cell> snapshotHead = snapshotAtCreation
1012             .headSet(firstKeyOnRow);
1013         Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
1014             .last();
1015         Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
1016         if (lastCellBeforeRow == null) {
1017           theNext = null;
1018           return false;
1019         }
1020         Cell firstKeyOnPreviousRow = KeyValueUtil.createFirstOnRow(lastCellBeforeRow.getRowArray(),
1021             lastCellBeforeRow.getRowOffset(), lastCellBeforeRow.getRowLength());
1022         this.stopSkippingCellsIfNextRow = true;
1023         this.stopSkippingKVsRow = firstKeyOnPreviousRow;
1024         seek(firstKeyOnPreviousRow);
1025         this.stopSkippingCellsIfNextRow = false;
1026         if (peek() == null
1027             || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
1028           keepSeeking = true;
1029           key = firstKeyOnPreviousRow;
1030           continue;
1031         } else {
1032           keepSeeking = false;
1033         }
1034       } while (keepSeeking);
1035       return true;
1036     }
1037 
1038     @Override
1039     public synchronized boolean seekToLastRow() {
1040       Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
1041           .last();
1042       Cell second = snapshotAtCreation.isEmpty() ? null
1043           : snapshotAtCreation.last();
1044       Cell higherCell = getHighest(first, second);
1045       if (higherCell == null) {
1046         return false;
1047       }
1048       Cell firstCellOnLastRow = KeyValueUtil.createFirstOnRow(higherCell.getRowArray(),
1049           higherCell.getRowOffset(), higherCell.getRowLength());
1050       if (seek(firstCellOnLastRow)) {
1051         return true;
1052       } else {
1053         return seekToPreviousRow(higherCell);
1054       }
1055 
1056     }
1057   }
1058 
1059   public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
1060       + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
1061 
1062   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
1063       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
1064       (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
1065 
1066   /*
1067    * Calculate how the MemStore size has changed.  Includes overhead of the
1068    * backing Map.
1069    * @param cell
1070    * @param notpresent True if the cell was NOT present in the set.
1071    * @return Size
1072    */
1073   static long heapSizeChange(final Cell cell, final boolean notpresent) {
1074     return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
1075         + CellUtil.estimatedHeapSizeOf(cell)) : 0;
1076   }
1077 
1078   private long keySize() {
1079     return heapSize() - DEEP_OVERHEAD;
1080   }
1081 
1082   /**
1083    * Get the entire heap usage for this MemStore not including keys in the
1084    * snapshot.
1085    */
1086   @Override
1087   public long heapSize() {
1088     return size.get();
1089   }
1090 
1091   @Override
1092   public long size() {
1093     return heapSize();
1094   }
1095 
1096   /**
1097    * Code to help figure if our approximation of object heap sizes is close
1098    * enough.  See hbase-900.  Fills memstores then waits so user can heap
1099    * dump and bring up resultant hprof in something like jprofiler which
1100    * allows you get 'deep size' on objects.
1101    * @param args main args
1102    */
1103   public static void main(String [] args) {
1104     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
1105     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
1106       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
1107     LOG.info("vmInputArguments=" + runtime.getInputArguments());
1108     DefaultMemStore memstore1 = new DefaultMemStore();
1109     // TODO: x32 vs x64
1110     long size = 0;
1111     final int count = 10000;
1112     byte [] fam = Bytes.toBytes("col");
1113     byte [] qf = Bytes.toBytes("umn");
1114     byte [] empty = new byte[0];
1115     for (int i = 0; i < count; i++) {
1116       // Give each its own ts
1117       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1118     }
1119     LOG.info("memstore1 estimated size=" + size);
1120     for (int i = 0; i < count; i++) {
1121       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1122     }
1123     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
1124     // Make a variably sized memstore.
1125     DefaultMemStore memstore2 = new DefaultMemStore();
1126     for (int i = 0; i < count; i++) {
1127       size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]));
1128     }
1129     LOG.info("memstore2 estimated size=" + size);
1130     final int seconds = 30;
1131     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
1132     for (int i = 0; i < seconds; i++) {
1133       // Thread.sleep(1000);
1134     }
1135     LOG.info("Exiting.");
1136   }
1137 
1138 }