001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Objects;
025import java.util.SortedSet;
026import java.util.concurrent.atomic.AtomicReference;
027import java.util.concurrent.locks.ReentrantReadWriteLock;
028
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellComparator;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.io.TimeRange;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.ClassSize;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037
038/**
039 * This is an abstraction of a segment maintained in a memstore, e.g., the active
040 * cell set or its snapshot.
041 *
042 * This abstraction facilitates the management of the compaction pipeline and the shifts of these
043 * segments from active set to snapshot set in the default implementation.
044 */
045@InterfaceAudience.Private
046public abstract class Segment implements MemStoreSizing {
047
048  public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
049      + 6 * ClassSize.REFERENCE // cellSet, comparator, updatesLock, memStoreLAB, memStoreSizing,
050                                // and timeRangeTracker
051      + Bytes.SIZEOF_LONG // minSequenceId
052      + Bytes.SIZEOF_BOOLEAN); // tagsPresent
053  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
054      + ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG
055      + ClassSize.REENTRANT_LOCK;
056
057  private AtomicReference<CellSet> cellSet= new AtomicReference<>();
058  private final CellComparator comparator;
059  private ReentrantReadWriteLock updatesLock;
060  protected long minSequenceId;
061  private MemStoreLAB memStoreLAB;
062  // Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not
063  // including the heap overhead of this class.
064  protected final MemStoreSizing memStoreSizing;
065  protected final TimeRangeTracker timeRangeTracker;
066  protected volatile boolean tagsPresent;
067
068  // Empty constructor to be used when Segment is used as interface,
069  // and there is no need in true Segments state
070  protected Segment(CellComparator comparator, TimeRangeTracker trt) {
071    this.comparator = comparator;
072    // Do we need to be thread safe always? What if ImmutableSegment?
073    // DITTO for the TimeRangeTracker below.
074    this.memStoreSizing = new ThreadSafeMemStoreSizing();
075    this.timeRangeTracker = trt;
076  }
077
078  protected Segment(CellComparator comparator, List<ImmutableSegment> segments,
079      TimeRangeTracker trt) {
080    long dataSize = 0;
081    long heapSize = 0;
082    long OffHeapSize = 0;
083    int cellsCount = 0;
084    for (Segment segment : segments) {
085      MemStoreSize memStoreSize = segment.getMemStoreSize();
086      dataSize += memStoreSize.getDataSize();
087      heapSize += memStoreSize.getHeapSize();
088      OffHeapSize += memStoreSize.getOffHeapSize();
089      cellsCount += memStoreSize.getCellsCount();
090    }
091    this.comparator = comparator;
092    this.updatesLock = new ReentrantReadWriteLock();
093    // Do we need to be thread safe always? What if ImmutableSegment?
094    // DITTO for the TimeRangeTracker below.
095    this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize, cellsCount);
096    this.timeRangeTracker = trt;
097  }
098
099  // This constructor is used to create empty Segments.
100  protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, TimeRangeTracker trt) {
101    this.cellSet.set(cellSet);
102    this.comparator = comparator;
103    this.updatesLock = new ReentrantReadWriteLock();
104    this.minSequenceId = Long.MAX_VALUE;
105    this.memStoreLAB = memStoreLAB;
106    // Do we need to be thread safe always? What if ImmutableSegment?
107    // DITTO for the TimeRangeTracker below.
108    this.memStoreSizing = new ThreadSafeMemStoreSizing();
109    this.tagsPresent = false;
110    this.timeRangeTracker = trt;
111  }
112
113  protected Segment(Segment segment) {
114    this.cellSet.set(segment.getCellSet());
115    this.comparator = segment.getComparator();
116    this.updatesLock = segment.getUpdatesLock();
117    this.minSequenceId = segment.getMinSequenceId();
118    this.memStoreLAB = segment.getMemStoreLAB();
119    this.memStoreSizing = segment.memStoreSizing;
120    this.tagsPresent = segment.isTagsPresent();
121    this.timeRangeTracker = segment.getTimeRangeTracker();
122  }
123
124  /**
125   * Creates the scanner for the given read point
126   * @return a scanner for the given read point
127   */
128  protected KeyValueScanner getScanner(long readPoint) {
129    return new SegmentScanner(this, readPoint);
130  }
131
132  public List<KeyValueScanner> getScanners(long readPoint) {
133    return Collections.singletonList(new SegmentScanner(this, readPoint));
134  }
135
136  /**
137   * @return whether the segment has any cells
138   */
139  public boolean isEmpty() {
140    return getCellSet().isEmpty();
141  }
142
143
144  /**
145   * Closing a segment before it is being discarded
146   */
147  public void close() {
148    if (this.memStoreLAB != null) {
149      this.memStoreLAB.close();
150    }
151    // do not set MSLab to null as scanners may still be reading the data here and need to decrease
152    // the counter when they finish
153  }
154
155  /**
156   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
157   * otherwise the given cell is returned
158   *
159   * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
160   * Since the process of flattening to CellChunkMap assumes that all cells
161   * are allocated on MSLAB, during this process, the input parameter
162   * forceCloneOfBigCell is set to 'true' and the cell is copied into MSLAB.
163   *
164   * @return either the given cell or its clone
165   */
166  public Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
167    if (this.memStoreLAB == null) {
168      return cell;
169    }
170
171    Cell cellFromMslab;
172    if (forceCloneOfBigCell) {
173      cellFromMslab = this.memStoreLAB.forceCopyOfBigCellInto(cell);
174    } else {
175      cellFromMslab = this.memStoreLAB.copyCellInto(cell);
176    }
177    return (cellFromMslab != null) ? cellFromMslab : cell;
178  }
179
180  /**
181   * Get cell length after serialized in {@link KeyValue}
182   */
183  static int getCellLength(Cell cell) {
184    return cell.getSerializedSize();
185  }
186
187  public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
188    return !isEmpty()
189        && (tr.isAllTime() || timeRangeTracker.includesTimeRange(tr))
190        && timeRangeTracker.getMax() >= oldestUnexpiredTS;
191  }
192
193  public boolean isTagsPresent() {
194    return tagsPresent;
195  }
196
197  public void incScannerCount() {
198    if (this.memStoreLAB != null) {
199      this.memStoreLAB.incScannerCount();
200    }
201  }
202
203  public void decScannerCount() {
204    if (this.memStoreLAB != null) {
205      this.memStoreLAB.decScannerCount();
206    }
207  }
208
209  /**
210   * Setting the CellSet of the segment - used only for flat immutable segment for setting
211   * immutable CellSet after its creation in immutable segment constructor
212   * @return this object
213   */
214
215  protected Segment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
216    this.cellSet.compareAndSet(cellSetOld, cellSetNew);
217    return this;
218  }
219
220  @Override
221  public MemStoreSize getMemStoreSize() {
222    return this.memStoreSizing.getMemStoreSize();
223  }
224
225  @Override
226  public long getDataSize() {
227    return this.memStoreSizing.getDataSize();
228  }
229
230  @Override
231  public long getHeapSize() {
232    return this.memStoreSizing.getHeapSize();
233  }
234
235  @Override
236  public long getOffHeapSize() {
237    return this.memStoreSizing.getOffHeapSize();
238  }
239
240  @Override
241  public int getCellsCount() {
242    return memStoreSizing.getCellsCount();
243  }
244
245  @Override
246  public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead, int cellsCount) {
247    return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead, cellsCount);
248  }
249
250  public boolean sharedLock() {
251    return updatesLock.readLock().tryLock();
252  }
253
254  public void sharedUnlock() {
255    updatesLock.readLock().unlock();
256  }
257
258  public void waitForUpdates() {
259    if(!updatesLock.isWriteLocked()) {
260      updatesLock.writeLock().lock();
261    }
262  }
263
264  @Override
265  public boolean compareAndSetDataSize(long expected, long updated) {
266    return memStoreSizing.compareAndSetDataSize(expected, updated);
267  }
268
269  public long getMinSequenceId() {
270    return minSequenceId;
271  }
272
273  public TimeRangeTracker getTimeRangeTracker() {
274    return this.timeRangeTracker;
275  }
276
277  //*** Methods for SegmentsScanner
278  public Cell last() {
279    return getCellSet().last();
280  }
281
282  public Iterator<Cell> iterator() {
283    return getCellSet().iterator();
284  }
285
286  public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
287    return getCellSet().headSet(firstKeyOnRow);
288  }
289
290  public int compare(Cell left, Cell right) {
291    return getComparator().compare(left, right);
292  }
293
294  public int compareRows(Cell left, Cell right) {
295    return getComparator().compareRows(left, right);
296  }
297
298  /**
299   * @return a set of all cells in the segment
300   */
301  protected CellSet getCellSet() {
302    return cellSet.get();
303  }
304
305  /**
306   * Returns the Cell comparator used by this segment
307   * @return the Cell comparator used by this segment
308   */
309  protected CellComparator getComparator() {
310    return comparator;
311  }
312
313  protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing,
314      boolean sizeAddedPreOperation) {
315    boolean succ = getCellSet().add(cell);
316    updateMetaInfo(cell, succ, mslabUsed, memstoreSizing, sizeAddedPreOperation);
317  }
318
319  protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
320      MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
321    long delta = 0;
322    long cellSize = getCellLength(cellToAdd);
323    int cellsCount = succ ? 1 : 0;
324    // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
325    // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
326    // than the counted number)
327    if (succ || mslabUsed) {
328      delta = cellSize;
329    }
330    if (sizeAddedPreOperation) {
331      delta -= cellSize;
332    }
333    long heapSize = heapSizeChange(cellToAdd, succ || mslabUsed);
334    long offHeapSize = offHeapSizeChange(cellToAdd, succ || mslabUsed);
335    incMemStoreSize(delta, heapSize, offHeapSize, cellsCount);
336    if (memstoreSizing != null) {
337      memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize, cellsCount);
338    }
339    getTimeRangeTracker().includeTimestamp(cellToAdd);
340    minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
341    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
342    // When we use ACL CP or Visibility CP which deals with Tags during
343    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
344    // parse the byte[] to identify the tags length.
345    if (cellToAdd.getTagsLength() > 0) {
346      tagsPresent = true;
347    }
348  }
349
350  protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing memstoreSizing) {
351    updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing, false);
352  }
353
354  /**
355   * @return The increase in heap size because of this cell addition. This includes this cell POJO's
356   *         heap size itself and additional overhead because of addition on to CSLM.
357   */
358  protected long heapSizeChange(Cell cell, boolean allocated) {
359    long res = 0;
360    if (allocated) {
361      boolean onHeap = true;
362      MemStoreLAB memStoreLAB = getMemStoreLAB();
363      if(memStoreLAB != null) {
364        onHeap = memStoreLAB.isOnHeap();
365      }
366      res += indexEntryOnHeapSize(onHeap);
367      if(onHeap) {
368        res += cell.heapSize();
369      }
370      res = ClassSize.align(res);
371    }
372    return res;
373  }
374
375  protected long offHeapSizeChange(Cell cell, boolean allocated) {
376    long res = 0;
377    if (allocated) {
378      boolean offHeap = false;
379      MemStoreLAB memStoreLAB = getMemStoreLAB();
380      if(memStoreLAB != null) {
381        offHeap = memStoreLAB.isOffHeap();
382      }
383      res += indexEntryOffHeapSize(offHeap);
384      if(offHeap) {
385        res += cell.heapSize();
386      }
387      res = ClassSize.align(res);
388    }
389    return res;
390  }
391
392  protected long indexEntryOnHeapSize(boolean onHeap) {
393    // in most cases index is allocated on-heap
394    // override this method when it is not always the case, e.g., in CCM
395    return indexEntrySize();
396  }
397
398  protected long indexEntryOffHeapSize(boolean offHeap) {
399    // in most cases index is allocated on-heap
400    // override this method when it is not always the case, e.g., in CCM
401    return 0;
402  }
403
404  protected abstract long indexEntrySize();
405
406  /**
407   * Returns a subset of the segment cell set, which starts with the given cell
408   * @param firstCell a cell in the segment
409   * @return a subset of the segment cell set, which starts with the given cell
410   */
411  protected SortedSet<Cell> tailSet(Cell firstCell) {
412    return getCellSet().tailSet(firstCell);
413  }
414
415  MemStoreLAB getMemStoreLAB() {
416    return memStoreLAB;
417  }
418
419  // Debug methods
420  /**
421   * Dumps all cells of the segment into the given log
422   */
423  void dump(Logger log) {
424    for (Cell cell: getCellSet()) {
425      log.debug(Objects.toString(cell));
426    }
427  }
428
429  @Override
430  public String toString() {
431    String res = "type=" + this.getClass().getSimpleName() + ", ";
432    res += "empty=" + (isEmpty()? "yes": "no") + ", ";
433    res += "cellCount=" + getCellsCount() + ", ";
434    res += "cellSize=" + getDataSize() + ", ";
435    res += "totalHeapSize=" + getHeapSize() + ", ";
436    res += "min timestamp=" + timeRangeTracker.getMin() + ", ";
437    res += "max timestamp=" + timeRangeTracker.getMax();
438    return res;
439  }
440
441  private ReentrantReadWriteLock getUpdatesLock() {
442    return updatesLock;
443  }
444}