001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
022import java.io.IOException;
023import java.util.List;
024import java.util.NavigableSet;
025import java.util.SortedSet;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.ExtendedCell;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.ClassSize;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037
038/**
039 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
040 */
041@InterfaceAudience.Private
042public abstract class AbstractMemStore implements MemStore {
043
  // Sentinel stored in snapshotId while no snapshot is outstanding; clearSnapshot() treats it as
  // "already cleared".
  private static final long NO_SNAPSHOT_ID = -1;

  // Configuration handed to the segment factory whenever a new active segment is created.
  private final Configuration conf;
  // Comparator handed to the segment factory and used for row comparisons in this class.
  private final CellComparator comparator;

  // active segment absorbs write operations
  private volatile MutableSegment active;
  // Snapshot of memstore.  Made for flusher.
  protected volatile ImmutableSegment snapshot;
  // Id of the current snapshot, or NO_SNAPSHOT_ID when there is none.
  protected volatile long snapshotId;
  // Used to track when to flush. Long.MAX_VALUE after resetActive() until the first add/upsert,
  // which replaces it with the current time.
  private volatile long timeOfOldestEdit;

  // Receives memstore size updates; can be null when testing.
  protected RegionServicesForStores regionServices;

  // Shallow size estimate: object header, the five reference fields above (conf, comparator,
  // active, snapshot, regionServices) and the two long fields.
  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
          + (5 * ClassSize.REFERENCE)
          + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit

  // No additional deep cost is accounted for at this level, so it equals FIXED_OVERHEAD.
  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
064
065  public static void addToScanners(List<? extends Segment> segments, long readPt,
066      List<KeyValueScanner> scanners) {
067    for (Segment item : segments) {
068      addToScanners(item, readPt, scanners);
069    }
070  }
071
072  protected static void addToScanners(Segment segment, long readPt,
073      List<KeyValueScanner> scanners) {
074    scanners.add(segment.getScanner(readPt));
075  }
076
  /**
   * Stores the configuration, comparator and region services, creates the first active segment
   * through {@link #resetActive()}, and installs a fresh immutable segment as the snapshot with
   * no snapshot id assigned.
   *
   * @param conf configuration handed to the segment factory when active segments are created
   * @param c comparator handed to the segment factory and used for row comparisons
   * @param regionServices receiver of memstore size updates; may be {@code null} (e.g. in tests)
   */
  protected AbstractMemStore(final Configuration conf, final CellComparator c,
      final RegionServicesForStores regionServices) {
    this.conf = conf;
    this.comparator = c;
    this.regionServices = regionServices;
    // Must come after the three assignments above: resetActive() reads all of them.
    resetActive();
    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
    this.snapshotId = NO_SNAPSHOT_ID;
  }
086
087  protected void resetActive() {
088    // Record the MutableSegment' heap overhead when initialing
089    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
090    // Reset heap to not include any keys
091    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
092    // regionServices can be null when testing
093    if (regionServices != null) {
094      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
095        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
096        memstoreAccounting.getCellsCount());
097    }
098    timeOfOldestEdit = Long.MAX_VALUE;
099  }
100
  /**
   * Updates the WAL with the lowest sequence id (oldest entry) that is still in memory.
   *
   * @param onlyIfMoreRecent when {@code true}, update the sequence id only if it is greater than
   *          the previous sequence id; when {@code false}, update it unconditionally
   */
  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
