001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.Collections;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Objects;
024import java.util.SortedSet;
025import java.util.concurrent.atomic.AtomicReference;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellComparator;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.io.TimeRange;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.ClassSize;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036
037/**
038 * This is an abstraction of a segment maintained in a memstore, e.g., the active cell set or its
039 * snapshot. This abstraction facilitates the management of the compaction pipeline and the shifts
040 * of these segments from active set to snapshot set in the default implementation.
041 */
042@InterfaceAudience.Private
043public abstract class Segment implements MemStoreSizing {
044
045  public final static long FIXED_OVERHEAD =
046    ClassSize.align(ClassSize.OBJECT + 6 * ClassSize.REFERENCE // cellSet, comparator, updatesLock,
047                                                               // memStoreLAB, memStoreSizing,
048                                                               // and timeRangeTracker
049      + Bytes.SIZEOF_LONG // minSequenceId
050      + Bytes.SIZEOF_BOOLEAN); // tagsPresent
051  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
052    + ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG + ClassSize.REENTRANT_LOCK;
053
054  private AtomicReference<CellSet<ExtendedCell>> cellSet = new AtomicReference<>();
055  private final CellComparator comparator;
056  private ReentrantReadWriteLock updatesLock;
057  protected long minSequenceId;
058  private MemStoreLAB memStoreLAB;
059  // Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not
060  // including the heap overhead of this class.
061  protected final MemStoreSizing memStoreSizing;
062  protected final TimeRangeTracker timeRangeTracker;
063  protected volatile boolean tagsPresent;
064
065  // Empty constructor to be used when Segment is used as interface,
066  // and there is no need in true Segments state
067  protected Segment(CellComparator comparator, TimeRangeTracker trt) {
068    this.comparator = comparator;
069    // Do we need to be thread safe always? What if ImmutableSegment?
070    // DITTO for the TimeRangeTracker below.
071    this.memStoreSizing = new ThreadSafeMemStoreSizing();
072    this.timeRangeTracker = trt;
073  }
074
075  protected Segment(CellComparator comparator, List<ImmutableSegment> segments,
076    TimeRangeTracker trt) {
077    long dataSize = 0;
078    long heapSize = 0;
079    long OffHeapSize = 0;
080    int cellsCount = 0;
081    for (Segment segment : segments) {
082      MemStoreSize memStoreSize = segment.getMemStoreSize();
083      dataSize += memStoreSize.getDataSize();
084      heapSize += memStoreSize.getHeapSize();
085      OffHeapSize += memStoreSize.getOffHeapSize();
086      cellsCount += memStoreSize.getCellsCount();
087    }
088    this.comparator = comparator;
089    this.updatesLock = new ReentrantReadWriteLock();
090    // Do we need to be thread safe always? What if ImmutableSegment?
091    // DITTO for the TimeRangeTracker below.
092    this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize, cellsCount);
093    this.timeRangeTracker = trt;
094  }
095
096  // This constructor is used to create empty Segments.
097  protected Segment(CellSet<ExtendedCell> cellSet, CellComparator comparator,
098    MemStoreLAB memStoreLAB, TimeRangeTracker trt) {
099    this.cellSet.set(cellSet);
100    this.comparator = comparator;
101    this.updatesLock = new ReentrantReadWriteLock();
102    this.minSequenceId = Long.MAX_VALUE;
103    this.memStoreLAB = memStoreLAB;
104    // Do we need to be thread safe always? What if ImmutableSegment?
105    // DITTO for the TimeRangeTracker below.
106    this.memStoreSizing = new ThreadSafeMemStoreSizing();
107    this.tagsPresent = false;
108    this.timeRangeTracker = trt;
109  }
110
111  protected Segment(Segment segment) {
112    this.cellSet.set(segment.getCellSet());
113    this.comparator = segment.getComparator();
114    this.updatesLock = segment.getUpdatesLock();
115    this.minSequenceId = segment.getMinSequenceId();
116    this.memStoreLAB = segment.getMemStoreLAB();
117    this.memStoreSizing = segment.memStoreSizing;
118    this.tagsPresent = segment.isTagsPresent();
119    this.timeRangeTracker = segment.getTimeRangeTracker();
120  }
121
122  /**
123   * Creates the scanner for the given read point
124   * @return a scanner for the given read point
125   */
126  protected KeyValueScanner getScanner(long readPoint) {
127    return new SegmentScanner(this, readPoint);
128  }
129
130  public List<KeyValueScanner> getScanners(long readPoint) {
131    return Collections.singletonList(new SegmentScanner(this, readPoint));
132  }
133
134  /** Returns whether the segment has any cells */
135  public boolean isEmpty() {
136    return getCellSet().isEmpty();
137  }
138
139  /**
140   * Closing a segment before it is being discarded
141   */
142  public void close() {
143    if (this.memStoreLAB != null) {
144      this.memStoreLAB.close();
145    }
146    // do not set MSLab to null as scanners may still be reading the data here and need to decrease
147    // the counter when they finish
148  }
149
150  /**
151   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
152   * otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it
153   * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all
154   * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is
155   * set to 'true' and the cell is copied into MSLAB.
156   * @return either the given cell or its clone
157   */
158  public ExtendedCell maybeCloneWithAllocator(ExtendedCell cell, boolean forceCloneOfBigCell) {
159    if (this.memStoreLAB == null) {
160      return cell;
161    }
162
163    ExtendedCell cellFromMslab;
164    if (forceCloneOfBigCell) {
165      cellFromMslab = this.memStoreLAB.forceCopyOfBigCellInto(cell);
166    } else {
167      cellFromMslab = this.memStoreLAB.copyCellInto(cell);
168    }
169    return (cellFromMslab != null) ? cellFromMslab : cell;
170  }
171
172  /**
173   * Get cell length after serialized in {@link KeyValue}
174   */
175  static int getCellLength(Cell cell) {
176    return cell.getSerializedSize();
177  }
178
179  public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
180    return !isEmpty() && (tr.isAllTime() || timeRangeTracker.includesTimeRange(tr))
181      && timeRangeTracker.getMax() >= oldestUnexpiredTS;
182  }
183
184  public boolean isTagsPresent() {
185    return tagsPresent;
186  }
187
188  public void incScannerCount() {
189    if (this.memStoreLAB != null) {
190      this.memStoreLAB.incScannerCount();
191    }
192  }
193
194  public void decScannerCount() {
195    if (this.memStoreLAB != null) {
196      this.memStoreLAB.decScannerCount();
197    }
198  }
199
200  /**
201   * Setting the CellSet of the segment - used only for flat immutable segment for setting immutable
202   * CellSet after its creation in immutable segment constructor
203   * @return this object
204   */
205
206  protected Segment setCellSet(CellSet<ExtendedCell> cellSetOld, CellSet<ExtendedCell> cellSetNew) {
207    this.cellSet.compareAndSet(cellSetOld, cellSetNew);
208    return this;
209  }
210
211  @Override
212  public MemStoreSize getMemStoreSize() {
213    return this.memStoreSizing.getMemStoreSize();
214  }
215
216  @Override
217  public long getDataSize() {
218    return this.memStoreSizing.getDataSize();
219  }
220
221  @Override
222  public long getHeapSize() {
223    return this.memStoreSizing.getHeapSize();
224  }
225
226  @Override
227  public long getOffHeapSize() {
228    return this.memStoreSizing.getOffHeapSize();
229  }
230
231  @Override
232  public int getCellsCount() {
233    return memStoreSizing.getCellsCount();
234  }
235
236  @Override
237  public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead, int cellsCount) {
238    return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead, cellsCount);
239  }
240
241  public boolean sharedLock() {
242    return updatesLock.readLock().tryLock();
243  }
244
245  public void sharedUnlock() {
246    updatesLock.readLock().unlock();
247  }
248
249  public void waitForUpdates() {
250    if (!updatesLock.isWriteLocked()) {
251      updatesLock.writeLock().lock();
252    }
253  }
254
255  @Override
256  public boolean compareAndSetDataSize(long expected, long updated) {
257    return memStoreSizing.compareAndSetDataSize(expected, updated);
258  }
259
260  public long getMinSequenceId() {
261    return minSequenceId;
262  }
263
264  public TimeRangeTracker getTimeRangeTracker() {
265    return this.timeRangeTracker;
266  }
267
268  // *** Methods for SegmentsScanner
269  public ExtendedCell last() {
270    return getCellSet().last();
271  }
272
273  public Iterator<ExtendedCell> iterator() {
274    return getCellSet().iterator();
275  }
276
277  public SortedSet<ExtendedCell> headSet(ExtendedCell firstKeyOnRow) {
278    return getCellSet().headSet(firstKeyOnRow);
279  }
280
281  public int compare(Cell left, Cell right) {
282    return getComparator().compare(left, right);
283  }
284
285  public int compareRows(Cell left, Cell right) {
286    return getComparator().compareRows(left, right);
287  }
288
289  /** Returns a set of all cells in the segment */
290  protected CellSet<ExtendedCell> getCellSet() {
291    return cellSet.get();
292  }
293
294  /**
295   * Returns the Cell comparator used by this segment
296   * @return the Cell comparator used by this segment
297   */
298  protected CellComparator getComparator() {
299    return comparator;
300  }
301
302  protected void internalAdd(ExtendedCell cell, boolean mslabUsed, MemStoreSizing memstoreSizing,
303    boolean sizeAddedPreOperation) {
304    boolean succ = getCellSet().add(cell);
305    updateMetaInfo(cell, succ, mslabUsed, memstoreSizing, sizeAddedPreOperation);
306  }
307
308  protected void updateMetaInfo(ExtendedCell cellToAdd, boolean succ, boolean mslabUsed,
309    MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
310    long delta = 0;
311    long cellSize = getCellLength(cellToAdd);
312    int cellsCount = succ ? 1 : 0;
313    // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
314    // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
315    // than the counted number)
316    if (succ || mslabUsed) {
317      delta = cellSize;
318    }
319    if (sizeAddedPreOperation) {
320      delta -= cellSize;
321    }
322    long heapSize = heapSizeChange(cellToAdd, succ || mslabUsed);
323    long offHeapSize = offHeapSizeChange(cellToAdd, succ || mslabUsed);
324    incMemStoreSize(delta, heapSize, offHeapSize, cellsCount);
325    if (memstoreSizing != null) {
326      memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize, cellsCount);
327    }
328    getTimeRangeTracker().includeTimestamp(cellToAdd);
329    minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
330    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
331    // When we use ACL CP or Visibility CP which deals with Tags during
332    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
333    // parse the byte[] to identify the tags length.
334    if (cellToAdd.getTagsLength() > 0) {
335      tagsPresent = true;
336    }
337  }
338
339  protected void updateMetaInfo(ExtendedCell cellToAdd, boolean succ,
340    MemStoreSizing memstoreSizing) {
341    updateMetaInfo(cellToAdd, succ, (getMemStoreLAB() != null), memstoreSizing, false);
342  }
343
344  /**
345   * @return The increase in heap size because of this cell addition. This includes this cell POJO's
346   *         heap size itself and additional overhead because of addition on to CSLM.
347   */
348  protected long heapSizeChange(Cell cell, boolean allocated) {
349    long res = 0;
350    if (allocated) {
351      boolean onHeap = true;
352      MemStoreLAB memStoreLAB = getMemStoreLAB();
353      if (memStoreLAB != null) {
354        onHeap = memStoreLAB.isOnHeap();
355      }
356      res += indexEntryOnHeapSize(onHeap);
357      if (onHeap) {
358        res += cell.heapSize();
359      }
360      res = ClassSize.align(res);
361    }
362    return res;
363  }
364
365  protected long offHeapSizeChange(Cell cell, boolean allocated) {
366    long res = 0;
367    if (allocated) {
368      boolean offHeap = false;
369      MemStoreLAB memStoreLAB = getMemStoreLAB();
370      if (memStoreLAB != null) {
371        offHeap = memStoreLAB.isOffHeap();
372      }
373      res += indexEntryOffHeapSize(offHeap);
374      if (offHeap) {
375        res += cell.heapSize();
376      }
377      res = ClassSize.align(res);
378    }
379    return res;
380  }
381
382  protected long indexEntryOnHeapSize(boolean onHeap) {
383    // in most cases index is allocated on-heap
384    // override this method when it is not always the case, e.g., in CCM
385    return indexEntrySize();
386  }
387
388  protected long indexEntryOffHeapSize(boolean offHeap) {
389    // in most cases index is allocated on-heap
390    // override this method when it is not always the case, e.g., in CCM
391    return 0;
392  }
393
394  protected abstract long indexEntrySize();
395
396  /**
397   * Returns a subset of the segment cell set, which starts with the given cell
398   * @param firstCell a cell in the segment
399   * @return a subset of the segment cell set, which starts with the given cell
400   */
401  protected SortedSet<ExtendedCell> tailSet(ExtendedCell firstCell) {
402    return getCellSet().tailSet(firstCell);
403  }
404
405  MemStoreLAB getMemStoreLAB() {
406    return memStoreLAB;
407  }
408
409  // Debug methods
410  /**
411   * Dumps all cells of the segment into the given log
412   */
413  void dump(Logger log) {
414    for (Cell cell : getCellSet()) {
415      log.debug(Objects.toString(cell));
416    }
417  }
418
419  @Override
420  public String toString() {
421    String res = "type=" + this.getClass().getSimpleName() + ", ";
422    res += "empty=" + (isEmpty() ? "yes" : "no") + ", ";
423    res += "cellCount=" + getCellsCount() + ", ";
424    res += "cellSize=" + getDataSize() + ", ";
425    res += "totalHeapSize=" + getHeapSize() + ", ";
426    res += "min timestamp=" + timeRangeTracker.getMin() + ", ";
427    res += "max timestamp=" + timeRangeTracker.getMax();
428    return res;
429  }
430
431  private ReentrantReadWriteLock getUpdatesLock() {
432    return updatesLock;
433  }
434}