001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.NavigableSet;
024import java.util.SortedSet;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellComparator;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.ClassSize;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036
037/**
038 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
039 */
040@InterfaceAudience.Private
041public abstract class AbstractMemStore implements MemStore {
042
043  private static final long NO_SNAPSHOT_ID = -1;
044
045  private final Configuration conf;
046  private final CellComparator comparator;
047
048  // active segment absorbs write operations
049  private volatile MutableSegment active;
050  // Snapshot of memstore.  Made for flusher.
051  protected volatile ImmutableSegment snapshot;
052  protected volatile long snapshotId;
053  // Used to track when to flush
054  private volatile long timeOfOldestEdit;
055
056  protected RegionServicesForStores regionServices;
057
058  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
059          + (5 * ClassSize.REFERENCE)
060          + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
061
062  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
063
064  public static void addToScanners(List<? extends Segment> segments, long readPt,
065      List<KeyValueScanner> scanners) {
066    for (Segment item : segments) {
067      addToScanners(item, readPt, scanners);
068    }
069  }
070
071  protected static void addToScanners(Segment segment, long readPt,
072      List<KeyValueScanner> scanners) {
073    scanners.add(segment.getScanner(readPt));
074  }
075
076  protected AbstractMemStore(final Configuration conf, final CellComparator c,
077      final RegionServicesForStores regionServices) {
078    this.conf = conf;
079    this.comparator = c;
080    this.regionServices = regionServices;
081    resetActive();
082    resetTimeOfOldestEdit();
083    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
084    this.snapshotId = NO_SNAPSHOT_ID;
085  }
086
087  protected void resetActive() {
088    // Record the MutableSegment' heap overhead when initialing
089    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
090    // Reset heap to not include any keys
091    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
092    // regionServices can be null when testing
093    if (regionServices != null) {
094      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
095        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
096        memstoreAccounting.getCellsCount());
097    }
098  }
099
100  protected void resetTimeOfOldestEdit() {
101    this.timeOfOldestEdit = Long.MAX_VALUE;
102  }
103
104  /**
105   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
106   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
107   *                      only if it is greater than the previous sequence id
108   */
109  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
110
111  @Override
112  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
113    for (Cell cell : cells) {
114      add(cell, memstoreSizing);
115    }
116  }
117
118  @Override
119  public void add(Cell cell, MemStoreSizing memstoreSizing) {
120    doAddOrUpsert(cell, 0, memstoreSizing, true);  }
121
122  /*
123   * Inserts the specified Cell into MemStore and deletes any existing
124   * versions of the same row/family/qualifier as the specified Cell.
125   * <p>
126   * First, the specified Cell is inserted into the Memstore.
127   * <p>
128   * If there are any existing Cell in this MemStore with the same row,
129   * family, and qualifier, they are removed.
130   * <p>
131   * Callers must hold the read lock.
132   *
133   * @param cell the cell to be updated
134   * @param readpoint readpoint below which we can safely remove duplicate KVs
135   * @param memstoreSizing object to accumulate changed size
136   */
137  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
138    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
139  }
140
141  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing, boolean
142      doAdd) {
143    MutableSegment currentActive;
144    boolean succ = false;
145    while (!succ) {
146      currentActive = getActive();
147      succ = preUpdate(currentActive, cell, memstoreSizing);
148      if (succ) {
149        if(doAdd) {
150          doAdd(currentActive, cell, memstoreSizing);
151        } else {
152          doUpsert(currentActive, cell, readpoint, memstoreSizing);
153        }
154        postUpdate(currentActive);
155      }
156    }
157  }
158
159  private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
160    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
161    boolean mslabUsed = (toAdd != cell);
162    // This cell data is backed by the same byte[] where we read request in RPC(See
163    // HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If
164    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
165    // memory and prevent it from getting GCed.
166    // Copy to MSLAB would not have happened if
167    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
168    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
169    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
170    // 3. When cells are from Append/Increment operation.
171    if (!mslabUsed) {
172      toAdd = deepCopyIfNeeded(toAdd);
173    }
174    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
175  }
176
177  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint, MemStoreSizing
178      memstoreSizing) {
179    // Add the Cell to the MemStore
180    // Use the internalAdd method here since we (a) already have a lock
181    // and (b) cannot safely use the MSLAB here without potentially
182    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
183    // test that triggers the pathological case if we don't avoid MSLAB
184    // here.
185    // This cell data is backed by the same byte[] where we read request in RPC(See
186    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
187    // chunk of memory and prevent it from getting GCed.
188    cell = deepCopyIfNeeded(cell);
189    boolean sizeAddedPreOperation = sizeAddedPreOperation();
190    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
191    setOldestEditTimeToNow();
192  }
193
194    /**
195     * Issue any synchronization and test needed before applying the update
196     * @param currentActive the segment to be updated
197     * @param cell the cell to be added
198     * @param memstoreSizing object to accumulate region size changes
199     * @return true iff can proceed with applying the update
200     */
201  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
202      MemStoreSizing memstoreSizing);
203
204  /**
205   * Issue any post update synchronization and tests
206   * @param currentActive updated segment
207   */
208  protected abstract void postUpdate(MutableSegment currentActive);
209
210  private static Cell deepCopyIfNeeded(Cell cell) {
211    if (cell instanceof ExtendedCell) {
212      return ((ExtendedCell) cell).deepClone();
213    }
214    return cell;
215  }
216
217  @Override
218  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
219    for (Cell cell : cells) {
220      upsert(cell, readpoint, memstoreSizing);
221    }
222  }
223
224  /**
225   * @return Oldest timestamp of all the Cells in the MemStore
226   */
227  @Override
228  public long timeOfOldestEdit() {
229    return timeOfOldestEdit;
230  }
231
232  /**
233   * The passed snapshot was successfully persisted; it can be let go.
234   * @param id Id of the snapshot to clean out.
235   * @see MemStore#snapshot()
236   */
237  @Override
238  public void clearSnapshot(long id) throws UnexpectedStateException {
239    if (this.snapshotId == -1) return;  // already cleared
240    if (this.snapshotId != id) {
241      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
242          + id);
243    }
244    // OK. Passed in snapshot is same as current snapshot. If not-empty,
245    // create a new snapshot and let the old one go.
246    Segment oldSnapshot = this.snapshot;
247    if (!this.snapshot.isEmpty()) {
248      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
249    }
250    this.snapshotId = NO_SNAPSHOT_ID;
251    oldSnapshot.close();
252  }
253
254  @Override
255  public MemStoreSize getSnapshotSize() {
256    return this.snapshot.getMemStoreSize();
257  }
258
259  @Override
260  public String toString() {
261    StringBuilder buf = new StringBuilder();
262    int i = 1;
263    try {
264      for (Segment segment : getSegments()) {
265        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
266        i++;
267      }
268    } catch (IOException e){
269      return e.toString();
270    }
271    return buf.toString();
272  }
273
274  protected Configuration getConfiguration() {
275    return conf;
276  }
277
278  protected void dump(Logger log) {
279    getActive().dump(log);
280    snapshot.dump(log);
281  }
282
283
284  /*
285   * @param a
286   * @param b
287   * @return Return lowest of a or b or null if both a and b are null
288   */
289  protected Cell getLowest(final Cell a, final Cell b) {
290    if (a == null) {
291      return b;
292    }
293    if (b == null) {
294      return a;
295    }
296    return comparator.compareRows(a, b) <= 0? a: b;
297  }
298
299  /*
300   * @param key Find row that follows this one.  If null, return first.
301   * @param set Set to look in for a row beyond <code>row</code>.
302   * @return Next row or null if none found.  If one found, will be a new
303   * KeyValue -- can be destroyed by subsequent calls to this method.
304   */
305  protected Cell getNextRow(final Cell key,
306      final NavigableSet<Cell> set) {
307    Cell result = null;
308    SortedSet<Cell> tail = key == null? set: set.tailSet(key);
309    // Iterate until we fall into the next row; i.e. move off current row
310    for (Cell cell: tail) {
311      if (comparator.compareRows(cell, key) <= 0) {
312        continue;
313      }
314      // Note: Not suppressing deletes or expired cells.  Needs to be handled
315      // by higher up functions.
316      result = cell;
317      break;
318    }
319    return result;
320  }
321
322  /**
323   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
324   * Otherwise the given cell is returned
325   *
326   * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
327   * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB,
328   * during this process, the input parameter forceCloneOfBigCell is set to 'true'
329   * and the cell is copied into MSLAB.
330   *
331   * @param cell the cell to clone
332   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
333   * @return either the given cell or its clone
334   */
335  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell, boolean
336      forceCloneOfBigCell) {
337    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
338  }
339
340  /*
341   * Internal version of add() that doesn't clone Cells with the
342   * allocator, and doesn't take the lock.
343   *
344   * Callers should ensure they already have the read lock taken
345   * @param toAdd the cell to add
346   * @param mslabUsed whether using MSLAB
347   * @param memstoreSizing object to accumulate changed size
348   */
349  private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean
350      mslabUsed, MemStoreSizing memstoreSizing) {
351    boolean sizeAddedPreOperation = sizeAddedPreOperation();
352    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
353    setOldestEditTimeToNow();
354  }
355
356  protected abstract boolean sizeAddedPreOperation();
357
358  private void setOldestEditTimeToNow() {
359    if (timeOfOldestEdit == Long.MAX_VALUE) {
360      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
361    }
362  }
363
364  /**
365   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
366   */
367  protected abstract long keySize();
368
369  /**
370   * @return The total heap size of cells in this memstore. We will not consider cells in the
371   *         snapshot
372   */
373  protected abstract long heapSize();
374
375  protected CellComparator getComparator() {
376    return comparator;
377  }
378
379  MutableSegment getActive() {
380    return active;
381  }
382
383  ImmutableSegment getSnapshot() {
384    return snapshot;
385  }
386
387  /**
388   * @return an ordered list of segments from most recent to oldest in memstore
389   */
390  protected abstract List<Segment> getSegments() throws IOException;
391
392}