001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.List;
025
026import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
027import org.apache.hadoop.hbase.util.Bytes;
028import org.apache.hadoop.hbase.util.ClassSize;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
 * The compaction pipeline of a {@link CompactingMemStore} is a FIFO queue of segments.
035 * It supports pushing a segment at the head of the pipeline and removing a segment from the
036 * tail when it is flushed to disk.
 * It also supports a swap method that lets in-memory compaction swap a subset of the segments
038 * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version
039 * number passed with the list of segments to swap is the same as the current version of the
040 * pipeline.
041 * Essentially, there are two methods which can change the structure of the pipeline: pushHead()
 * and swap(), the latter is used both by a flush to disk and by an in-memory compaction.
 * The pipeline version is updated by swap(); this makes it possible to identify conflicting
 * operations at the suffix of the pipeline.
045 *
046 * The synchronization model is copy-on-write. Methods which change the structure of the
047 * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
048 * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
049 * method accesses the read-only copy more than once it makes a local copy of it
050 * to ensure it accesses the same copy.
051 *
052 * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
053 * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
054 * and version number.
055 */
056@InterfaceAudience.Private
057public class CompactionPipeline {
058  private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class);
059
060  public final static long FIXED_OVERHEAD = ClassSize
061      .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
062  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
063
064  private final RegionServicesForStores region;
065  private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
066  // The list is volatile to avoid reading a new allocated reference before the c'tor is executed
067  private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
068  // Version is volatile to ensure it is atomically read when not using a lock
069  private volatile long version = 0;
070
071  public CompactionPipeline(RegionServicesForStores region) {
072    this.region = region;
073  }
074
075  public boolean pushHead(MutableSegment segment) {
076    // Record the ImmutableSegment' heap overhead when initialing
077    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
078    ImmutableSegment immutableSegment = SegmentFactory.instance().
079        createImmutableSegment(segment, memstoreAccounting);
080    if (region != null) {
081      region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
082        memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
083    }
084    synchronized (pipeline){
085      boolean res = addFirst(immutableSegment);
086      readOnlyCopy = new LinkedList<>(pipeline);
087      return res;
088    }
089  }
090
091  public VersionedSegmentsList getVersionedList() {
092    synchronized (pipeline){
093      return new VersionedSegmentsList(readOnlyCopy, version);
094    }
095  }
096
097  public VersionedSegmentsList getVersionedTail() {
098    synchronized (pipeline){
099      List<ImmutableSegment> segmentList = new ArrayList<>();
100      if(!pipeline.isEmpty()) {
101        segmentList.add(0, pipeline.getLast());
102      }
103      return new VersionedSegmentsList(segmentList, version);
104    }
105  }
106
107  /**
108   * Swaps the versioned list at the tail of the pipeline with a new segment.
109   * Swapping only if there were no changes to the suffix of the list since the version list was
110   * created.
111   * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline
112   * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
113   *                removed.
114   * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
115   *        During index merge op this will be false and for compaction it will be true.
116   * @param updateRegionSize whether to update the region size. Update the region size,
117   *                         when the pipeline is swapped as part of in-memory-flush and
118   *                         further merge/compaction. Don't update the region size when the
119   *                         swap is result of the snapshot (flush-to-disk).
120   * @return true iff swapped tail with new segment
121   */
122  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
123      justification="Increment is done under a synchronize block so safe")
124  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
125      boolean closeSuffix, boolean updateRegionSize) {
126    if (versionedList.getVersion() != version) {
127      return false;
128    }
129    List<ImmutableSegment> suffix;
130    synchronized (pipeline){
131      if(versionedList.getVersion() != version) {
132        return false;
133      }
134      suffix = versionedList.getStoreSegments();
135      LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
136          versionedList.getStoreSegments().size(), segment);
137      swapSuffix(suffix, segment, closeSuffix);
138      readOnlyCopy = new LinkedList<>(pipeline);
139      version++;
140    }
141    if (updateRegionSize && region != null) {
142      // update the global memstore size counter
143      long suffixDataSize = getSegmentsKeySize(suffix);
144      long suffixHeapSize = getSegmentsHeapSize(suffix);
145      long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
146      int suffixCellsCount = getSegmentsCellsCount(suffix);
147      long newDataSize = 0;
148      long newHeapSize = 0;
149      long newOffHeapSize = 0;
150      int newCellsCount = 0;
151      if (segment != null) {
152        newDataSize = segment.getDataSize();
153        newHeapSize = segment.getHeapSize();
154        newOffHeapSize = segment.getOffHeapSize();
155        newCellsCount = segment.getCellsCount();
156      }
157      long dataSizeDelta = suffixDataSize - newDataSize;
158      long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
159      long heapSizeDelta = suffixHeapSize - newHeapSize;
160      int cellsCountDelta = suffixCellsCount - newCellsCount;
161      region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
162      LOG.debug(
163        "Suffix data size={}, new segment data size={}, "
164            + "suffix heap size={}, new segment heap size={}"
165            + "suffix off heap size={}, new segment off heap size={}"
166            + "suffix cells count={}, new cells count={}",
167        suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize,
168        suffixCellsCount, newCellsCount);
169    }
170    return true;
171  }
172
173  private static long getSegmentsHeapSize(List<? extends Segment> list) {
174    long res = 0;
175    for (Segment segment : list) {
176      res += segment.getHeapSize();
177    }
178    return res;
179  }
180
181  private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
182    long res = 0;
183    for (Segment segment : list) {
184      res += segment.getOffHeapSize();
185    }
186    return res;
187  }
188
189  private static long getSegmentsKeySize(List<? extends Segment> list) {
190    long res = 0;
191    for (Segment segment : list) {
192      res += segment.getDataSize();
193    }
194    return res;
195  }
196
197  private static int getSegmentsCellsCount(List<? extends Segment> list) {
198    int res = 0;
199    for (Segment segment : list) {
200      res += segment.getCellsCount();
201    }
202    return res;
203  }
204
205  /**
206   * If the caller holds the current version, go over the the pipeline and try to flatten each
207   * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
208   * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect.
209   * Return after one segment is successfully flatten.
210   *
211   * @return true iff a segment was successfully flattened
212   */
213  public boolean flattenOneSegment(long requesterVersion,
214      CompactingMemStore.IndexType idxType,
215      MemStoreCompactionStrategy.Action action) {
216
217    if(requesterVersion != version) {
218      LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
219          + requesterVersion + ", actual version: " + version);
220      return false;
221    }
222
223    synchronized (pipeline){
224      if(requesterVersion != version) {
225        LOG.warn("Segment flattening failed, because versions do not match");
226        return false;
227      }
228      int i = 0;
229      for (ImmutableSegment s : pipeline) {
230        if ( s.canBeFlattened() ) {
231          // size to be updated
232          MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
233          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
234              (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
235          replaceAtIndex(i,newS);
236          if (region != null) {
237            // Update the global memstore size counter upon flattening there is no change in the
238            // data size
239            MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
240            Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
241            region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
242              mss.getCellsCount());
243          }
244          LOG.debug("Compaction pipeline segment {} flattened", s);
245          return true;
246        }
247        i++;
248      }
249
250    }
251    // do not update the global memstore size counter and do not increase the version,
252    // because all the cells remain in place
253    return false;
254  }
255
256  public boolean isEmpty() {
257    return readOnlyCopy.isEmpty();
258  }
259
260  public List<? extends Segment> getSegments() {
261    return readOnlyCopy;
262  }
263
264  public long size() {
265    return readOnlyCopy.size();
266  }
267
268  public long getMinSequenceId() {
269    long minSequenceId = Long.MAX_VALUE;
270    LinkedList<? extends Segment> localCopy = readOnlyCopy;
271    if (!localCopy.isEmpty()) {
272      minSequenceId = localCopy.getLast().getMinSequenceId();
273    }
274    return minSequenceId;
275  }
276
277  public MemStoreSize getTailSize() {
278    LinkedList<? extends Segment> localCopy = readOnlyCopy;
279    return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize();
280  }
281
282  public MemStoreSize getPipelineSize() {
283    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
284    LinkedList<? extends Segment> localCopy = readOnlyCopy;
285    for (Segment segment : localCopy) {
286      memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
287    }
288    return memStoreSizing.getMemStoreSize();
289  }
290
291  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
292      boolean closeSegmentsInSuffix) {
293    pipeline.removeAll(suffix);
294    if(segment != null) pipeline.addLast(segment);
295    // During index merge we won't be closing the segments undergoing the merge. Segment#close()
296    // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
297    // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
298    // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
299    // created for the result segment. So we can release the chunks associated with the compacted
300    // segments.
301    if (closeSegmentsInSuffix) {
302      for (Segment itemInSuffix : suffix) {
303        itemInSuffix.close();
304      }
305    }
306  }
307
308  // replacing one segment in the pipeline with a new one exactly at the same index
309  // need to be called only within synchronized block
310  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
311    pipeline.set(idx, newSegment);
312    readOnlyCopy = new LinkedList<>(pipeline);
313  }
314
315  public Segment getTail() {
316    List<? extends Segment> localCopy = getSegments();
317    if(localCopy.isEmpty()) {
318      return null;
319    }
320    return localCopy.get(localCopy.size() - 1);
321  }
322
323  private boolean addFirst(ImmutableSegment segment) {
324    pipeline.addFirst(segment);
325    return true;
326  }
327
328  // debug method
329  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
330    if(suffix.isEmpty()) {
331      // empty suffix is always valid
332      return true;
333    }
334    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
335    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
336    ImmutableSegment suffixCurrent;
337    ImmutableSegment pipelineCurrent;
338    for( ; suffixBackwardIterator.hasNext(); ) {
339      if(!pipelineBackwardIterator.hasNext()) {
340        // a suffix longer than pipeline is invalid
341        return false;
342      }
343      suffixCurrent = suffixBackwardIterator.next();
344      pipelineCurrent = pipelineBackwardIterator.next();
345      if(suffixCurrent != pipelineCurrent) {
346        // non-matching suffix
347        return false;
348      }
349    }
350    // suffix matches pipeline suffix
351    return true;
352  }
353
354}