107
108  @Override
109  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
110    for (Cell cell : cells) {
111      add(cell, memstoreSizing);
112    }
113  }
114
115  @Override
116  public void add(Cell cell, MemStoreSizing memstoreSizing) {
117    doAddOrUpsert(cell, 0, memstoreSizing, true);  }
118
119  /*
120   * Inserts the specified Cell into MemStore and deletes any existing
121   * versions of the same row/family/qualifier as the specified Cell.
122   * <p>
123   * First, the specified Cell is inserted into the Memstore.
124   * <p>
125   * If there are any existing Cell in this MemStore with the same row,
126   * family, and qualifier, they are removed.
127   * <p>
128   * Callers must hold the read lock.
129   *
130   * @param cell the cell to be updated
131   * @param readpoint readpoint below which we can safely remove duplicate KVs
132   * @param memstoreSizing object to accumulate changed size
133   */
134  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
135    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
136  }
137
138  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing, boolean
139      doAdd) {
140    MutableSegment currentActive;
141    boolean succ = false;
142    while (!succ) {
143      currentActive = getActive();
144      succ = preUpdate(currentActive, cell, memstoreSizing);
145      if (succ) {
146        if(doAdd) {
147          doAdd(currentActive, cell, memstoreSizing);
148        } else {
149          doUpsert(currentActive, cell, readpoint, memstoreSizing);
150        }
151        postUpdate(currentActive);
152      }
153    }
154  }
155
156  private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
157    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
158    boolean mslabUsed = (toAdd != cell);
159    // This cell data is backed by the same byte[] where we read request in RPC(See
160    // HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If
161    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
162    // memory and prevent it from getting GCed.
163    // Copy to MSLAB would not have happened if
164    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
165    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
166    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
167    // 3. When cells are from Append/Increment operation.
168    if (!mslabUsed) {
169      toAdd = deepCopyIfNeeded(toAdd);
170    }
171    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
172  }
173
174  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint, MemStoreSizing
175      memstoreSizing) {
176    // Add the Cell to the MemStore
177    // Use the internalAdd method here since we (a) already have a lock
178    // and (b) cannot safely use the MSLAB here without potentially
179    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
180    // test that triggers the pathological case if we don't avoid MSLAB
181    // here.
182    // This cell data is backed by the same byte[] where we read request in RPC(See
183    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
184    // chunk of memory and prevent it from getting GCed.
185    cell = deepCopyIfNeeded(cell);
186    boolean sizeAddedPreOperation = sizeAddedPreOperation();
187    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
188    setOldestEditTimeToNow();
189  }
190
    /**
     * Issue any synchronization and test needed before applying the update.
     * <p>
     * When this returns {@code false} the caller in this class does not apply the update;
     * it re-reads the active segment and calls this method again until it returns {@code true}.
     *
     * @param currentActive the segment to be updated
     * @param cell the cell to be added
     * @param memstoreSizing object to accumulate region size changes
     * @return true iff can proceed with applying the update
     */
  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
      MemStoreSizing memstoreSizing);
200
  /**
   * Issue any post update synchronization and tests. Called by this class once the add or
   * upsert has been applied to the segment, i.e. only after {@code preUpdate} returned
   * {@code true} for that same segment.
   *
   * @param currentActive updated segment
   */
  protected abstract void postUpdate(MutableSegment currentActive);
