001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.ListIterator;
025import org.apache.hadoop.hbase.util.Bytes;
026import org.apache.hadoop.hbase.util.ClassSize;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. It supports
033 * pushing a segment at the head of the pipeline and removing a segment from the tail when it is
034 * flushed to disk. It also supports swap method to allow the in-memory compaction swap a subset of
035 * the segments at the tail of the pipeline with a new (compacted) one. This swap succeeds only if
036 * the version number passed with the list of segments to swap is the same as the current version of
037 * the pipeline. Essentially, there are two methods which can change the structure of the pipeline:
038 * pushHead() and swap(), the later is used both by a flush to disk and by an in-memory compaction.
039 * The pipeline version is updated by swap(); it allows to identify conflicting operations at the
040 * suffix of the pipeline. The synchronization model is copy-on-write. Methods which change the
041 * structure of the pipeline (pushHead() and swap()) apply their changes in the context of a lock.
042 * They also make a read-only copy of the pipeline's list. Read methods read from a read-only copy.
043 * If a read method accesses the read-only copy more than once it makes a local copy of it to ensure
044 * it accesses the same copy. The methods getVersionedList(), getVersionedTail(), and
045 * flattenOneSegment() are also protected by a lock since they need to have a consistent (atomic)
046 * view of the pipeline list and version number.
047 */
048@InterfaceAudience.Private
049public class CompactionPipeline {
050  private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class);
051
052  public final static long FIXED_OVERHEAD =
053    ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
054  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
055
056  private final RegionServicesForStores region;
057  private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
058  // The list is volatile to avoid reading a new allocated reference before the c'tor is executed
059  private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
060  /**
061   * <pre>
062   * Version is volatile to ensure it is atomically read when not using a lock.
063   * To indicate whether the suffix of pipeline changes:
064   * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
065   *   added at Head, {@link #version} not change.
066   * 2.for {@link CompactionPipeline#swap},{@link #version} increase.
067   * 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
068   * </pre>
069   */
070  private volatile long version = 0;
071
072  public CompactionPipeline(RegionServicesForStores region) {
073    this.region = region;
074  }
075
076  public boolean pushHead(MutableSegment segment) {
077    // Record the ImmutableSegment' heap overhead when initialing
078    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
079    ImmutableSegment immutableSegment =
080      SegmentFactory.instance().createImmutableSegment(segment, memstoreAccounting);
081    if (region != null) {
082      region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
083        memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
084    }
085    synchronized (pipeline) {
086      boolean res = addFirst(immutableSegment);
087      readOnlyCopy = new LinkedList<>(pipeline);
088      return res;
089    }
090  }
091
092  public VersionedSegmentsList getVersionedList() {
093    synchronized (pipeline) {
094      return new VersionedSegmentsList(readOnlyCopy, version);
095    }
096  }
097
098  public VersionedSegmentsList getVersionedTail() {
099    synchronized (pipeline) {
100      ArrayList<ImmutableSegment> segmentList = new ArrayList<>();
101      if (!pipeline.isEmpty()) {
102        segmentList.add(0, pipeline.getLast());
103      }
104      return new VersionedSegmentsList(segmentList, version);
105    }
106  }
107
108  /**
109   * Swaps the versioned list at the tail of the pipeline with a new segment. Swapping only if there
110   * were no changes to the suffix of the list since the version list was created.
111   * @param versionedList    suffix of the pipeline to be replaced can be tail or all the pipeline
112   * @param segment          new segment to replace the suffix. Can be null if the suffix just needs
113   *                         to be removed.
114   * @param closeSuffix      whether to close the suffix (to release memory), as part of swapping it
115   *                         out During index merge op this will be false and for compaction it will
116   *                         be true.
117   * @param updateRegionSize whether to update the region size. Update the region size, when the
118   *                         pipeline is swapped as part of in-memory-flush and further
119   *                         merge/compaction. Don't update the region size when the swap is result
120   *                         of the snapshot (flush-to-disk).
121   * @return true iff swapped tail with new segment
122   */
123  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
124      justification = "Increment is done under a synchronize block so safe")
125  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
126    boolean closeSuffix, boolean updateRegionSize) {
127    if (versionedList.getVersion() != version) {
128      return false;
129    }
130    List<ImmutableSegment> suffix;
131    synchronized (pipeline) {
132      if (versionedList.getVersion() != version) {
133        return false;
134      }
135      suffix = versionedList.getStoreSegments();
136      LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
137        versionedList.getStoreSegments().size(), segment);
138      swapSuffix(suffix, segment, closeSuffix);
139      readOnlyCopy = new LinkedList<>(pipeline);
140      version++;
141    }
142    if (updateRegionSize && region != null) {
143      // update the global memstore size counter
144      long suffixDataSize = getSegmentsKeySize(suffix);
145      long suffixHeapSize = getSegmentsHeapSize(suffix);
146      long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
147      int suffixCellsCount = getSegmentsCellsCount(suffix);
148      long newDataSize = 0;
149      long newHeapSize = 0;
150      long newOffHeapSize = 0;
151      int newCellsCount = 0;
152      if (segment != null) {
153        newDataSize = segment.getDataSize();
154        newHeapSize = segment.getHeapSize();
155        newOffHeapSize = segment.getOffHeapSize();
156        newCellsCount = segment.getCellsCount();
157      }
158      long dataSizeDelta = suffixDataSize - newDataSize;
159      long heapSizeDelta = suffixHeapSize - newHeapSize;
160      long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
161      int cellsCountDelta = suffixCellsCount - newCellsCount;
162      region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
163      LOG.debug(
164        "Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap "
165          + "size={}  suffix off heap size={}, new segment off heap size={}, suffix cells "
166          + "count={}, new segment cells count={}",
167        suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize,
168        suffixCellsCount, newCellsCount);
169    }
170    return true;
171  }
172
173  private static long getSegmentsHeapSize(List<? extends Segment> list) {
174    long res = 0;
175    for (Segment segment : list) {
176      res += segment.getHeapSize();
177    }
178    return res;
179  }
180
181  private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
182    long res = 0;
183    for (Segment segment : list) {
184      res += segment.getOffHeapSize();
185    }
186    return res;
187  }
188
189  private static long getSegmentsKeySize(List<? extends Segment> list) {
190    long res = 0;
191    for (Segment segment : list) {
192      res += segment.getDataSize();
193    }
194    return res;
195  }
196
197  private static int getSegmentsCellsCount(List<? extends Segment> list) {
198    int res = 0;
199    for (Segment segment : list) {
200      res += segment.getCellsCount();
201    }
202    return res;
203  }
204
205  /**
206   * If the caller holds the current version, go over the the pipeline and try to flatten each
207   * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
208   * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect.
209   * Return after one segment is successfully flatten.
210   * @return true iff a segment was successfully flattened
211   */
212  public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType,
213    MemStoreCompactionStrategy.Action action) {
214
215    if (requesterVersion != version) {
216      LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
217        + requesterVersion + ", actual version: " + version);
218      return false;
219    }
220
221    synchronized (pipeline) {
222      if (requesterVersion != version) {
223        LOG.warn("Segment flattening failed, because versions do not match");
224        return false;
225      }
226      int i = -1;
227      for (ImmutableSegment s : pipeline) {
228        i++;
229        if (s.canBeFlattened()) {
230          s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
231          if (s.isEmpty()) {
232            // after s.waitForUpdates() is called, there is no updates pending,if no cells in s,
233            // we can skip it.
234            continue;
235          }
236          // size to be updated
237          MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
238          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
239            (CSLMImmutableSegment) s, idxType, newMemstoreAccounting, action);
240          replaceAtIndex(i, newS);
241          if (region != null) {
242            // Update the global memstore size counter upon flattening there is no change in the
243            // data size
244            MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
245            region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
246              mss.getCellsCount());
247          }
248          LOG.debug("Compaction pipeline segment {} flattened", s);
249          return true;
250        }
251      }
252    }
253    // do not update the global memstore size counter and do not increase the version,
254    // because all the cells remain in place
255    return false;
256  }
257
258  public boolean isEmpty() {
259    return readOnlyCopy.isEmpty();
260  }
261
262  public List<? extends Segment> getSegments() {
263    return readOnlyCopy;
264  }
265
266  public long size() {
267    return readOnlyCopy.size();
268  }
269
270  public long getMinSequenceId() {
271    long minSequenceId = Long.MAX_VALUE;
272    LinkedList<? extends Segment> localCopy = readOnlyCopy;
273    if (!localCopy.isEmpty()) {
274      minSequenceId = localCopy.getLast().getMinSequenceId();
275    }
276    return minSequenceId;
277  }
278
279  public MemStoreSize getTailSize() {
280    LinkedList<? extends Segment> localCopy = readOnlyCopy;
281    return localCopy.isEmpty() ? new MemStoreSize() : localCopy.peekLast().getMemStoreSize();
282  }
283
284  public MemStoreSize getPipelineSize() {
285    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
286    LinkedList<? extends Segment> localCopy = readOnlyCopy;
287    for (Segment segment : localCopy) {
288      memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
289    }
290    return memStoreSizing.getMemStoreSize();
291  }
292
293  /**
294   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
295   */
296  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
297    boolean closeSegmentsInSuffix) {
298    matchAndRemoveSuffixFromPipeline(suffix);
299    if (segment != null) {
300      pipeline.addLast(segment);
301    }
302    // During index merge we won't be closing the segments undergoing the merge. Segment#close()
303    // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
304    // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
305    // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
306    // created for the result segment. So we can release the chunks associated with the compacted
307    // segments.
308    if (closeSegmentsInSuffix) {
309      for (Segment itemInSuffix : suffix) {
310        itemInSuffix.close();
311      }
312    }
313  }
314
315  /**
316   * Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
317   * {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
318   * suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
319   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
320   */
321  private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
322    if (suffix.isEmpty()) {
323      return;
324    }
325    if (pipeline.size() < suffix.size()) {
326      throw new IllegalStateException(
327        "CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
328          + "],pipeline size must greater than or equals suffix size");
329    }
330
331    ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
332    ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
333    int count = 0;
334    while (suffixIterator.hasPrevious()) {
335      Segment suffixSegment = suffixIterator.previous();
336      Segment pipelineSegment = pipelineIterator.previous();
337      if (suffixSegment != pipelineSegment) {
338        throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
339          + " is not pipleline segment:[" + pipelineSegment + "]");
340      }
341      count++;
342    }
343
344    for (int index = 1; index <= count; index++) {
345      pipeline.pollLast();
346    }
347
348  }
349
350  // replacing one segment in the pipeline with a new one exactly at the same index
351  // need to be called only within synchronized block
352  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
353      justification = "replaceAtIndex is invoked under a synchronize block so safe")
354  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
355    pipeline.set(idx, newSegment);
356    readOnlyCopy = new LinkedList<>(pipeline);
357    // the version increment is indeed needed, because the swap uses removeAll() method of the
358    // linked-list that compares the objects to find what to remove.
359    // The flattening changes the segment object completely (creation pattern) and so
360    // swap will not proceed correctly after concurrent flattening.
361    version++;
362  }
363
364  public Segment getTail() {
365    List<? extends Segment> localCopy = getSegments();
366    if (localCopy.isEmpty()) {
367      return null;
368    }
369    return localCopy.get(localCopy.size() - 1);
370  }
371
372  private boolean addFirst(ImmutableSegment segment) {
373    pipeline.addFirst(segment);
374    return true;
375  }
376
377  // debug method
378  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
379    if (suffix.isEmpty()) {
380      // empty suffix is always valid
381      return true;
382    }
383    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
384    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
385    ImmutableSegment suffixCurrent;
386    ImmutableSegment pipelineCurrent;
387    for (; suffixBackwardIterator.hasNext();) {
388      if (!pipelineBackwardIterator.hasNext()) {
389        // a suffix longer than pipeline is invalid
390        return false;
391      }
392      suffixCurrent = suffixBackwardIterator.next();
393      pipelineCurrent = pipelineBackwardIterator.next();
394      if (suffixCurrent != pipelineCurrent) {
395        // non-matching suffix
396        return false;
397      }
398    }
399    // suffix matches pipeline suffix
400    return true;
401  }
402
403}