View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.lang.management.ManagementFactory;
24  import java.lang.management.RuntimeMXBean;
25  import java.rmi.UnexpectedException;
26  import java.util.Arrays;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.NavigableSet;
31  import java.util.SortedSet;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HBaseConfiguration;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.client.Scan;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.ClassSize;
45  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46  
47  /**
48   * The MemStore holds in-memory modifications to the Store.  Modifications
49   * are {@link KeyValue}s.  When asked to flush, current memstore is moved
50   * to snapshot and is cleared.  We continue to serve edits out of new memstore
51   * and backing snapshot until flusher reports in that the flush succeeded. At
52   * this point we let the snapshot go.
53   * <p>
54   * The MemStore functions should not be called in parallel. Callers should hold
55   * write and read locks. This is done in {@link Store}.
56   * </p>
57   * TODO: Adjust size of the memstore when we remove items because they have
58   * been deleted.
59   * TODO: With new KVSLS, need to make sure we update HeapSize with difference
60   * in KV size.
61   */
62  public class MemStore implements HeapSize {
  private static final Log LOG = LogFactory.getLog(MemStore.class);

  // Configuration key that turns the MemStore-local allocation buffer
  // (MemStoreLAB) on or off; read in the constructor.
  static final String USEMSLAB_KEY =
    "hbase.hregion.memstore.mslab.enabled";
  // MSLAB is used unless USEMSLAB_KEY is explicitly set to false.
  private static final boolean USEMSLAB_DEFAULT = true;

  // Configuration given at construction; reused to build a fresh MemStoreLAB
  // whenever snapshot() swaps out the active set.
  private Configuration conf;

  // Active set that receives new edits.  A KeyValueSkipListSet is used rather
  // than a plain SkipListSet because of its better semantics: the backing Map
  // overwrites an entry when passed a key it already holds, whereas a Set
  // would not add the new KV if the key is the same even though the value
  // might differ.  The Map's value is not important -- just make sure the
  // same reference is always passed.
  volatile KeyValueSkipListSet kvset;

  // Snapshot of memstore, made for the flusher.  snapshot() moves the active
  // kvset here; clearSnapshot() replaces it with a new empty set.
  volatile KeyValueSkipListSet snapshot;

  // Ordering used by kvset and snapshot.
  final KeyValue.KVComparator comparator;

  // Used comparing versions -- same r/c and ts but different type.
  final KeyValue.KVComparator comparatorIgnoreType;

  // Used comparing versions -- same r/c and type but different timestamp.
  final KeyValue.KVComparator comparatorIgnoreTimestamp;

  // Tracks own heap size: DEEP_OVERHEAD plus the heap size of what has been
  // added to the active kvset.  Reset to DEEP_OVERHEAD by snapshot(); the
  // snapshot's contents are accounted separately in snapshotSize.
  final AtomicLong size;
  // Size of the pending snapshot: captured from keySize() in snapshot(),
  // reduced by rollback(), and zeroed by clearSnapshot().
  private volatile long snapshotSize;

  // Used to track when to flush: time of the first edit recorded by
  // setOldestEditTimeToNow() since the last snapshot(); Long.MAX_VALUE means
  // no edit has been recorded yet.
  volatile long timeOfOldestEdit = Long.MAX_VALUE;

  // Timestamp ranges covered by the active kvset and by the snapshot;
  // consulted by shouldSeek().
  TimeRangeTracker timeRangeTracker;
  TimeRangeTracker snapshotTimeRangeTracker;

  // Allocation buffer that incoming KeyValues are copied into; null when
  // MSLAB is disabled through USEMSLAB_KEY.
  MemStoreLAB allocator;
100 
101 
102 
103   /**
104    * Default constructor. Used for tests.
105    */
106   public MemStore() {
107     this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
108   }
109 
110   /**
111    * Constructor.
112    * @param c Comparator
113    */
114   public MemStore(final Configuration conf,
115                   final KeyValue.KVComparator c) {
116     this.conf = conf;
117     this.comparator = c;
118     this.comparatorIgnoreTimestamp =
119       this.comparator.getComparatorIgnoringTimestamps();
120     this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
121     this.kvset = new KeyValueSkipListSet(c);
122     this.snapshot = new KeyValueSkipListSet(c);
123     timeRangeTracker = new TimeRangeTracker();
124     snapshotTimeRangeTracker = new TimeRangeTracker();
125     this.size = new AtomicLong(DEEP_OVERHEAD);
126     this.snapshotSize = 0;
127     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
128       this.allocator = new MemStoreLAB(conf);
129     } else {
130       this.allocator = null;
131     }
132   }
133 
134   void dump() {
135     for (KeyValue kv: this.kvset) {
136       LOG.info(kv);
137     }
138     for (KeyValue kv: this.snapshot) {
139       LOG.info(kv);
140     }
141   }
142 
143   /**
144    * Creates a snapshot of the current memstore.
145    * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet<KeyValue>)}
146    * To get the snapshot made by this method, use {@link #getSnapshot()}
147    */
148   void snapshot() {
149     // If snapshot currently has entries, then flusher failed or didn't call
150     // cleanup.  Log a warning.
151     if (!this.snapshot.isEmpty()) {
152       LOG.warn("Snapshot called again without clearing previous. " +
153         "Doing nothing. Another ongoing flush or did we fail last attempt?");
154     } else {
155       if (!this.kvset.isEmpty()) {
156         this.snapshotSize = keySize();
157         this.snapshot = this.kvset;
158         this.kvset = new KeyValueSkipListSet(this.comparator);
159         this.snapshotTimeRangeTracker = this.timeRangeTracker;
160         this.timeRangeTracker = new TimeRangeTracker();
161         // Reset heap to not include any keys
162         this.size.set(DEEP_OVERHEAD);
163         // Reset allocator so we get a fresh buffer for the new memstore
164         if (allocator != null) {
165           this.allocator = new MemStoreLAB(conf);
166         }
167         timeOfOldestEdit = Long.MAX_VALUE;
168       }
169     }
170   }
171 
172   /**
173    * Return the current snapshot.
174    * Called by flusher to get current snapshot made by a previous
175    * call to {@link #snapshot()}
176    * @return Return snapshot.
177    * @see {@link #snapshot()}
178    * @see {@link #clearSnapshot(SortedSet<KeyValue>)}
179    */
180   KeyValueSkipListSet getSnapshot() {
181     return this.snapshot;
182   }
183 
184   /**
185    * On flush, how much memory we will clear.
186    * Flush will first clear out the data in snapshot if any (It will take a second flush
187    * invocation to clear the current Cell set). If snapshot is empty, current
188    * Cell set will be flushed.
189    *
190    * @return size of data that is going to be flushed
191    */
192   long getFlushableSize() {
193     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
194   }
195 
196   /**
197    * The passed snapshot was successfully persisted; it can be let go.
198    * @param ss The snapshot to clean out.
199    * @throws UnexpectedException
200    * @see {@link #snapshot()}
201    */
202   void clearSnapshot(final SortedSet<KeyValue> ss)
203   throws UnexpectedException {
204     if (this.snapshot != ss) {
205       throw new UnexpectedException("Current snapshot is " +
206         this.snapshot + ", was passed " + ss);
207     }
208     // OK. Passed in snapshot is same as current snapshot.  If not-empty,
209     // create a new snapshot and let the old one go.
210     if (!ss.isEmpty()) {
211       this.snapshot = new KeyValueSkipListSet(this.comparator);
212       this.snapshotTimeRangeTracker = new TimeRangeTracker();
213     }
214     this.snapshotSize = 0;
215   }
216 
217   /**
218    * Write an update
219    * @param kv
220    * @return approximate size of the passed key and value.
221    */
222   long add(final KeyValue kv) {
223     KeyValue toAdd = maybeCloneWithAllocator(kv);
224     return internalAdd(toAdd);
225   }
226 
227   long timeOfOldestEdit() {
228     return timeOfOldestEdit;
229   }
230 
231   private boolean addToKVSet(KeyValue e) {
232     boolean b = this.kvset.add(e);
233     setOldestEditTimeToNow();
234     return b;
235   }
236 
237   private boolean removeFromKVSet(KeyValue e) {
238     boolean b = this.kvset.remove(e);
239     setOldestEditTimeToNow();
240     return b;
241   }
242 
243   void setOldestEditTimeToNow() {
244     if (timeOfOldestEdit == Long.MAX_VALUE) {
245       timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
246     }
247   }
248 
249   /**
250    * Internal version of add() that doesn't clone KVs with the
251    * allocator, and doesn't take the lock.
252    *
253    * Callers should ensure they already have the read lock taken
254    */
255   private long internalAdd(final KeyValue toAdd) {
256     long s = heapSizeChange(toAdd, addToKVSet(toAdd));
257     timeRangeTracker.includeTimestamp(toAdd);
258     this.size.addAndGet(s);
259     return s;
260   }
261 
262   private KeyValue maybeCloneWithAllocator(KeyValue kv) {
263     if (allocator == null) {
264       return kv;
265     }
266 
267     int len = kv.getLength();
268     Allocation alloc = allocator.allocateBytes(len);
269     if (alloc == null) {
270       // The allocation was too large, allocator decided
271       // not to do anything with it.
272       return kv;
273     }
274     assert alloc != null && alloc.getData() != null;
275     System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
276     KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
277     newKv.setMemstoreTS(kv.getMemstoreTS());
278     return newKv;
279   }
280 
281   /**
282    * Remove n key from the memstore. Only kvs that have the same key and the
283    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
284    * in this call. It is possible that we can optimize this method by using
285    * tailMap/iterator, but since this method is called rarely (only for
286    * error recovery), we can leave those optimization for the future.
287    * @param kv
288    */
289   void rollback(final KeyValue kv) {
290     // If the key is in the snapshot, delete it. We should not update
291     // this.size, because that tracks the size of only the memstore and
292     // not the snapshot. The flush of this snapshot to disk has not
293     // yet started because Store.flush() waits for all rwcc transactions to
294     // commit before starting the flush to disk.
295     KeyValue found = this.snapshot.get(kv);
296     if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
297       this.snapshot.remove(kv);
298       long sz = heapSizeChange(kv, true);
299       this.snapshotSize -= sz;
300     }
301     // If the key is in the memstore, delete it. Update this.size.
302     found = this.kvset.get(kv);
303     if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
304       removeFromKVSet(kv);
305       long s = heapSizeChange(kv, true);
306       this.size.addAndGet(-s);
307     }
308   }
309 
310   /**
311    * Write a delete
312    * @param delete
313    * @return approximate size of the passed key and value.
314    */
315   long delete(final KeyValue delete) {
316     KeyValue toAdd = maybeCloneWithAllocator(delete);
317     long s = heapSizeChange(toAdd, addToKVSet(toAdd));
318     timeRangeTracker.includeTimestamp(toAdd);
319     this.size.addAndGet(s);
320     return s;
321   }
322 
323   /**
324    * @param kv Find the row that comes after this one.  If null, we return the
325    * first.
326    * @return Next row or null if none found.
327    */
328   KeyValue getNextRow(final KeyValue kv) {
329     return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
330   }
331 
332   /*
333    * @param a
334    * @param b
335    * @return Return lowest of a or b or null if both a and b are null
336    */
337   private KeyValue getLowest(final KeyValue a, final KeyValue b) {
338     if (a == null) {
339       return b;
340     }
341     if (b == null) {
342       return a;
343     }
344     return comparator.compareRows(a, b) <= 0? a: b;
345   }
346 
347   /*
348    * @param key Find row that follows this one.  If null, return first.
349    * @param map Set to look in for a row beyond <code>row</code>.
350    * @return Next row or null if none found.  If one found, will be a new
351    * KeyValue -- can be destroyed by subsequent calls to this method.
352    */
353   private KeyValue getNextRow(final KeyValue key,
354       final NavigableSet<KeyValue> set) {
355     KeyValue result = null;
356     SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
357     // Iterate until we fall into the next row; i.e. move off current row
358     for (KeyValue kv: tail) {
359       if (comparator.compareRows(kv, key) <= 0)
360         continue;
361       // Note: Not suppressing deletes or expired cells.  Needs to be handled
362       // by higher up functions.
363       result = kv;
364       break;
365     }
366     return result;
367   }
368 
369   /**
370    * @param state column/delete tracking state
371    */
372   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
373     getRowKeyAtOrBefore(kvset, state);
374     getRowKeyAtOrBefore(snapshot, state);
375   }
376 
377   /*
378    * @param set
379    * @param state Accumulates deletes and candidates.
380    */
381   private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
382       final GetClosestRowBeforeTracker state) {
383     if (set.isEmpty()) {
384       return;
385     }
386     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
387       // Found nothing in row.  Try backing up.
388       getRowKeyBefore(set, state);
389     }
390   }
391 
392   /*
393    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
394    * we have been passed the first possible key on a row.  As we walk forward
395    * we accumulate deletes until we hit a candidate on the row at which point
396    * we return.
397    * @param set
398    * @param firstOnRow First possible key on this row.
399    * @param state
400    * @return True if we found a candidate walking this row.
401    */
402   private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
403       final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
404     boolean foundCandidate = false;
405     SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
406     if (tail.isEmpty()) return foundCandidate;
407     for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
408       KeyValue kv = i.next();
409       // Did we go beyond the target row? If so break.
410       if (state.isTooFar(kv, firstOnRow)) break;
411       if (state.isExpired(kv)) {
412         i.remove();
413         continue;
414       }
415       // If we added something, this row is a contender. break.
416       if (state.handle(kv)) {
417         foundCandidate = true;
418         break;
419       }
420     }
421     return foundCandidate;
422   }
423 
424   /*
425    * Walk backwards through the passed set a row at a time until we run out of
426    * set or until we get a candidate.
427    * @param set
428    * @param state
429    */
430   private void getRowKeyBefore(NavigableSet<KeyValue> set,
431       final GetClosestRowBeforeTracker state) {
432     KeyValue firstOnRow = state.getTargetKey();
433     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
434         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
435       // Make sure we don't fall out of our table.
436       if (!state.isTargetTable(p.kv)) break;
437       // Stop looking if we've exited the better candidate range.
438       if (!state.isBetterCandidate(p.kv)) break;
439       // Make into firstOnRow
440       firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
441       // If we find something, break;
442       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
443     }
444   }
445 
446   /**
447    * Given the specs of a column, update it, first by inserting a new record,
448    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
449    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
450    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
451    * get the new value, or the old value and all readers will eventually only see the new
452    * value after the old was removed.
453    *
454    * @param row
455    * @param family
456    * @param qualifier
457    * @param newValue
458    * @param now
459    * @return  Timestamp
460    */
461   public long updateColumnValue(byte[] row,
462                                 byte[] family,
463                                 byte[] qualifier,
464                                 long newValue,
465                                 long now) {
466     KeyValue firstKv = KeyValue.createFirstOnRow(
467         row, family, qualifier);
468     // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
469     SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
470     if (!snSs.isEmpty()) {
471       KeyValue snKv = snSs.first();
472       // is there a matching KV in the snapshot?
473       if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
474         if (snKv.getTimestamp() == now) {
475           // poop,
476           now += 1;
477         }
478       }
479     }
480 
481     // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
482     // But the timestamp should also be max(now, mostRecentTsInMemstore)
483 
484     // so we cant add the new KV w/o knowing what's there already, but we also
485     // want to take this chance to delete some kvs. So two loops (sad)
486 
487     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
488     Iterator<KeyValue> it = ss.iterator();
489     while ( it.hasNext() ) {
490       KeyValue kv = it.next();
491 
492       // if this isnt the row we are interested in, then bail:
493       if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
494         break; // rows dont match, bail.
495       }
496 
497       // if the qualifier matches and it's a put, just RM it out of the kvset.
498       if (kv.getType() == KeyValue.Type.Put.getCode() &&
499           kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
500         now = kv.getTimestamp();
501       }
502     }
503 
504     // create or update (upsert) a new KeyValue with
505     // 'now' and a 0 memstoreTS == immediately visible
506     return upsert(Arrays.asList(
507         new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
508     );
509   }
510 
511   /**
512    * Update or insert the specified KeyValues.
513    * <p>
514    * For each KeyValue, insert into MemStore.  This will atomically upsert the
515    * value for that row/family/qualifier.  If a KeyValue did already exist,
516    * it will then be removed.
517    * <p>
518    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
519    * be immediately visible.  May want to change this so it is atomic across
520    * all KeyValues.
521    * <p>
522    * This is called under row lock, so Get operations will still see updates
523    * atomically.  Scans will only see each KeyValue update as atomic.
524    *
525    * @param kvs
526    * @return change in memstore size
527    */
528   public long upsert(List<KeyValue> kvs) {
529     long size = 0;
530     for (KeyValue kv : kvs) {
531       kv.setMemstoreTS(0);
532       size += upsert(kv);
533     }
534     return size;
535   }
536 
537   /**
538    * Inserts the specified KeyValue into MemStore and deletes any existing
539    * versions of the same row/family/qualifier as the specified KeyValue.
540    * <p>
541    * First, the specified KeyValue is inserted into the Memstore.
542    * <p>
543    * If there are any existing KeyValues in this MemStore with the same row,
544    * family, and qualifier, they are removed.
545    * <p>
546    * Callers must hold the read lock.
547    *
548    * @param kv
549    * @return change in size of MemStore
550    */
551   private long upsert(KeyValue kv) {
552     // Add the KeyValue to the MemStore
553     // Use the internalAdd method here since we (a) already have a lock
554     // and (b) cannot safely use the MSLAB here without potentially
555     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
556     // test that triggers the pathological case if we don't avoid MSLAB
557     // here.
558     long addedSize = internalAdd(kv);
559 
560     // Get the KeyValues for the row/family/qualifier regardless of timestamp.
561     // For this case we want to clean up any other puts
562     KeyValue firstKv = KeyValue.createFirstOnRow(
563         kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
564         kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
565         kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
566     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
567     Iterator<KeyValue> it = ss.iterator();
568     while ( it.hasNext() ) {
569       KeyValue cur = it.next();
570 
571       if (kv == cur) {
572         // ignore the one just put in
573         continue;
574       }
575       // if this isn't the row we are interested in, then bail
576       if (!kv.matchingRow(cur)) {
577         break;
578       }
579 
580       // if the qualifier matches and it's a put, remove it
581       if (kv.matchingQualifier(cur)) {
582 
583         // to be extra safe we only remove Puts that have a memstoreTS==0
584         if (kv.getType() == KeyValue.Type.Put.getCode() &&
585             kv.getMemstoreTS() == 0) {
586           // false means there was a change, so give us the size.
587           long delta = heapSizeChange(cur, true);
588           addedSize -= delta;
589           this.size.addAndGet(-delta);
590           it.remove();
591           setOldestEditTimeToNow();
592         }
593       } else {
594         // past the column, done
595         break;
596       }
597     }
598     return addedSize;
599   }
600 
601   /*
602    * Immutable data structure to hold member found in set and the set it was
603    * found in.  Include set because it is carrying context.
604    */
605   private static class Member {
606     final KeyValue kv;
607     final NavigableSet<KeyValue> set;
608     Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
609       this.kv = kv;
610       this.set = s;
611     }
612   }
613 
614   /*
615    * @param set Set to walk back in.  Pass a first in row or we'll return
616    * same row (loop).
617    * @param state Utility and context.
618    * @param firstOnRow First item on the row after the one we want to find a
619    * member in.
620    * @return Null or member of row previous to <code>firstOnRow</code>
621    */
622   private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
623       final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
624     NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
625     if (head.isEmpty()) return null;
626     for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
627       KeyValue found = i.next();
628       if (state.isExpired(found)) {
629         i.remove();
630         continue;
631       }
632       return new Member(head, found);
633     }
634     return null;
635   }
636 
637   /**
638    * @return scanner on memstore and snapshot in this order.
639    */
640   List<KeyValueScanner> getScanners() {
641     return Collections.<KeyValueScanner>singletonList(
642         new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint()));
643   }
644 
645   /**
646    * Check if this memstore may contain the required keys
647    * @param scan
648    * @return False if the key definitely does not exist in this Memstore
649    */
650   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
651     return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
652         snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
653         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
654                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
655             oldestUnexpiredTS);
656   }
657 
658   public TimeRangeTracker getSnapshotTimeRangeTracker() {
659     return this.snapshotTimeRangeTracker;
660   }
661 
662   /*
663    * MemStoreScanner implements the KeyValueScanner.
664    * It lets the caller scan the contents of a memstore -- both current
665    * map and snapshot.
666    * This behaves as if it were a real scanner but does not maintain position.
667    */
668   protected class MemStoreScanner extends NonLazyKeyValueScanner {
669     // Next row information for either kvset or snapshot
670     private KeyValue kvsetNextRow = null;
671     private KeyValue snapshotNextRow = null;
672 
673     // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
674     private KeyValue kvsetItRow = null;
675     private KeyValue snapshotItRow = null;
676     
677     // iterator based scanning.
678     private Iterator<KeyValue> kvsetIt;
679     private Iterator<KeyValue> snapshotIt;
680 
681     // The kvset and snapshot at the time of creating this scanner
682     volatile KeyValueSkipListSet kvsetAtCreation;
683     volatile KeyValueSkipListSet snapshotAtCreation;
684 
685     // the pre-calculated KeyValue to be returned by peek() or next()
686     private KeyValue theNext;
687     private final long readPoint;
688 
689     /*
690     Some notes...
691 
692      So memstorescanner is fixed at creation time. this includes pointers/iterators into
693     existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
694     snapshot is moved.  since kvset is null there is no point on reseeking on both,
695       we can save us the trouble. During the snapshot->hfile transition, the memstore
696       scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
697       potentially do something smarter by adjusting the existing memstore scanner.
698 
699       But there is a greater problem here, that being once a scanner has progressed
700       during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
701       if a scan lasts a little while, there is a chance for new entries in kvset to
702       become available but we will never see them.  This needs to be handled at the
703       StoreScanner level with coordination with MemStoreScanner.
704 
705       Currently, this problem is only partly managed: during the small amount of time
706       when the StoreScanner has not yet created a new MemStoreScanner, we will miss
707       the adds to kvset in the MemStoreScanner.
708     */
709 
710     MemStoreScanner(long readPoint) {
711       super();
712 
713       this.readPoint = readPoint;
714       kvsetAtCreation = kvset;
715       snapshotAtCreation = snapshot;
716     }
717 
718     private KeyValue getNext(Iterator<KeyValue> it) {
719       KeyValue v = null;
720       try {
721         while (it.hasNext()) {
722           v = it.next();
723           if (v.getMemstoreTS() <= readPoint) {
724             return v;
725           }
726         }
727 
728         return null;
729       } finally {
730         if (v != null) {
731           // in all cases, remember the last KV iterated to
732           if (it == snapshotIt) {
733             snapshotItRow = v;
734           } else {
735             kvsetItRow = v;
736           }
737         }
738       }
739     }
740 
741     /**
742      *  Set the scanner at the seek key.
743      *  Must be called only once: there is no thread safety between the scanner
744      *   and the memStore.
745      * @param key seek value
746      * @return false if the key is null or if there is no data
747      */
748     @Override
749     public synchronized boolean seek(KeyValue key) {
750       if (key == null) {
751         close();
752         return false;
753       }
754 
755       // kvset and snapshot will never be null.
756       // if tailSet can't find anything, SortedSet is empty (not null).
757       kvsetIt = kvsetAtCreation.tailSet(key).iterator();
758       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
759       kvsetItRow = null;
760       snapshotItRow = null;
761 
762       return seekInSubLists(key);
763     }
764 
765 
766     /**
767      * (Re)initialize the iterators after a seek or a reseek.
768      */
769     private synchronized boolean seekInSubLists(KeyValue key){
770       kvsetNextRow = getNext(kvsetIt);
771       snapshotNextRow = getNext(snapshotIt);
772 
773       // Calculate the next value
774       theNext = getLowest(kvsetNextRow, snapshotNextRow);
775 
776       // has data
777       return (theNext != null);
778     }
779 
780 
781     /**
782      * Move forward on the sub-lists set previously by seek.
783      * @param key seek value (should be non-null)
784      * @return true if there is at least one KV to read, false otherwise
785      */
786     @Override
787     public synchronized boolean reseek(KeyValue key) {
788       /*
789       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
790       This code is executed concurrently with flush and puts, without locks.
791       Two points must be known when working on this code:
792       1) It's not possible to use the 'kvTail' and 'snapshot'
793        variables, as they are modified during a flush.
794       2) The ideal implementation for performance would use the sub skip list
795        implicitly pointed by the iterators 'kvsetIt' and
796        'snapshotIt'. Unfortunately the Java API does not offer a method to
797        get it. So we remember the last keys we iterated to and restore
798        the reseeked set to at least that point.
799        */
800 
801       kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
802       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
803 
804       return seekInSubLists(key);
805     }
806 
807 
808     @Override
809     public synchronized KeyValue peek() {
810       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
811       return theNext;
812     }
813 
814     @Override
815     public synchronized KeyValue next() {
816       if (theNext == null) {
817           return null;
818       }
819 
820       final KeyValue ret = theNext;
821 
822       // Advance one of the iterators
823       if (theNext == kvsetNextRow) {
824         kvsetNextRow = getNext(kvsetIt);
825       } else {
826         snapshotNextRow = getNext(snapshotIt);
827       }
828 
829       // Calculate the next value
830       theNext = getLowest(kvsetNextRow, snapshotNextRow);
831 
832       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
833       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
834       //    getLowest() + " threadpoint=" + readpoint);
835       return ret;
836     }
837 
838     /*
839      * Returns the lower of the two key values, or null if they are both null.
840      * This uses comparator.compare() to compare the KeyValue using the memstore
841      * comparator.
842      */
843     private KeyValue getLowest(KeyValue first, KeyValue second) {
844       if (first == null && second == null) {
845         return null;
846       }
847       if (first != null && second != null) {
848         int compare = comparator.compare(first, second);
849         return (compare <= 0 ? first : second);
850       }
851       return (first != null ? first : second);
852     }
853 
854     /*
855      * Returns the higher of the two key values, or null if they are both null.
856      * This uses comparator.compare() to compare the KeyValue using the memstore
857      * comparator.
858      */
859     private KeyValue getHighest(KeyValue first, KeyValue second) {
860       if (first == null && second == null) {
861         return null;
862       }
863       if (first != null && second != null) {
864         int compare = comparator.compare(first, second);
865         return (compare > 0 ? first : second);
866       }
867       return (first != null ? first : second);
868     }
869 
870     public synchronized void close() {
871       this.kvsetNextRow = null;
872       this.snapshotNextRow = null;
873 
874       this.kvsetIt = null;
875       this.snapshotIt = null;
876 
877       this.kvsetItRow = null;
878       this.snapshotItRow = null;
879     }
880 
881     /**
882      * MemStoreScanner returns max value as sequence id because it will
883      * always have the latest data among all files.
884      */
885     @Override
886     public long getSequenceID() {
887       return Long.MAX_VALUE;
888     }
889 
890     @Override
891     public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
892         long oldestUnexpiredTS) {
893       return shouldSeek(scan, oldestUnexpiredTS);
894     }
895   }
896 
897   public final static long FIXED_OVERHEAD = ClassSize.align(
898       ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
899 
900   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
901       ClassSize.ATOMIC_LONG +
902       (2 * ClassSize.TIMERANGE_TRACKER) +
903       (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
904 
905   /** Used for readability when we don't store memstore timestamp in HFile */
906   public static final boolean NO_PERSISTENT_TS = false;
907 
908   /*
909    * Calculate how the MemStore size has changed.  Includes overhead of the
910    * backing Map.
911    * @param kv
912    * @param notpresent True if the kv was NOT present in the set.
913    * @return Size
914    */
915   long heapSizeChange(final KeyValue kv, final boolean notpresent) {
916     return notpresent ?
917         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
918         0;
919   }
920 
921   /**
922    * Get the entire heap usage for this MemStore not including keys in the
923    * snapshot.
924    */
925   @Override
926   public long heapSize() {
927     return size.get();
928   }
929 
930   /**
931    * Get the heap usage of KVs in this MemStore.
932    */
933   public long keySize() {
934     return heapSize() - DEEP_OVERHEAD;
935   }
936 
937   /**
938    * Code to help figure if our approximation of object heap sizes is close
939    * enough.  See hbase-900.  Fills memstores then waits so user can heap
940    * dump and bring up resultant hprof in something like jprofiler which
941    * allows you get 'deep size' on objects.
942    * @param args main args
943    */
944   public static void main(String [] args) {
945     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
946     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
947       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
948     LOG.info("vmInputArguments=" + runtime.getInputArguments());
949     MemStore memstore1 = new MemStore();
950     // TODO: x32 vs x64
951     long size = 0;
952     final int count = 10000;
953     byte [] fam = Bytes.toBytes("col");
954     byte [] qf = Bytes.toBytes("umn");
955     byte [] empty = new byte[0];
956     for (int i = 0; i < count; i++) {
957       // Give each its own ts
958       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
959     }
960     LOG.info("memstore1 estimated size=" + size);
961     for (int i = 0; i < count; i++) {
962       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
963     }
964     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
965     // Make a variably sized memstore.
966     MemStore memstore2 = new MemStore();
967     for (int i = 0; i < count; i++) {
968       size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
969         new byte[i]));
970     }
971     LOG.info("memstore2 estimated size=" + size);
972     final int seconds = 30;
973     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
974     for (int i = 0; i < seconds; i++) {
975       // Thread.sleep(1000);
976     }
977     LOG.info("Exiting.");
978   }
979 }