206
207  private static Cell deepCopyIfNeeded(Cell cell) {
208    if (cell instanceof ExtendedCell) {
209      return ((ExtendedCell) cell).deepClone();
210    }
211    return cell;
212  }
213
214  @Override
215  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
216    for (Cell cell : cells) {
217      upsert(cell, readpoint, memstoreSizing);
218    }
219  }
220
221  /**
222   * @return Oldest timestamp of all the Cells in the MemStore
223   */
224  @Override
225  public long timeOfOldestEdit() {
226    return timeOfOldestEdit;
227  }
228
229  /**
230   * The passed snapshot was successfully persisted; it can be let go.
231   * @param id Id of the snapshot to clean out.
232   * @see MemStore#snapshot()
233   */
234  @Override
235  public void clearSnapshot(long id) throws UnexpectedStateException {
236    if (this.snapshotId == -1) return;  // already cleared
237    if (this.snapshotId != id) {
238      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
239          + id);
240    }
241    // OK. Passed in snapshot is same as current snapshot. If not-empty,
242    // create a new snapshot and let the old one go.
243    Segment oldSnapshot = this.snapshot;
244    if (!this.snapshot.isEmpty()) {
245      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
246    }
247    this.snapshotId = NO_SNAPSHOT_ID;
248    oldSnapshot.close();
249  }
250
251  @Override
252  public MemStoreSize getSnapshotSize() {
253    return this.snapshot.getMemStoreSize();
254  }
255
256  @Override
257  public String toString() {
258    StringBuilder buf = new StringBuilder();
259    int i = 1;
260    try {
261      for (Segment segment : getSegments()) {
262        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
263        i++;
264      }
265    } catch (IOException e){
266      return e.toString();
267    }
268    return buf.toString();
269  }
270
271  protected Configuration getConfiguration() {
272    return conf;
273  }
274
275  protected void dump(Logger log) {
276    getActive().dump(log);
277    snapshot.dump(log);
278  }
279
280
281  /*
282   * @param a
283   * @param b
284   * @return Return lowest of a or b or null if both a and b are null
285   */
286  protected Cell getLowest(final Cell a, final Cell b) {
287    if (a == null) {
288      return b;
289    }
290    if (b == null) {
291      return a;
292    }
293    return comparator.compareRows(a, b) <= 0? a: b;
294  }
295
296  /*
297   * @param key Find row that follows this one.  If null, return first.
298   * @param set Set to look in for a row beyond <code>row</code>.
299   * @return Next row or null if none found.  If one found, will be a new
300   * KeyValue -- can be destroyed by subsequent calls to this method.
301   */
302  protected Cell getNextRow(final Cell key,
303      final NavigableSet<Cell> set) {
304    Cell result = null;
305    SortedSet<Cell> tail = key == null? set: set.tailSet(key);
306    // Iterate until we fall into the next row; i.e. move off current row
307    for (Cell cell: tail) {
308      if (comparator.compareRows(cell, key) <= 0) {
309        continue;
310      }
311      // Note: Not suppressing deletes or expired cells.  Needs to be handled
312      // by higher up functions.
313      result = cell;
314      break;
315    }
316    return result;
317  }
318
319  /**
320   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
321   * Otherwise the given cell is returned
322   *
323   * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
324   * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB,
325   * during this process, the input parameter forceCloneOfBigCell is set to 'true'
326   * and the cell is copied into MSLAB.
327   *
328   * @param cell the cell to clone
329   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
330   * @return either the given cell or its clone
331   */
332  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell, boolean
333      forceCloneOfBigCell) {
334    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
335  }
336
337  /*
338   * Internal version of add() that doesn't clone Cells with the
339   * allocator, and doesn't take the lock.
340   *
341   * Callers should ensure they already have the read lock taken
342   * @param toAdd the cell to add
343   * @param mslabUsed whether using MSLAB
344   * @param memstoreSizing object to accumulate changed size
345   */
346  private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean
347      mslabUsed, MemStoreSizing memstoreSizing) {
348    boolean sizeAddedPreOperation = sizeAddedPreOperation();
349    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
350    setOldestEditTimeToNow();
351  }
352
  /**
   * Supplies the flag this class forwards as the last argument of the active segment's
   * {@code add} and {@code upsert} calls.
   * <p>
   * NOTE(review): presumably indicates that the size of the cell was already accounted for
   * before the operation (e.g. during {@code preUpdate}) - confirm against MutableSegment.
   *
   * @return the flag value to forward
   */
  protected abstract boolean sizeAddedPreOperation();
354
355  private void setOldestEditTimeToNow() {
356    if (timeOfOldestEdit == Long.MAX_VALUE) {
357      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
358    }
359  }
360
  /**
   * @return The total size of cells in this memstore. Cells in the snapshot are not
   *         considered.
   */
  protected abstract long keySize();
365
  /**
   * @return The total heap size of cells in this memstore. Cells in the snapshot are not
   *         considered.
   */
  protected abstract long heapSize();
371
372  protected CellComparator getComparator() {
373    return comparator;
374  }
375
376  @VisibleForTesting
377  MutableSegment getActive() {
378    return active;
379  }
380
381  @VisibleForTesting
382  ImmutableSegment getSnapshot() {
383    return snapshot;
384  }
385
  /**
   * @return an ordered list of segments from most recent to oldest in memstore
   * @throws IOException declared for implementations; within this class it is handled only by
   *           {@link #toString()}, which returns the exception's string form
   */
  protected abstract List<Segment> getSegments() throws IOException;
390
391}