001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.NavigableSet;
023import java.util.SortedSet;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.CellComparator;
026import org.apache.hadoop.hbase.ExtendedCell;
027import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.ClassSize;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033
034/**
035 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
036 */
037@InterfaceAudience.Private
038public abstract class AbstractMemStore implements MemStore {
039
040  private static final long NO_SNAPSHOT_ID = -1;
041
042  private final Configuration conf;
043  private final CellComparator comparator;
044
045  // active segment absorbs write operations
046  private volatile MutableSegment active;
047  // Snapshot of memstore. Made for flusher.
048  protected volatile ImmutableSegment snapshot;
049  protected volatile long snapshotId;
050  // Used to track when to flush
051  private volatile long timeOfOldestEdit;
052
053  protected RegionServicesForStores regionServices;
054
055  // @formatter:off
056  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
057    + (5 * ClassSize.REFERENCE)
058    + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
059  // @formatter:on
060
061  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
062
063  public static void addToScanners(List<? extends Segment> segments, long readPt,
064    List<KeyValueScanner> scanners) {
065    for (Segment item : segments) {
066      addToScanners(item, readPt, scanners);
067    }
068  }
069
070  protected static void addToScanners(Segment segment, long readPt,
071    List<KeyValueScanner> scanners) {
072    if (!segment.isEmpty()) {
073      scanners.add(segment.getScanner(readPt));
074    }
075  }
076
077  protected AbstractMemStore(final Configuration conf, final CellComparator c,
078    final RegionServicesForStores regionServices) {
079    this.conf = conf;
080    this.comparator = c;
081    this.regionServices = regionServices;
082    resetActive();
083    resetTimeOfOldestEdit();
084    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
085    this.snapshotId = NO_SNAPSHOT_ID;
086  }
087
088  protected void resetActive() {
089    // Record the MutableSegment' heap overhead when initialing
090    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
091    // Reset heap to not include any keys
092    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
093    // regionServices can be null when testing
094    if (regionServices != null) {
095      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
096        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
097        memstoreAccounting.getCellsCount());
098    }
099  }
100
101  protected void resetTimeOfOldestEdit() {
102    this.timeOfOldestEdit = Long.MAX_VALUE;
103  }
104
105  /**
106   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
107   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
108   *                         only if it is greater than the previous sequence id
109   */
110  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
111
112  @Override
113  public void add(Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) {
114    for (ExtendedCell cell : cells) {
115      add(cell, memstoreSizing);
116    }
117  }
118
119  @Override
120  public void add(ExtendedCell cell, MemStoreSizing memstoreSizing) {
121    doAddOrUpsert(cell, 0, memstoreSizing, true);
122  }
123
124  /*
125   * Inserts the specified Cell into MemStore and deletes any existing versions of the same
126   * row/family/qualifier as the specified Cell. <p> First, the specified Cell is inserted into the
127   * Memstore. <p> If there are any existing Cell in this MemStore with the same row, family, and
128   * qualifier, they are removed. <p> Callers must hold the read lock.
129   * @param cell the cell to be updated
130   * @param readpoint readpoint below which we can safely remove duplicate KVs
131   * @param memstoreSizing object to accumulate changed size
132   */
133  private void upsert(ExtendedCell cell, long readpoint, MemStoreSizing memstoreSizing) {
134    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
135  }
136
137  private void doAddOrUpsert(ExtendedCell cell, long readpoint, MemStoreSizing memstoreSizing,
138    boolean doAdd) {
139    MutableSegment currentActive;
140    boolean succ = false;
141    while (!succ) {
142      currentActive = getActive();
143      succ = preUpdate(currentActive, cell, memstoreSizing);
144      if (succ) {
145        if (doAdd) {
146          doAdd(currentActive, cell, memstoreSizing);
147        } else {
148          doUpsert(currentActive, cell, readpoint, memstoreSizing);
149        }
150        postUpdate(currentActive);
151      }
152    }
153  }
154
155  protected void doAdd(MutableSegment currentActive, ExtendedCell cell,
156    MemStoreSizing memstoreSizing) {
157    ExtendedCell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
158    boolean mslabUsed = (toAdd != cell);
159    // This cell data is backed by the same byte[] where we read request in RPC(See
160    // HBASE-15180). By default, MSLAB is ON and we might have copied cell to MSLAB area. If
161    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
162    // memory and prevent it from getting GCed.
163    // Copy to MSLAB would not have happened if
164    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
165    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
166    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
167    // 3. When cells are from Append/Increment operation.
168    if (!mslabUsed) {
169      toAdd = deepCopyIfNeeded(toAdd);
170    }
171    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
172  }
173
174  private void doUpsert(MutableSegment currentActive, ExtendedCell cell, long readpoint,
175    MemStoreSizing memstoreSizing) {
176    // Add the Cell to the MemStore
177    // Use the internalAdd method here since we
178    // (a) already have a lock and
179    // (b) cannot safely use the MSLAB here without potentially hitting OOME
180    // - see TestMemStore.testUpsertMSLAB for a test that triggers the pathological case if we don't
181    // avoid MSLAB here.
182    // This cell data is backed by the same byte[] where we read request in RPC(See
183    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
184    // chunk of memory and prevent it from getting GCed.
185    cell = deepCopyIfNeeded(cell);
186    boolean sizeAddedPreOperation = sizeAddedPreOperation();
187    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
188    setOldestEditTimeToNow();
189  }
190
191  /**
192   * Issue any synchronization and test needed before applying the update
193   * @param currentActive  the segment to be updated
194   * @param cell           the cell to be added
195   * @param memstoreSizing object to accumulate region size changes
196   * @return true iff can proceed with applying the update
197   */
198  protected abstract boolean preUpdate(MutableSegment currentActive, ExtendedCell cell,
199    MemStoreSizing memstoreSizing);
200
201  /**
202   * Issue any post update synchronization and tests
203   * @param currentActive updated segment
204   */
205  protected abstract void postUpdate(MutableSegment currentActive);
206
207  private static ExtendedCell deepCopyIfNeeded(ExtendedCell cell) {
208    return cell.deepClone();
209  }
210
211  @Override
212  public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) {
213    for (ExtendedCell cell : cells) {
214      upsert(cell, readpoint, memstoreSizing);
215    }
216  }
217
218  /** Returns Oldest timestamp of all the Cells in the MemStore */
219  @Override
220  public long timeOfOldestEdit() {
221    return timeOfOldestEdit;
222  }
223
224  /**
225   * This method is protected under {@link HStore#lock} write lock,<br/>
226   * and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/>
227   * The passed snapshot was successfully persisted; it can be let go.
228   * @param id Id of the snapshot to clean out.
229   * @see MemStore#snapshot()
230   */
231  @Override
232  public void clearSnapshot(long id) throws UnexpectedStateException {
233    if (this.snapshotId == -1) return; // already cleared
234    if (this.snapshotId != id) {
235      throw new UnexpectedStateException(
236        "Current snapshot id is " + this.snapshotId + ",passed " + id);
237    }
238    // OK. Passed in snapshot is same as current snapshot. If not-empty,
239    // create a new snapshot and let the old one go.
240    doClearSnapShot();
241  }
242
243  protected void doClearSnapShot() {
244    Segment oldSnapshot = this.snapshot;
245    if (!this.snapshot.isEmpty()) {
246      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
247    }
248    this.snapshotId = NO_SNAPSHOT_ID;
249    oldSnapshot.close();
250  }
251
252  @Override
253  public MemStoreSize getSnapshotSize() {
254    return this.snapshot.getMemStoreSize();
255  }
256
257  @Override
258  public String toString() {
259    StringBuilder buf = new StringBuilder();
260    int i = 1;
261    try {
262      for (Segment segment : getSegments()) {
263        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
264        i++;
265      }
266    } catch (IOException e) {
267      return e.toString();
268    }
269    return buf.toString();
270  }
271
272  protected Configuration getConfiguration() {
273    return conf;
274  }
275
276  protected void dump(Logger log) {
277    getActive().dump(log);
278    snapshot.dump(log);
279  }
280
281  /** Returns Return lowest of a or b or null if both a and b are null */
282  protected ExtendedCell getLowest(final ExtendedCell a, final ExtendedCell b) {
283    if (a == null) {
284      return b;
285    }
286    if (b == null) {
287      return a;
288    }
289    return comparator.compareRows(a, b) <= 0 ? a : b;
290  }
291
292  /**
293   * @param key Find row that follows this one. If null, return first.
294   * @param set Set to look in for a row beyond <code>row</code>.
295   * @return Next row or null if none found. If one found, will be a new KeyValue -- can be
296   *         destroyed by subsequent calls to this method.
297   */
298  protected ExtendedCell getNextRow(final ExtendedCell key, final NavigableSet<ExtendedCell> set) {
299    ExtendedCell result = null;
300    SortedSet<ExtendedCell> tail = key == null ? set : set.tailSet(key);
301    // Iterate until we fall into the next row; i.e. move off current row
302    for (ExtendedCell cell : tail) {
303      if (comparator.compareRows(cell, key) <= 0) {
304        continue;
305      }
306      // Note: Not suppressing deletes or expired cells. Needs to be handled
307      // by higher up functions.
308      result = cell;
309      break;
310    }
311    return result;
312  }
313
314  /**
315   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
316   * Otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it
317   * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all
318   * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is
319   * set to 'true' and the cell is copied into MSLAB.
320   * @param cell                the cell to clone
321   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
322   * @return either the given cell or its clone
323   */
324  private ExtendedCell maybeCloneWithAllocator(MutableSegment currentActive, ExtendedCell cell,
325    boolean forceCloneOfBigCell) {
326    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
327  }
328
329  /**
330   * Internal version of add() that doesn't clone Cells with the allocator, and doesn't take the
331   * lock. Callers should ensure they already have the read lock taken
332   * @param toAdd          the cell to add
333   * @param mslabUsed      whether using MSLAB
334   * @param memstoreSizing object to accumulate changed size
335   */
336  private void internalAdd(MutableSegment currentActive, final ExtendedCell toAdd,
337    final boolean mslabUsed, MemStoreSizing memstoreSizing) {
338    boolean sizeAddedPreOperation = sizeAddedPreOperation();
339    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
340    setOldestEditTimeToNow();
341  }
342
343  protected abstract boolean sizeAddedPreOperation();
344
345  private void setOldestEditTimeToNow() {
346    if (timeOfOldestEdit == Long.MAX_VALUE) {
347      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
348    }
349  }
350
351  /**
352   * Returns The total size of cells in this memstore. We will not consider cells in the snapshot
353   */
354  protected abstract long keySize();
355
356  /**
357   * @return The total heap size of cells in this memstore. We will not consider cells in the
358   *         snapshot
359   */
360  protected abstract long heapSize();
361
362  protected CellComparator getComparator() {
363    return comparator;
364  }
365
366  MutableSegment getActive() {
367    return active;
368  }
369
370  ImmutableSegment getSnapshot() {
371    return snapshot;
372  }
373
374  @Override
375  public void close() {
376    // active should never be null
377    active.close();
378    // for snapshot, either it is empty, where we do not reference any real segment which contains a
379    // memstore lab, or it is during snapshot, where we will clear it when calling clearSnapshot, so
380    // we do not need to close it here
381  }
382
383  /** Returns an ordered list of segments from most recent to oldest in memstore */
384  protected abstract List<Segment> getSegments() throws IOException;
385
386}