001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.NavigableSet;
023import java.util.SortedSet;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.ExtendedCell;
028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034
035/**
036 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
037 */
038@InterfaceAudience.Private
039public abstract class AbstractMemStore implements MemStore {
040
041  private static final long NO_SNAPSHOT_ID = -1;
042
043  private final Configuration conf;
044  private final CellComparator comparator;
045
046  // active segment absorbs write operations
047  private volatile MutableSegment active;
048  // Snapshot of memstore. Made for flusher.
049  protected volatile ImmutableSegment snapshot;
050  protected volatile long snapshotId;
051  // Used to track when to flush
052  private volatile long timeOfOldestEdit;
053
054  protected RegionServicesForStores regionServices;
055
056  // @formatter:off
057  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
058    + (5 * ClassSize.REFERENCE)
059    + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
060  // @formatter:on
061
062  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
063
064  public static void addToScanners(List<? extends Segment> segments, long readPt,
065    List<KeyValueScanner> scanners) {
066    for (Segment item : segments) {
067      addToScanners(item, readPt, scanners);
068    }
069  }
070
071  protected static void addToScanners(Segment segment, long readPt,
072    List<KeyValueScanner> scanners) {
073    if (!segment.isEmpty()) {
074      scanners.add(segment.getScanner(readPt));
075    }
076  }
077
078  protected AbstractMemStore(final Configuration conf, final CellComparator c,
079    final RegionServicesForStores regionServices) {
080    this.conf = conf;
081    this.comparator = c;
082    this.regionServices = regionServices;
083    resetActive();
084    resetTimeOfOldestEdit();
085    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
086    this.snapshotId = NO_SNAPSHOT_ID;
087  }
088
089  protected void resetActive() {
090    // Record the MutableSegment' heap overhead when initialing
091    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
092    // Reset heap to not include any keys
093    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
094    // regionServices can be null when testing
095    if (regionServices != null) {
096      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
097        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
098        memstoreAccounting.getCellsCount());
099    }
100  }
101
102  protected void resetTimeOfOldestEdit() {
103    this.timeOfOldestEdit = Long.MAX_VALUE;
104  }
105
106  /**
107   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
108   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
109   *                         only if it is greater than the previous sequence id
110   */
111  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
112
113  @Override
114  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
115    for (Cell cell : cells) {
116      add(cell, memstoreSizing);
117    }
118  }
119
120  @Override
121  public void add(Cell cell, MemStoreSizing memstoreSizing) {
122    doAddOrUpsert(cell, 0, memstoreSizing, true);
123  }
124
125  /*
126   * Inserts the specified Cell into MemStore and deletes any existing versions of the same
127   * row/family/qualifier as the specified Cell. <p> First, the specified Cell is inserted into the
128   * Memstore. <p> If there are any existing Cell in this MemStore with the same row, family, and
129   * qualifier, they are removed. <p> Callers must hold the read lock.
130   * @param cell the cell to be updated
131   * @param readpoint readpoint below which we can safely remove duplicate KVs
132   * @param memstoreSizing object to accumulate changed size
133   */
134  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
135    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
136  }
137
138  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing,
139    boolean doAdd) {
140    MutableSegment currentActive;
141    boolean succ = false;
142    while (!succ) {
143      currentActive = getActive();
144      succ = preUpdate(currentActive, cell, memstoreSizing);
145      if (succ) {
146        if (doAdd) {
147          doAdd(currentActive, cell, memstoreSizing);
148        } else {
149          doUpsert(currentActive, cell, readpoint, memstoreSizing);
150        }
151        postUpdate(currentActive);
152      }
153    }
154  }
155
156  protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
157    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
158    boolean mslabUsed = (toAdd != cell);
159    // This cell data is backed by the same byte[] where we read request in RPC(See
160    // HBASE-15180). By default, MSLAB is ON and we might have copied cell to MSLAB area. If
161    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
162    // memory and prevent it from getting GCed.
163    // Copy to MSLAB would not have happened if
164    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
165    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
166    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
167    // 3. When cells are from Append/Increment operation.
168    if (!mslabUsed) {
169      toAdd = deepCopyIfNeeded(toAdd);
170    }
171    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
172  }
173
174  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint,
175    MemStoreSizing memstoreSizing) {
176    // Add the Cell to the MemStore
177    // Use the internalAdd method here since we (a) already have a lock
178    // and (b) cannot safely use the MSLAB here without potentially
179    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
180    // test that triggers the pathological case if we don't avoid MSLAB
181    // here.
182    // This cell data is backed by the same byte[] where we read request in RPC(See
183    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
184    // chunk of memory and prevent it from getting GCed.
185    cell = deepCopyIfNeeded(cell);
186    boolean sizeAddedPreOperation = sizeAddedPreOperation();
187    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
188    setOldestEditTimeToNow();
189  }
190
191  /**
192   * Issue any synchronization and test needed before applying the update
193   * @param currentActive  the segment to be updated
194   * @param cell           the cell to be added
195   * @param memstoreSizing object to accumulate region size changes
196   * @return true iff can proceed with applying the update
197   */
198  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
199    MemStoreSizing memstoreSizing);
200
201  /**
202   * Issue any post update synchronization and tests
203   * @param currentActive updated segment
204   */
205  protected abstract void postUpdate(MutableSegment currentActive);
206
207  private static Cell deepCopyIfNeeded(Cell cell) {
208    if (cell instanceof ExtendedCell) {
209      return ((ExtendedCell) cell).deepClone();
210    }
211    return cell;
212  }
213
214  @Override
215  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
216    for (Cell cell : cells) {
217      upsert(cell, readpoint, memstoreSizing);
218    }
219  }
220
221  /** Returns Oldest timestamp of all the Cells in the MemStore */
222  @Override
223  public long timeOfOldestEdit() {
224    return timeOfOldestEdit;
225  }
226
227  /**
228   * This method is protected under {@link HStore#lock} write lock,<br/>
229   * and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/>
230   * The passed snapshot was successfully persisted; it can be let go.
231   * @param id Id of the snapshot to clean out.
232   * @see MemStore#snapshot()
233   */
234  @Override
235  public void clearSnapshot(long id) throws UnexpectedStateException {
236    if (this.snapshotId == -1) return; // already cleared
237    if (this.snapshotId != id) {
238      throw new UnexpectedStateException(
239        "Current snapshot id is " + this.snapshotId + ",passed " + id);
240    }
241    // OK. Passed in snapshot is same as current snapshot. If not-empty,
242    // create a new snapshot and let the old one go.
243    doClearSnapShot();
244  }
245
246  protected void doClearSnapShot() {
247    Segment oldSnapshot = this.snapshot;
248    if (!this.snapshot.isEmpty()) {
249      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
250    }
251    this.snapshotId = NO_SNAPSHOT_ID;
252    oldSnapshot.close();
253  }
254
255  @Override
256  public MemStoreSize getSnapshotSize() {
257    return this.snapshot.getMemStoreSize();
258  }
259
260  @Override
261  public String toString() {
262    StringBuilder buf = new StringBuilder();
263    int i = 1;
264    try {
265      for (Segment segment : getSegments()) {
266        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
267        i++;
268      }
269    } catch (IOException e) {
270      return e.toString();
271    }
272    return buf.toString();
273  }
274
275  protected Configuration getConfiguration() {
276    return conf;
277  }
278
279  protected void dump(Logger log) {
280    getActive().dump(log);
281    snapshot.dump(log);
282  }
283
284  /*
285   * @return Return lowest of a or b or null if both a and b are null
286   */
287  protected Cell getLowest(final Cell a, final Cell b) {
288    if (a == null) {
289      return b;
290    }
291    if (b == null) {
292      return a;
293    }
294    return comparator.compareRows(a, b) <= 0 ? a : b;
295  }
296
297  /*
298   * @param key Find row that follows this one. If null, return first.
299   * @param set Set to look in for a row beyond <code>row</code>.
300   * @return Next row or null if none found. If one found, will be a new KeyValue -- can be
301   * destroyed by subsequent calls to this method.
302   */
303  protected Cell getNextRow(final Cell key, final NavigableSet<Cell> set) {
304    Cell result = null;
305    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
306    // Iterate until we fall into the next row; i.e. move off current row
307    for (Cell cell : tail) {
308      if (comparator.compareRows(cell, key) <= 0) {
309        continue;
310      }
311      // Note: Not suppressing deletes or expired cells. Needs to be handled
312      // by higher up functions.
313      result = cell;
314      break;
315    }
316    return result;
317  }
318
319  /**
320   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
321   * Otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it
322   * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all
323   * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is
324   * set to 'true' and the cell is copied into MSLAB.
325   * @param cell                the cell to clone
326   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
327   * @return either the given cell or its clone
328   */
329  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell,
330    boolean forceCloneOfBigCell) {
331    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
332  }
333
334  /*
335   * Internal version of add() that doesn't clone Cells with the allocator, and doesn't take the
336   * lock. Callers should ensure they already have the read lock taken
337   * @param toAdd the cell to add
338   * @param mslabUsed whether using MSLAB
339   * @param memstoreSizing object to accumulate changed size
340   */
341  private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean mslabUsed,
342    MemStoreSizing memstoreSizing) {
343    boolean sizeAddedPreOperation = sizeAddedPreOperation();
344    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
345    setOldestEditTimeToNow();
346  }
347
348  protected abstract boolean sizeAddedPreOperation();
349
350  private void setOldestEditTimeToNow() {
351    if (timeOfOldestEdit == Long.MAX_VALUE) {
352      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
353    }
354  }
355
356  /**
357   * Returns The total size of cells in this memstore. We will not consider cells in the snapshot
358   */
359  protected abstract long keySize();
360
361  /**
362   * @return The total heap size of cells in this memstore. We will not consider cells in the
363   *         snapshot
364   */
365  protected abstract long heapSize();
366
367  protected CellComparator getComparator() {
368    return comparator;
369  }
370
371  MutableSegment getActive() {
372    return active;
373  }
374
375  ImmutableSegment getSnapshot() {
376    return snapshot;
377  }
378
379  @Override
380  public void close() {
381    // active should never be null
382    active.close();
383    // for snapshot, either it is empty, where we do not reference any real segment which contains a
384    // memstore lab, or it is during snapshot, where we will clear it when calling clearSnapshot, so
385    // we do not need to close it here
386  }
387
388  /** Returns an ordered list of segments from most recent to oldest in memstore */
389  protected abstract List<Segment> getSegments() throws IOException;
390
391}