001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.List;
025
026import org.apache.hadoop.hbase.util.Bytes;
027import org.apache.hadoop.hbase.util.ClassSize;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
034 * It supports pushing a segment at the head of the pipeline and removing a segment from the
035 * tail when it is flushed to disk.
036 * It also supports swap method to allow the in-memory compaction swap a subset of the segments
037 * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version
038 * number passed with the list of segments to swap is the same as the current version of the
039 * pipeline.
040 * Essentially, there are two methods which can change the structure of the pipeline: pushHead()
041 * and swap(), the later is used both by a flush to disk and by an in-memory compaction.
042 * The pipeline version is updated by swap(); it allows to identify conflicting operations at the
043 * suffix of the pipeline.
044 *
045 * The synchronization model is copy-on-write. Methods which change the structure of the
046 * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
047 * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
048 * method accesses the read-only copy more than once it makes a local copy of it
049 * to ensure it accesses the same copy.
050 *
051 * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
052 * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
053 * and version number.
054 */
055@InterfaceAudience.Private
056public class CompactionPipeline {
057  private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class);
058
059  public final static long FIXED_OVERHEAD = ClassSize
060      .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
061  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
062
063  private final RegionServicesForStores region;
064  private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
065  // The list is volatile to avoid reading a new allocated reference before the c'tor is executed
066  private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
067  // Version is volatile to ensure it is atomically read when not using a lock
068  private volatile long version = 0;
069
070  public CompactionPipeline(RegionServicesForStores region) {
071    this.region = region;
072  }
073
074  public boolean pushHead(MutableSegment segment) {
075    // Record the ImmutableSegment' heap overhead when initialing
076    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
077    ImmutableSegment immutableSegment = SegmentFactory.instance().
078        createImmutableSegment(segment, memstoreAccounting);
079    if (region != null) {
080      region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
081        memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
082    }
083    synchronized (pipeline){
084      boolean res = addFirst(immutableSegment);
085      readOnlyCopy = new LinkedList<>(pipeline);
086      return res;
087    }
088  }
089
090  public VersionedSegmentsList getVersionedList() {
091    synchronized (pipeline){
092      return new VersionedSegmentsList(readOnlyCopy, version);
093    }
094  }
095
096  public VersionedSegmentsList getVersionedTail() {
097    synchronized (pipeline){
098      List<ImmutableSegment> segmentList = new ArrayList<>();
099      if(!pipeline.isEmpty()) {
100        segmentList.add(0, pipeline.getLast());
101      }
102      return new VersionedSegmentsList(segmentList, version);
103    }
104  }
105
106  /**
107   * Swaps the versioned list at the tail of the pipeline with a new segment.
108   * Swapping only if there were no changes to the suffix of the list since the version list was
109   * created.
110   * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline
111   * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
112   *                removed.
113   * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
114   *        During index merge op this will be false and for compaction it will be true.
115   * @param updateRegionSize whether to update the region size. Update the region size,
116   *                         when the pipeline is swapped as part of in-memory-flush and
117   *                         further merge/compaction. Don't update the region size when the
118   *                         swap is result of the snapshot (flush-to-disk).
119   * @return true iff swapped tail with new segment
120   */
121  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
122      justification="Increment is done under a synchronize block so safe")
123  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
124      boolean closeSuffix, boolean updateRegionSize) {
125    if (versionedList.getVersion() != version) {
126      return false;
127    }
128    List<ImmutableSegment> suffix;
129    synchronized (pipeline){
130      if(versionedList.getVersion() != version) {
131        return false;
132      }
133      suffix = versionedList.getStoreSegments();
134      LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
135          versionedList.getStoreSegments().size(), segment);
136      swapSuffix(suffix, segment, closeSuffix);
137      readOnlyCopy = new LinkedList<>(pipeline);
138      version++;
139    }
140    if (updateRegionSize && region != null) {
141      // update the global memstore size counter
142      long suffixDataSize = getSegmentsKeySize(suffix);
143      long suffixHeapSize = getSegmentsHeapSize(suffix);
144      long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
145      int suffixCellsCount = getSegmentsCellsCount(suffix);
146      long newDataSize = 0;
147      long newHeapSize = 0;
148      long newOffHeapSize = 0;
149      int newCellsCount = 0;
150      if (segment != null) {
151        newDataSize = segment.getDataSize();
152        newHeapSize = segment.getHeapSize();
153        newOffHeapSize = segment.getOffHeapSize();
154        newCellsCount = segment.getCellsCount();
155      }
156      long dataSizeDelta = suffixDataSize - newDataSize;
157      long heapSizeDelta = suffixHeapSize - newHeapSize;
158      long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
159      int cellsCountDelta = suffixCellsCount - newCellsCount;
160      region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
161      LOG.debug(
162        "Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap "
163            + "size={}  suffix off heap size={}, new segment off heap size={}, suffix cells "
164            + "count={}, new segment cells count={}",
165        suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize,
166        suffixCellsCount, newCellsCount);
167    }
168    return true;
169  }
170
171  private static long getSegmentsHeapSize(List<? extends Segment> list) {
172    long res = 0;
173    for (Segment segment : list) {
174      res += segment.getHeapSize();
175    }
176    return res;
177  }
178
179  private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
180    long res = 0;
181    for (Segment segment : list) {
182      res += segment.getOffHeapSize();
183    }
184    return res;
185  }
186
187  private static long getSegmentsKeySize(List<? extends Segment> list) {
188    long res = 0;
189    for (Segment segment : list) {
190      res += segment.getDataSize();
191    }
192    return res;
193  }
194
195  private static int getSegmentsCellsCount(List<? extends Segment> list) {
196    int res = 0;
197    for (Segment segment : list) {
198      res += segment.getCellsCount();
199    }
200    return res;
201  }
202
203  /**
204   * If the caller holds the current version, go over the the pipeline and try to flatten each
205   * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
206   * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect.
207   * Return after one segment is successfully flatten.
208   *
209   * @return true iff a segment was successfully flattened
210   */
211  public boolean flattenOneSegment(long requesterVersion,
212      CompactingMemStore.IndexType idxType,
213      MemStoreCompactionStrategy.Action action) {
214
215    if(requesterVersion != version) {
216      LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
217          + requesterVersion + ", actual version: " + version);
218      return false;
219    }
220
221    synchronized (pipeline){
222      if(requesterVersion != version) {
223        LOG.warn("Segment flattening failed, because versions do not match");
224        return false;
225      }
226      int i = 0;
227      for (ImmutableSegment s : pipeline) {
228        if ( s.canBeFlattened() ) {
229          s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
230          // size to be updated
231          MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
232          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
233              (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
234          replaceAtIndex(i,newS);
235          if (region != null) {
236            // Update the global memstore size counter upon flattening there is no change in the
237            // data size
238            MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
239            region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
240              mss.getCellsCount());
241          }
242          LOG.debug("Compaction pipeline segment {} flattened", s);
243          return true;
244        }
245        i++;
246      }
247
248    }
249    // do not update the global memstore size counter and do not increase the version,
250    // because all the cells remain in place
251    return false;
252  }
253
254  public boolean isEmpty() {
255    return readOnlyCopy.isEmpty();
256  }
257
258  public List<? extends Segment> getSegments() {
259    return readOnlyCopy;
260  }
261
262  public long size() {
263    return readOnlyCopy.size();
264  }
265
266  public long getMinSequenceId() {
267    long minSequenceId = Long.MAX_VALUE;
268    LinkedList<? extends Segment> localCopy = readOnlyCopy;
269    if (!localCopy.isEmpty()) {
270      minSequenceId = localCopy.getLast().getMinSequenceId();
271    }
272    return minSequenceId;
273  }
274
275  public MemStoreSize getTailSize() {
276    LinkedList<? extends Segment> localCopy = readOnlyCopy;
277    return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize();
278  }
279
280  public MemStoreSize getPipelineSize() {
281    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
282    LinkedList<? extends Segment> localCopy = readOnlyCopy;
283    for (Segment segment : localCopy) {
284      memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
285    }
286    return memStoreSizing.getMemStoreSize();
287  }
288
289  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
290      boolean closeSegmentsInSuffix) {
291    pipeline.removeAll(suffix);
292    if(segment != null) pipeline.addLast(segment);
293    // During index merge we won't be closing the segments undergoing the merge. Segment#close()
294    // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
295    // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
296    // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
297    // created for the result segment. So we can release the chunks associated with the compacted
298    // segments.
299    if (closeSegmentsInSuffix) {
300      for (Segment itemInSuffix : suffix) {
301        itemInSuffix.close();
302      }
303    }
304  }
305
306  // replacing one segment in the pipeline with a new one exactly at the same index
307  // need to be called only within synchronized block
308  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
309    pipeline.set(idx, newSegment);
310    readOnlyCopy = new LinkedList<>(pipeline);
311  }
312
313  public Segment getTail() {
314    List<? extends Segment> localCopy = getSegments();
315    if(localCopy.isEmpty()) {
316      return null;
317    }
318    return localCopy.get(localCopy.size() - 1);
319  }
320
321  private boolean addFirst(ImmutableSegment segment) {
322    pipeline.addFirst(segment);
323    return true;
324  }
325
326  // debug method
327  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
328    if(suffix.isEmpty()) {
329      // empty suffix is always valid
330      return true;
331    }
332    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
333    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
334    ImmutableSegment suffixCurrent;
335    ImmutableSegment pipelineCurrent;
336    for( ; suffixBackwardIterator.hasNext(); ) {
337      if(!pipelineBackwardIterator.hasNext()) {
338        // a suffix longer than pipeline is invalid
339        return false;
340      }
341      suffixCurrent = suffixBackwardIterator.next();
342      pipelineCurrent = pipelineBackwardIterator.next();
343      if(suffixCurrent != pipelineCurrent) {
344        // non-matching suffix
345        return false;
346      }
347    }
348    // suffix matches pipeline suffix
349    return true;
350  }
351
352}