001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.NavigableSet;
023import java.util.SortedSet;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.ExtendedCell;
028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034
035/**
036 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
037 */
038@InterfaceAudience.Private
039public abstract class AbstractMemStore implements MemStore {
040
041  private static final long NO_SNAPSHOT_ID = -1;
042
043  private final Configuration conf;
044  private final CellComparator comparator;
045
046  // active segment absorbs write operations
047  private volatile MutableSegment active;
048  // Snapshot of memstore. Made for flusher.
049  protected volatile ImmutableSegment snapshot;
050  protected volatile long snapshotId;
051  // Used to track when to flush
052  private volatile long timeOfOldestEdit;
053
054  protected RegionServicesForStores regionServices;
055
056  // @formatter:off
057  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
058    + (5 * ClassSize.REFERENCE)
059    + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
060  // @formatter:on
061
062  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
063
064  public static void addToScanners(List<? extends Segment> segments, long readPt,
065    List<KeyValueScanner> scanners) {
066    for (Segment item : segments) {
067      addToScanners(item, readPt, scanners);
068    }
069  }
070
071  protected static void addToScanners(Segment segment, long readPt,
072    List<KeyValueScanner> scanners) {
073    if (!segment.isEmpty()) {
074      scanners.add(segment.getScanner(readPt));
075    }
076  }
077
078  protected AbstractMemStore(final Configuration conf, final CellComparator c,
079    final RegionServicesForStores regionServices) {
080    this.conf = conf;
081    this.comparator = c;
082    this.regionServices = regionServices;
083    resetActive();
084    resetTimeOfOldestEdit();
085    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
086    this.snapshotId = NO_SNAPSHOT_ID;
087  }
088
089  protected void resetActive() {
090    // Record the MutableSegment' heap overhead when initialing
091    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
092    // Reset heap to not include any keys
093    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
094    // regionServices can be null when testing
095    if (regionServices != null) {
096      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
097        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
098        memstoreAccounting.getCellsCount());
099    }
100  }
101
102  protected void resetTimeOfOldestEdit() {
103    this.timeOfOldestEdit = Long.MAX_VALUE;
104  }
105
106  /**
107   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
108   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
109   *                         only if it is greater than the previous sequence id
110   */
111  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
112
113  @Override
114  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
115    for (Cell cell : cells) {
116      add(cell, memstoreSizing);
117    }
118  }
119
120  @Override
121  public void add(Cell cell, MemStoreSizing memstoreSizing) {
122    doAddOrUpsert(cell, 0, memstoreSizing, true);
123  }
124
125  /*
126   * Inserts the specified Cell into MemStore and deletes any existing versions of the same
127   * row/family/qualifier as the specified Cell. <p> First, the specified Cell is inserted into the
128   * Memstore. <p> If there are any existing Cell in this MemStore with the same row, family, and
129   * qualifier, they are removed. <p> Callers must hold the read lock.
130   * @param cell the cell to be updated
131   * @param readpoint readpoint below which we can safely remove duplicate KVs
132   * @param memstoreSizing object to accumulate changed size
133   */
134  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
135    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
136  }
137
138  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing,
139    boolean doAdd) {
140    MutableSegment currentActive;
141    boolean succ = false;
142    while (!succ) {
143      currentActive = getActive();
144      succ = preUpdate(currentActive, cell, memstoreSizing);
145      if (succ) {
146        if (doAdd) {
147          doAdd(currentActive, cell, memstoreSizing);
148        } else {
149          doUpsert(currentActive, cell, readpoint, memstoreSizing);
150        }
151        postUpdate(currentActive);
152      }
153    }
154  }
155
156  protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
157    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
158    boolean mslabUsed = (toAdd != cell);
159    // This cell data is backed by the same byte[] where we read request in RPC(See
160    // HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If
161    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
162    // memory and prevent it from getting GCed.
163    // Copy to MSLAB would not have happened if
164    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
165    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
166    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
167    // 3. When cells are from Append/Increment operation.
168    if (!mslabUsed) {
169      toAdd = deepCopyIfNeeded(toAdd);
170    }
171    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
172  }
173
174  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint,
175    MemStoreSizing memstoreSizing) {
176    // Add the Cell to the MemStore
177    // Use the internalAdd method here since we (a) already have a lock
178    // and (b) cannot safely use the MSLAB here without potentially
179    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
180    // test that triggers the pathological case if we don't avoid MSLAB
181    // here.
182    // This cell data is backed by the same byte[] where we read request in RPC(See
183    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
184    // chunk of memory and prevent it from getting GCed.
185    cell = deepCopyIfNeeded(cell);
186    boolean sizeAddedPreOperation = sizeAddedPreOperation();
187    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
188    setOldestEditTimeToNow();
189  }
190
191  /**
192   * Issue any synchronization and test needed before applying the update
193   * @param currentActive  the segment to be updated
194   * @param cell           the cell to be added
195   * @param memstoreSizing object to accumulate region size changes
196   * @return true iff can proceed with applying the update
197   */
198  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
199    MemStoreSizing memstoreSizing);
200
201  /**
202   * Issue any post update synchronization and tests
203   * @param currentActive updated segment
204   */
205  protected abstract void postUpdate(MutableSegment currentActive);
206
207  private static Cell deepCopyIfNeeded(Cell cell) {
208    if (cell instanceof ExtendedCell) {
209      return ((ExtendedCell) cell).deepClone();
210    }
211    return cell;
212  }
213
214  @Override
215  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
216    for (Cell cell : cells) {
217      upsert(cell, readpoint, memstoreSizing);
218    }
219  }
220
221  /** Returns Oldest timestamp of all the Cells in the MemStore */
222  @Override
223  public long timeOfOldestEdit() {
224    return timeOfOldestEdit;
225  }
226
227  /**
228   * This method is protected under HStore write lock.<br/>
229   * The passed snapshot was successfully persisted; it can be let go.
230   * @param id Id of the snapshot to clean out.
231   * @see MemStore#snapshot()
232   */
233  @Override
234  public void clearSnapshot(long id) throws UnexpectedStateException {
235    if (this.snapshotId == -1) return; // already cleared
236    if (this.snapshotId != id) {
237      throw new UnexpectedStateException(
238        "Current snapshot id is " + this.snapshotId + ",passed " + id);
239    }
240    // OK. Passed in snapshot is same as current snapshot. If not-empty,
241    // create a new snapshot and let the old one go.
242    doClearSnapShot();
243  }
244
245  protected void doClearSnapShot() {
246    Segment oldSnapshot = this.snapshot;
247    if (!this.snapshot.isEmpty()) {
248      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
249    }
250    this.snapshotId = NO_SNAPSHOT_ID;
251    oldSnapshot.close();
252  }
253
254  @Override
255  public MemStoreSize getSnapshotSize() {
256    return this.snapshot.getMemStoreSize();
257  }
258
259  @Override
260  public String toString() {
261    StringBuilder buf = new StringBuilder();
262    int i = 1;
263    try {
264      for (Segment segment : getSegments()) {
265        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
266        i++;
267      }
268    } catch (IOException e) {
269      return e.toString();
270    }
271    return buf.toString();
272  }
273
274  protected Configuration getConfiguration() {
275    return conf;
276  }
277
278  protected void dump(Logger log) {
279    getActive().dump(log);
280    snapshot.dump(log);
281  }
282
283  /*
284   * nn * @return Return lowest of a or b or null if both a and b are null
285   */
286  protected Cell getLowest(final Cell a, final Cell b) {
287    if (a == null) {
288      return b;
289    }
290    if (b == null) {
291      return a;
292    }
293    return comparator.compareRows(a, b) <= 0 ? a : b;
294  }
295
296  /*
297   * @param key Find row that follows this one. If null, return first.
298   * @param set Set to look in for a row beyond <code>row</code>.
299   * @return Next row or null if none found. If one found, will be a new KeyValue -- can be
300   * destroyed by subsequent calls to this method.
301   */
302  protected Cell getNextRow(final Cell key, final NavigableSet<Cell> set) {
303    Cell result = null;
304    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
305    // Iterate until we fall into the next row; i.e. move off current row
306    for (Cell cell : tail) {
307      if (comparator.compareRows(cell, key) <= 0) {
308        continue;
309      }
310      // Note: Not suppressing deletes or expired cells. Needs to be handled
311      // by higher up functions.
312      result = cell;
313      break;
314    }
315    return result;
316  }
317
318  /**
319   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
320   * Otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it
321   * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all
322   * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is
323   * set to 'true' and the cell is copied into MSLAB.
324   * @param cell                the cell to clone
325   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
326   * @return either the given cell or its clone
327   */
328  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell,
329    boolean forceCloneOfBigCell) {
330    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
331  }
332
333  /*
334   * Internal version of add() that doesn't clone Cells with the allocator, and doesn't take the
335   * lock. Callers should ensure they already have the read lock taken
336   * @param toAdd the cell to add
337   * @param mslabUsed whether using MSLAB
338   * @param memstoreSizing object to accumulate changed size
339   */
340  private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean mslabUsed,
341    MemStoreSizing memstoreSizing) {
342    boolean sizeAddedPreOperation = sizeAddedPreOperation();
343    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
344    setOldestEditTimeToNow();
345  }
346
347  protected abstract boolean sizeAddedPreOperation();
348
349  private void setOldestEditTimeToNow() {
350    if (timeOfOldestEdit == Long.MAX_VALUE) {
351      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
352    }
353  }
354
355  /**
356   * Returns The total size of cells in this memstore. We will not consider cells in the snapshot
357   */
358  protected abstract long keySize();
359
360  /**
361   * @return The total heap size of cells in this memstore. We will not consider cells in the
362   *         snapshot
363   */
364  protected abstract long heapSize();
365
366  protected CellComparator getComparator() {
367    return comparator;
368  }
369
370  MutableSegment getActive() {
371    return active;
372  }
373
374  ImmutableSegment getSnapshot() {
375    return snapshot;
376  }
377
378  /** Returns an ordered list of segments from most recent to oldest in memstore */
379  protected abstract List<Segment> getSegments() throws IOException;
380
381}