001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
022import java.io.IOException;
023import java.util.List;
024import java.util.NavigableSet;
025import java.util.SortedSet;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.ExtendedCell;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.ClassSize;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037
038/**
039 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
040 */
041@InterfaceAudience.Private
042public abstract class AbstractMemStore implements MemStore {
043
044  private static final long NO_SNAPSHOT_ID = -1;
045
046  private final Configuration conf;
047  private final CellComparator comparator;
048
049  // active segment absorbs write operations
050  protected volatile MutableSegment active;
051  // Snapshot of memstore.  Made for flusher.
052  protected volatile ImmutableSegment snapshot;
053  protected volatile long snapshotId;
054  // Used to track when to flush
055  private volatile long timeOfOldestEdit;
056
057  protected RegionServicesForStores regionServices;
058
059  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
060          + (5 * ClassSize.REFERENCE)
061          + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
062
063  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
064
065  public static void addToScanners(List<? extends Segment> segments, long readPt,
066      List<KeyValueScanner> scanners) {
067    for (Segment item : segments) {
068      addToScanners(item, readPt, scanners);
069    }
070  }
071
072  protected static void addToScanners(Segment segment, long readPt,
073      List<KeyValueScanner> scanners) {
074    scanners.add(segment.getScanner(readPt));
075  }
076
077  protected AbstractMemStore(final Configuration conf, final CellComparator c,
078      final RegionServicesForStores regionServices) {
079    this.conf = conf;
080    this.comparator = c;
081    this.regionServices = regionServices;
082    resetActive();
083    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
084    this.snapshotId = NO_SNAPSHOT_ID;
085  }
086
087  protected void resetActive() {
088    // Record the MutableSegment' heap overhead when initialing
089    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
090    // Reset heap to not include any keys
091    this.active = SegmentFactory.instance()
092        .createMutableSegment(conf, comparator, memstoreAccounting);
093    // regionServices can be null when testing
094    if (regionServices != null) {
095      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
096        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
097        memstoreAccounting.getCellsCount());
098    }
099    this.timeOfOldestEdit = Long.MAX_VALUE;
100  }
101
102  /**
103   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
104   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
105   *                      only if it is greater than the previous sequence id
106   */
107  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
108
109  @Override
110  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
111    for (Cell cell : cells) {
112      add(cell, memstoreSizing);
113    }
114  }
115
116  @Override
117  public void add(Cell cell, MemStoreSizing memstoreSizing) {
118    Cell toAdd = maybeCloneWithAllocator(cell, false);
119    boolean mslabUsed = (toAdd != cell);
120    // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
121    // default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep
122    // copy. Or else we will keep referring to the bigger chunk of memory and prevent it from
123    // getting GCed.
124    // Copy to MSLAB would not have happened if
125    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
126    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
127    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
128    // 3. When cells are from Append/Increment operation.
129    if (!mslabUsed) {
130      toAdd = deepCopyIfNeeded(toAdd);
131    }
132    internalAdd(toAdd, mslabUsed, memstoreSizing);
133  }
134
135  private static Cell deepCopyIfNeeded(Cell cell) {
136    if (cell instanceof ExtendedCell) {
137      return ((ExtendedCell) cell).deepClone();
138    }
139    return cell;
140  }
141
142  @Override
143  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
144    for (Cell cell : cells) {
145      upsert(cell, readpoint, memstoreSizing);
146    }
147  }
148
149  /**
150   * @return Oldest timestamp of all the Cells in the MemStore
151   */
152  @Override
153  public long timeOfOldestEdit() {
154    return timeOfOldestEdit;
155  }
156
157  /**
158   * The passed snapshot was successfully persisted; it can be let go.
159   * @param id Id of the snapshot to clean out.
160   * @see MemStore#snapshot()
161   */
162  @Override
163  public void clearSnapshot(long id) throws UnexpectedStateException {
164    if (this.snapshotId == -1) return;  // already cleared
165    if (this.snapshotId != id) {
166      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
167          + id);
168    }
169    // OK. Passed in snapshot is same as current snapshot. If not-empty,
170    // create a new snapshot and let the old one go.
171    Segment oldSnapshot = this.snapshot;
172    if (!this.snapshot.isEmpty()) {
173      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
174    }
175    this.snapshotId = NO_SNAPSHOT_ID;
176    oldSnapshot.close();
177  }
178
179  @Override
180  public MemStoreSize getSnapshotSize() {
181    return this.snapshot.getMemStoreSize();
182  }
183
184  @Override
185  public String toString() {
186    StringBuilder buf = new StringBuilder();
187    int i = 1;
188    try {
189      for (Segment segment : getSegments()) {
190        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
191        i++;
192      }
193    } catch (IOException e){
194      return e.toString();
195    }
196    return buf.toString();
197  }
198
199  protected Configuration getConfiguration() {
200    return conf;
201  }
202
203  protected void dump(Logger log) {
204    active.dump(log);
205    snapshot.dump(log);
206  }
207
208
209  /*
210   * Inserts the specified Cell into MemStore and deletes any existing
211   * versions of the same row/family/qualifier as the specified Cell.
212   * <p>
213   * First, the specified Cell is inserted into the Memstore.
214   * <p>
215   * If there are any existing Cell in this MemStore with the same row,
216   * family, and qualifier, they are removed.
217   * <p>
218   * Callers must hold the read lock.
219   *
220   * @param cell the cell to be updated
221   * @param readpoint readpoint below which we can safely remove duplicate KVs
222   * @param memstoreSize
223   */
224  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
225    // Add the Cell to the MemStore
226    // Use the internalAdd method here since we (a) already have a lock
227    // and (b) cannot safely use the MSLAB here without potentially
228    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
229    // test that triggers the pathological case if we don't avoid MSLAB
230    // here.
231    // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We
232    // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
233    // prevent it from getting GCed.
234    cell = deepCopyIfNeeded(cell);
235    this.active.upsert(cell, readpoint, memstoreSizing);
236    setOldestEditTimeToNow();
237    checkActiveSize();
238  }
239
240  /*
241   * @param a
242   * @param b
243   * @return Return lowest of a or b or null if both a and b are null
244   */
245  protected Cell getLowest(final Cell a, final Cell b) {
246    if (a == null) {
247      return b;
248    }
249    if (b == null) {
250      return a;
251    }
252    return comparator.compareRows(a, b) <= 0? a: b;
253  }
254
255  /*
256   * @param key Find row that follows this one.  If null, return first.
257   * @param set Set to look in for a row beyond <code>row</code>.
258   * @return Next row or null if none found.  If one found, will be a new
259   * KeyValue -- can be destroyed by subsequent calls to this method.
260   */
261  protected Cell getNextRow(final Cell key,
262      final NavigableSet<Cell> set) {
263    Cell result = null;
264    SortedSet<Cell> tail = key == null? set: set.tailSet(key);
265    // Iterate until we fall into the next row; i.e. move off current row
266    for (Cell cell: tail) {
267      if (comparator.compareRows(cell, key) <= 0) {
268        continue;
269      }
270      // Note: Not suppressing deletes or expired cells.  Needs to be handled
271      // by higher up functions.
272      result = cell;
273      break;
274    }
275    return result;
276  }
277
278  /**
279   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
280   * Otherwise the given cell is returned
281   *
282   * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
283   * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB,
284   * during this process, the input parameter forceCloneOfBigCell is set to 'true'
285   * and the cell is copied into MSLAB.
286   *
287   * @param cell the cell to clone
288   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
289   * @return either the given cell or its clone
290   */
291  private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
292    return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
293  }
294
295  /*
296   * Internal version of add() that doesn't clone Cells with the
297   * allocator, and doesn't take the lock.
298   *
299   * Callers should ensure they already have the read lock taken
300   * @param toAdd the cell to add
301   * @param mslabUsed whether using MSLAB
302   * @param memstoreSize
303   */
304  private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) {
305    active.add(toAdd, mslabUsed, memstoreSizing);
306    setOldestEditTimeToNow();
307    checkActiveSize();
308  }
309
310  private void setOldestEditTimeToNow() {
311    if (timeOfOldestEdit == Long.MAX_VALUE) {
312      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
313    }
314  }
315
316  /**
317   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
318   */
319  protected abstract long keySize();
320
321  /**
322   * @return The total heap size of cells in this memstore. We will not consider cells in the
323   *         snapshot
324   */
325  protected abstract long heapSize();
326
327  protected CellComparator getComparator() {
328    return comparator;
329  }
330
331  @VisibleForTesting
332  MutableSegment getActive() {
333    return active;
334  }
335
336  @VisibleForTesting
337  ImmutableSegment getSnapshot() {
338    return snapshot;
339  }
340
341  /**
342   * Check whether anything need to be done based on the current active set size
343   */
344  protected abstract void checkActiveSize();
345
346  /**
347   * @return an ordered list of segments from most recent to oldest in memstore
348   */
349  protected abstract List<Segment> getSegments() throws IOException;
350
351}