001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
022import java.io.IOException;
023import java.util.List;
024import java.util.NavigableSet;
025import java.util.SortedSet;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.ExtendedCell;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.ClassSize;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037
038/**
039 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
040 */
041@InterfaceAudience.Private
042public abstract class AbstractMemStore implements MemStore {
043
044  private static final long NO_SNAPSHOT_ID = -1;
045
046  private final Configuration conf;
047  private final CellComparator comparator;
048
049  // active segment absorbs write operations
050  protected volatile MutableSegment active;
051  // Snapshot of memstore.  Made for flusher.
052  protected volatile ImmutableSegment snapshot;
053  protected volatile long snapshotId;
054  // Used to track when to flush
055  private volatile long timeOfOldestEdit;
056
057  protected RegionServicesForStores regionServices;
058
059  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
060          + (5 * ClassSize.REFERENCE)
061          + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
062
063  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
064
065  public static long addToScanners(List<? extends Segment> segments, long readPt, long order,
066      List<KeyValueScanner> scanners) {
067    for (Segment item : segments) {
068      order = addToScanners(item, readPt, order, scanners);
069    }
070    return order;
071  }
072
073  protected static long addToScanners(Segment segment, long readPt, long order,
074      List<KeyValueScanner> scanners) {
075    scanners.add(segment.getScanner(readPt, order));
076    return order - 1;
077  }
078
079  protected AbstractMemStore(final Configuration conf, final CellComparator c,
080      final RegionServicesForStores regionServices) {
081    this.conf = conf;
082    this.comparator = c;
083    this.regionServices = regionServices;
084    resetActive();
085    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
086    this.snapshotId = NO_SNAPSHOT_ID;
087  }
088
089  protected void resetActive() {
090    // Record the MutableSegment' heap overhead when initialing
091    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
092    // Reset heap to not include any keys
093    this.active = SegmentFactory.instance()
094        .createMutableSegment(conf, comparator, memstoreAccounting);
095    // regionServices can be null when testing
096    if (regionServices != null) {
097      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
098        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
099        memstoreAccounting.getCellsCount());
100    }
101    this.timeOfOldestEdit = Long.MAX_VALUE;
102  }
103
104  /**
105   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
106   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
107   *                      only if it is greater than the previous sequence id
108   */
109  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
110
111  @Override
112  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
113    for (Cell cell : cells) {
114      add(cell, memstoreSizing);
115    }
116  }
117
118  @Override
119  public void add(Cell cell, MemStoreSizing memstoreSizing) {
120    Cell toAdd = maybeCloneWithAllocator(cell, false);
121    boolean mslabUsed = (toAdd != cell);
122    // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
123    // default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep
124    // copy. Or else we will keep referring to the bigger chunk of memory and prevent it from
125    // getting GCed.
126    // Copy to MSLAB would not have happened if
127    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
128    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
129    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
130    // 3. When cells are from Append/Increment operation.
131    if (!mslabUsed) {
132      toAdd = deepCopyIfNeeded(toAdd);
133    }
134    internalAdd(toAdd, mslabUsed, memstoreSizing);
135  }
136
137  private static Cell deepCopyIfNeeded(Cell cell) {
138    if (cell instanceof ExtendedCell) {
139      return ((ExtendedCell) cell).deepClone();
140    }
141    return cell;
142  }
143
144  @Override
145  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
146    for (Cell cell : cells) {
147      upsert(cell, readpoint, memstoreSizing);
148    }
149  }
150
151  /**
152   * @return Oldest timestamp of all the Cells in the MemStore
153   */
154  @Override
155  public long timeOfOldestEdit() {
156    return timeOfOldestEdit;
157  }
158
159  /**
160   * The passed snapshot was successfully persisted; it can be let go.
161   * @param id Id of the snapshot to clean out.
162   * @see MemStore#snapshot()
163   */
164  @Override
165  public void clearSnapshot(long id) throws UnexpectedStateException {
166    if (this.snapshotId == -1) return;  // already cleared
167    if (this.snapshotId != id) {
168      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
169          + id);
170    }
171    // OK. Passed in snapshot is same as current snapshot. If not-empty,
172    // create a new snapshot and let the old one go.
173    Segment oldSnapshot = this.snapshot;
174    if (!this.snapshot.isEmpty()) {
175      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
176    }
177    this.snapshotId = NO_SNAPSHOT_ID;
178    oldSnapshot.close();
179  }
180
181  @Override
182  public MemStoreSize getSnapshotSize() {
183    return this.snapshot.getMemStoreSize();
184  }
185
186  @Override
187  public String toString() {
188    StringBuilder buf = new StringBuilder();
189    int i = 1;
190    try {
191      for (Segment segment : getSegments()) {
192        buf.append("Segment (" + i + ") " + segment.toString() + "; ");
193        i++;
194      }
195    } catch (IOException e){
196      return e.toString();
197    }
198    return buf.toString();
199  }
200
201  protected Configuration getConfiguration() {
202    return conf;
203  }
204
205  protected void dump(Logger log) {
206    active.dump(log);
207    snapshot.dump(log);
208  }
209
210
211  /*
212   * Inserts the specified Cell into MemStore and deletes any existing
213   * versions of the same row/family/qualifier as the specified Cell.
214   * <p>
215   * First, the specified Cell is inserted into the Memstore.
216   * <p>
217   * If there are any existing Cell in this MemStore with the same row,
218   * family, and qualifier, they are removed.
219   * <p>
220   * Callers must hold the read lock.
221   *
222   * @param cell the cell to be updated
223   * @param readpoint readpoint below which we can safely remove duplicate KVs
224   * @param memstoreSize
225   */
226  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
227    // Add the Cell to the MemStore
228    // Use the internalAdd method here since we (a) already have a lock
229    // and (b) cannot safely use the MSLAB here without potentially
230    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
231    // test that triggers the pathological case if we don't avoid MSLAB
232    // here.
233    // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We
234    // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
235    // prevent it from getting GCed.
236    cell = deepCopyIfNeeded(cell);
237    this.active.upsert(cell, readpoint, memstoreSizing);
238    setOldestEditTimeToNow();
239    checkActiveSize();
240  }
241
242  /*
243   * @param a
244   * @param b
245   * @return Return lowest of a or b or null if both a and b are null
246   */
247  protected Cell getLowest(final Cell a, final Cell b) {
248    if (a == null) {
249      return b;
250    }
251    if (b == null) {
252      return a;
253    }
254    return comparator.compareRows(a, b) <= 0? a: b;
255  }
256
257  /*
258   * @param key Find row that follows this one.  If null, return first.
259   * @param set Set to look in for a row beyond <code>row</code>.
260   * @return Next row or null if none found.  If one found, will be a new
261   * KeyValue -- can be destroyed by subsequent calls to this method.
262   */
263  protected Cell getNextRow(final Cell key,
264      final NavigableSet<Cell> set) {
265    Cell result = null;
266    SortedSet<Cell> tail = key == null? set: set.tailSet(key);
267    // Iterate until we fall into the next row; i.e. move off current row
268    for (Cell cell: tail) {
269      if (comparator.compareRows(cell, key) <= 0) {
270        continue;
271      }
272      // Note: Not suppressing deletes or expired cells.  Needs to be handled
273      // by higher up functions.
274      result = cell;
275      break;
276    }
277    return result;
278  }
279
280  /**
281   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
282   * Otherwise the given cell is returned
283   *
284   * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
285   * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB,
286   * during this process, the input parameter forceCloneOfBigCell is set to 'true'
287   * and the cell is copied into MSLAB.
288   *
289   * @param cell the cell to clone
290   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
291   * @return either the given cell or its clone
292   */
293  private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
294    return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
295  }
296
297  /*
298   * Internal version of add() that doesn't clone Cells with the
299   * allocator, and doesn't take the lock.
300   *
301   * Callers should ensure they already have the read lock taken
302   * @param toAdd the cell to add
303   * @param mslabUsed whether using MSLAB
304   * @param memstoreSize
305   */
306  private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) {
307    active.add(toAdd, mslabUsed, memstoreSizing);
308    setOldestEditTimeToNow();
309    checkActiveSize();
310  }
311
312  private void setOldestEditTimeToNow() {
313    if (timeOfOldestEdit == Long.MAX_VALUE) {
314      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
315    }
316  }
317
318  /**
319   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
320   */
321  protected abstract long keySize();
322
323  /**
324   * @return The total heap size of cells in this memstore. We will not consider cells in the
325   *         snapshot
326   */
327  protected abstract long heapSize();
328
329  protected CellComparator getComparator() {
330    return comparator;
331  }
332
333  @VisibleForTesting
334  MutableSegment getActive() {
335    return active;
336  }
337
338  @VisibleForTesting
339  ImmutableSegment getSnapshot() {
340    return snapshot;
341  }
342
343  /**
344   * Check whether anything need to be done based on the current active set size
345   */
346  protected abstract void checkActiveSize();
347
348  /**
349   * @return an ordered list of segments from most recent to oldest in memstore
350   */
351  protected abstract List<Segment> getSegments() throws IOException;
352
353}