001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.NavigableSet;
023import java.util.SortedSet;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.ExtendedCell;
028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034
035/**
036 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
037 */
038@InterfaceAudience.Private
039public abstract class AbstractMemStore implements MemStore {
040
041  private static final long NO_SNAPSHOT_ID = -1;
042
043  private final Configuration conf;
044  private final CellComparator comparator;
045
046  // active segment absorbs write operations
047  private volatile MutableSegment active;
048  // Snapshot of memstore. Made for flusher.
049  protected volatile ImmutableSegment snapshot;
050  protected volatile long snapshotId;
051  // Used to track when to flush
052  private volatile long timeOfOldestEdit;
053
054  protected RegionServicesForStores regionServices;
055
056  // @formatter:off
057  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
058    + (5 * ClassSize.REFERENCE)
059    + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
060  // @formatter:on
061
062  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
063
064  public static void addToScanners(List<? extends Segment> segments, long readPt,
065    List<KeyValueScanner> scanners) {
066    for (Segment item : segments) {
067      addToScanners(item, readPt, scanners);
068    }
069  }
070
071  protected static void addToScanners(Segment segment, long readPt,
072    List<KeyValueScanner> scanners) {
073    if (!segment.isEmpty()) {
074      scanners.add(segment.getScanner(readPt));
075    }
076  }
077
078  protected AbstractMemStore(final Configuration conf, final CellComparator c,
079    final RegionServicesForStores regionServices) {
080    this.conf = conf;
081    this.comparator = c;
082    this.regionServices = regionServices;
083    resetActive();
084    resetTimeOfOldestEdit();
085    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
086    this.snapshotId = NO_SNAPSHOT_ID;
087  }
088
089  protected void resetActive() {
090    // Record the MutableSegment' heap overhead when initialing
091    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
092    // Reset heap to not include any keys
093    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
094    // regionServices can be null when testing
095    if (regionServices != null) {
096      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
097        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
098        memstoreAccounting.getCellsCount());
099    }
100  }
101
102  protected void resetTimeOfOldestEdit() {
103    this.timeOfOldestEdit = Long.MAX_VALUE;
104  }
105
106  /**
107   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
108   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
109   *                         only if it is greater than the previous sequence id
110   */
111  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
112
113  @Override
114  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
115    for (Cell cell : cells) {
116      add(cell, memstoreSizing);
117    }
118  }
119
120  @Override
121  public void add(Cell cell, MemStoreSizing memstoreSizing) {
122    doAddOrUpsert(cell, 0, memstoreSizing, true);
123  }
124
125  /*
126   * Inserts the specified Cell into MemStore and deletes any existing versions of the same
127   * row/family/qualifier as the specified Cell. <p> First, the specified Cell is inserted into the
128   * Memstore. <p> If there are any existing Cell in this MemStore with the same row, family, and
129   * qualifier, they are removed. <p> Callers must hold the read lock.
130   * @param cell the cell to be updated
131   * @param readpoint readpoint below which we can safely remove duplicate KVs
132   * @param memstoreSizing object to accumulate changed size
133   */
134  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
135    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
136  }
137
138  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing,
139    boolean doAdd) {
140    MutableSegment currentActive;
141    boolean succ = false;
142    while (!succ) {
143      currentActive = getActive();
144      succ = preUpdate(currentActive, cell, memstoreSizing);
145      if (succ) {
146        if (doAdd) {
147          doAdd(currentActive, cell, memstoreSizing);
148        } else {
149          doUpsert(currentActive, cell, readpoint, memstoreSizing);
150        }
151        postUpdate(currentActive);
152      }
153    }
154  }
155
156  protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
157    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
158    boolean mslabUsed = (toAdd != cell);
159    // This cell data is backed by the same byte[] where we read request in RPC(See
160    // HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If
161    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
162    // memory and prevent it from getting GCed.
163    // Copy to MSLAB would not have happened if
164    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
165    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
166    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
167    // 3. When cells are from Append/Increment operation.
168    if (!mslabUsed) {
169      toAdd = deepCopyIfNeeded(toAdd);
170    }
171    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
172  }
173
174  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint,
175    MemStoreSizing memstoreSizing) {
176    // Add the Cell to the MemStore
177    // Use the internalAdd method here since we (a) already have a lock
178    // and (b) cannot safely use the MSLAB here without potentially
179    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
180    // test that triggers the pathological case if we don't avoid MSLAB
181    // here.
182    // This cell data is backed by the same byte[] where we read request in RPC(See
183    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
184    // chunk of memory and prevent it from getting GCed.
185    cell = deepCopyIfNeeded(cell);
186    boolean sizeAddedPreOperation = sizeAddedPreOperation();
187    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
188    setOldestEditTimeToNow();
189  }
190
191  /**
192   * Issue any synchronization and test needed before applying the update
193   * @param currentActive  the segment to be updated
194   * @param cell           the cell to be added
195   * @param memstoreSizing object to accumulate region size changes
196   * @return true iff can proceed with applying the update
197   */
198  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
199    MemStoreSizing memstoreSizing);
200
201  /**
202   * Issue any post update synchronization and tests
203   * @param currentActive updated segment
204   */
205  protected abstract void postUpdate(MutableSegment currentActive);
206
207  private static Cell deepCopyIfNeeded(Cell cell) {
208    if (cell instanceof ExtendedCell) {
209      return ((ExtendedCell) cell).deepClone();
210    }
211    return cell;
212  }
213
214  @Override
215  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
216    for (Cell cell : cells) {
217      upsert(cell, readpoint, memstoreSizing);
218    }
219  }
220
221  /**
222   * @return Oldest timestamp of all the Cells in the MemStore
223   */
224  @Override
225  public long timeOfOldestEdit() {
226    return timeOfOldestEdit;
227  }
228
229  /**
230   * This method is protected under {@link HStore#lock} write lock,<br/>
231   * and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/>
232   * The passed snapshot was successfully persisted; it can be let go.
233   * @param id Id of the snapshot to clean out.
234   * @see MemStore#snapshot()
235   */
236  @Override
237  public void clearSnapshot(long id) throws UnexpectedStateException {
238    if (this.snapshotId == -1) return; // already cleared
239    if (this.snapshotId != id) {
240      throw new UnexpectedStateException(
241        "Current snapshot id is " + this.snapshotId + ",passed " + id);
242    }
243    // OK. Passed in snapshot is same as current snapshot. If not-empty,
244    // create a new snapshot and let the old one go.
245    doClearSnapShot();
246  }
247
248  protected void doClearSnapShot() {
249    Segment oldSnapshot = this.snapshot;
250    if (!this.snapshot.isEmpty()) {
251      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
252    }
253    this.snapshotId = NO_SNAPSHOT_ID;
254    oldSnapshot.close();
255  }
256
257  @Override
258  public MemStoreSize getSnapshotSize() {
259    return this.snapshot.getMemStoreSize();
260  }
261
262  @Override
263  public String toString() {
264    StringBuilder buf = new StringBuilder();
265    int i = 1;
266    try {
267      for (Segment segment : getSegments()) {
268        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
269        i++;
270      }
271    } catch (IOException e) {
272      return e.toString();
273    }
274    return buf.toString();
275  }
276
277  protected Configuration getConfiguration() {
278    return conf;
279  }
280
281  protected void dump(Logger log) {
282    getActive().dump(log);
283    snapshot.dump(log);
284  }
285
286  /*
287   * nn * @return Return lowest of a or b or null if both a and b are null
288   */
289  protected Cell getLowest(final Cell a, final Cell b) {
290    if (a == null) {
291      return b;
292    }
293    if (b == null) {
294      return a;
295    }
296    return comparator.compareRows(a, b) <= 0 ? a : b;
297  }
298
299  /*
300   * @param key Find row that follows this one. If null, return first.
301   * @param set Set to look in for a row beyond <code>row</code>.
302   * @return Next row or null if none found. If one found, will be a new KeyValue -- can be
303   * destroyed by subsequent calls to this method.
304   */
305  protected Cell getNextRow(final Cell key, final NavigableSet<Cell> set) {
306    Cell result = null;
307    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
308    // Iterate until we fall into the next row; i.e. move off current row
309    for (Cell cell : tail) {
310      if (comparator.compareRows(cell, key) <= 0) {
311        continue;
312      }
313      // Note: Not suppressing deletes or expired cells. Needs to be handled
314      // by higher up functions.
315      result = cell;
316      break;
317    }
318    return result;
319  }
320
321  /**
322   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
323   * Otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it
324   * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all
325   * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is
326   * set to 'true' and the cell is copied into MSLAB.
327   * @param cell                the cell to clone
328   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
329   * @return either the given cell or its clone
330   */
331  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell,
332    boolean forceCloneOfBigCell) {
333    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
334  }
335
336  /*
337   * Internal version of add() that doesn't clone Cells with the allocator, and doesn't take the
338   * lock. Callers should ensure they already have the read lock taken
339   * @param toAdd the cell to add
340   * @param mslabUsed whether using MSLAB
341   * @param memstoreSizing object to accumulate changed size
342   */
343  private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean mslabUsed,
344    MemStoreSizing memstoreSizing) {
345    boolean sizeAddedPreOperation = sizeAddedPreOperation();
346    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
347    setOldestEditTimeToNow();
348  }
349
350  protected abstract boolean sizeAddedPreOperation();
351
352  private void setOldestEditTimeToNow() {
353    if (timeOfOldestEdit == Long.MAX_VALUE) {
354      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
355    }
356  }
357
358  /**
359   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
360   */
361  protected abstract long keySize();
362
363  /**
364   * @return The total heap size of cells in this memstore. We will not consider cells in the
365   *         snapshot
366   */
367  protected abstract long heapSize();
368
369  protected CellComparator getComparator() {
370    return comparator;
371  }
372
373  MutableSegment getActive() {
374    return active;
375  }
376
377  ImmutableSegment getSnapshot() {
378    return snapshot;
379  }
380
381  /**
382   * @return an ordered list of segments from most recent to oldest in memstore
383   */
384  protected abstract List<Segment> getSegments() throws IOException;
385
386}