/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
 * It supports pushing a segment at the head of the pipeline and removing a segment from the
 * tail when it is flushed to disk.
 * It also supports swap method to allow the in-memory compaction swap a subset of the segments
 * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version
 * number passed with the list of segments to swap is the same as the current version of the
 * pipeline.
 * Essentially, there are two methods which can change the structure of the pipeline: pushHead()
 * and swap(), the latter is used both by a flush to disk and by an in-memory compaction.
 * The pipeline version is updated by swap(); it allows to identify conflicting operations at the
 * suffix of the pipeline.
 *
 * The synchronization model is copy-on-write. Methods which change the structure of the
 * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
 * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
 * method accesses the read-only copy more than once it makes a local copy of it
 * to ensure it accesses the same copy.
 *
 * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
 * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
 * and version number.
 */
@InterfaceAudience.Private
public class CompactionPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class);

  public static final long FIXED_OVERHEAD = ClassSize
      .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);

  private final RegionServicesForStores region;
  // The authoritative list; it is only read or modified while holding its own monitor.
  private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
  // The list is volatile to avoid reading a new allocated reference before the c'tor is executed
  private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
  // Version is volatile to ensure it is atomically read when not using a lock
  private volatile long version = 0;

  /**
   * @param region the region services notified of memstore size changes; may be null, in which
   *               case no size updates are reported
   */
  public CompactionPipeline(RegionServicesForStores region) {
    this.region = region;
  }

  /**
   * Creates an immutable segment out of the given mutable segment, reports the size accounting
   * recorded during that creation to the region (if one is set), and adds the immutable segment
   * at the head of the pipeline.
   * @param segment the mutable segment to push
   * @return always true
   */
  public boolean pushHead(MutableSegment segment) {
    // Record the ImmutableSegment's heap overhead when initializing
    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
    ImmutableSegment immutableSegment = SegmentFactory.instance().
        createImmutableSegment(segment, memstoreAccounting);
    if (region != null) {
      region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
        memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
    }
    synchronized (pipeline) {
      boolean res = addFirst(immutableSegment);
      // publish a fresh snapshot for the lock-free readers
      readOnlyCopy = new LinkedList<>(pipeline);
      return res;
    }
  }

  /**
   * @return the current read-only copy of the pipeline together with the current version, both
   *         read under the pipeline lock
   */
  public VersionedSegmentsList getVersionedList() {
    synchronized (pipeline) {
      return new VersionedSegmentsList(readOnlyCopy, version);
    }
  }

  /**
   * @return a list holding only the last segment of the pipeline (an empty list if the pipeline
   *         is empty) together with the current version, both read under the pipeline lock
   */
  public VersionedSegmentsList getVersionedTail() {
    synchronized (pipeline) {
      List<ImmutableSegment> segmentList = new ArrayList<>();
      if (!pipeline.isEmpty()) {
        segmentList.add(pipeline.getLast());
      }
      return new VersionedSegmentsList(segmentList, version);
    }
  }

  /**
   * Swaps the versioned list at the tail of the pipeline with a new segment.
   * Swapping only if there were no changes to the suffix of the list since the version list was
   * created.
   * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline
   * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
   *                removed.
   * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
   *        During index merge op this will be false and for compaction it will be true.
   * @param updateRegionSize whether to update the region size. Update the region size,
   *                         when the pipeline is swapped as part of in-memory-flush and
   *                         further merge/compaction. Don't update the region size when the
   *                         swap is result of the snapshot (flush-to-disk).
   * @return true iff swapped tail with new segment
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
      justification="Increment is done under a synchronize block so safe")
  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
      boolean closeSuffix, boolean updateRegionSize) {
    // Cheap check without the lock; it is repeated under the lock below.
    if (versionedList.getVersion() != version) {
      return false;
    }
    List<ImmutableSegment> suffix;
    synchronized (pipeline) {
      if (versionedList.getVersion() != version) {
        return false;
      }
      suffix = versionedList.getStoreSegments();
      LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
          versionedList.getStoreSegments().size(), segment);
      swapSuffix(suffix, segment, closeSuffix);
      readOnlyCopy = new LinkedList<>(pipeline);
      version++;
    }
    if (updateRegionSize && region != null) {
      // update the global memstore size counter
      long suffixDataSize = getSegmentsKeySize(suffix);
      long suffixHeapSize = getSegmentsHeapSize(suffix);
      long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
      int suffixCellsCount = getSegmentsCellsCount(suffix);
      long newDataSize = 0;
      long newHeapSize = 0;
      long newOffHeapSize = 0;
      int newCellsCount = 0;
      if (segment != null) {
        newDataSize = segment.getDataSize();
        newHeapSize = segment.getHeapSize();
        newOffHeapSize = segment.getOffHeapSize();
        newCellsCount = segment.getCellsCount();
      }
      // Each delta is (removed suffix - added segment); the region is adjusted by its negation.
      long dataSizeDelta = suffixDataSize - newDataSize;
      long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
      long heapSizeDelta = suffixHeapSize - newHeapSize;
      int cellsCountDelta = suffixCellsCount - newCellsCount;
      region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
      LOG.debug(
          "Suffix data size={}, new segment data size={}, "
              + "suffix heap size={}, new segment heap size={}, "
              + "suffix off heap size={}, new segment off heap size={}, "
              + "suffix cells count={}, new cells count={}",
          suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize,
          newOffHeapSize, suffixCellsCount, newCellsCount);
    }
    return true;
  }

  /** @return the sum of {@code getHeapSize()} over the given segments */
  private static long getSegmentsHeapSize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getHeapSize();
    }
    return res;
  }

  /** @return the sum of {@code getOffHeapSize()} over the given segments */
  private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getOffHeapSize();
    }
    return res;
  }

  /** @return the sum of {@code getDataSize()} over the given segments */
  private static long getSegmentsKeySize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getDataSize();
    }
    return res;
  }

  /** @return the sum of {@code getCellsCount()} over the given segments */
  private static int getSegmentsCellsCount(List<? extends Segment> list) {
    int res = 0;
    for (Segment segment : list) {
      res += segment.getCellsCount();
    }
    return res;
  }

  /**
   * If the caller holds the current version, go over the pipeline and try to flatten each
   * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
   * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect.
   * Return after one segment is successfully flatten.
   *
   * @param requesterVersion the pipeline version the caller observed; flattening is refused if
   *                         it differs from the current version
   * @param idxType passed through to the segment factory when creating the flattened segment
   * @param action passed through to the segment factory when creating the flattened segment
   * @return true iff a segment was successfully flattened
   */
  public boolean flattenOneSegment(long requesterVersion,
      CompactingMemStore.IndexType idxType,
      MemStoreCompactionStrategy.Action action) {

    // Cheap check without the lock; it is repeated under the lock below.
    if (requesterVersion != version) {
      LOG.warn("Segment flattening failed, because versions do not match. Requester version: {}"
          + ", actual version: {}", requesterVersion, version);
      return false;
    }

    synchronized (pipeline) {
      if (requesterVersion != version) {
        LOG.warn("Segment flattening failed, because versions do not match");
        return false;
      }
      int i = 0;
      for (ImmutableSegment s : pipeline) {
        if (s.canBeFlattened()) {
          // size to be updated
          MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
              (CSLMImmutableSegment) s, idxType, newMemstoreAccounting, action);
          // set() is not a structural modification and we return right after it, so replacing
          // the element while iterating is safe here
          replaceAtIndex(i, newS);
          if (region != null) {
            // Update the global memstore size counter upon flattening there is no change in the
            // data size
            MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
            Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
            region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
              mss.getCellsCount());
          }
          LOG.debug("Compaction pipeline segment {} flattened", s);
          // the version is deliberately not increased by flattening
          return true;
        }
        i++;
      }

    }
    // do not update the global memstore size counter and do not increase the version,
    // because all the cells remain in place
    return false;
  }

  /** @return true if the current read-only copy of the pipeline holds no segments */
  public boolean isEmpty() {
    return readOnlyCopy.isEmpty();
  }

  /**
   * @return the current read-only copy of the pipeline. The returned list is the shared
   *         snapshot object itself, not a fresh copy.
   */
  public List<? extends Segment> getSegments() {
    return readOnlyCopy;
  }

  /** @return the number of segments in the current read-only copy of the pipeline */
  public long size() {
    return readOnlyCopy.size();
  }

  /**
   * @return the min sequence id reported by the last segment of the pipeline, or
   *         {@link Long#MAX_VALUE} if the pipeline is empty
   */
  public long getMinSequenceId() {
    long minSequenceId = Long.MAX_VALUE;
    // read the volatile reference once so both accesses see the same snapshot
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    if (!localCopy.isEmpty()) {
      minSequenceId = localCopy.getLast().getMinSequenceId();
    }
    return minSequenceId;
  }

  /**
   * @return the memstore size of the last segment of the pipeline, or a new default
   *         {@link MemStoreSize} if the pipeline is empty
   */
  public MemStoreSize getTailSize() {
    // read the volatile reference once so both accesses see the same snapshot
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    return localCopy.isEmpty() ? new MemStoreSize() : localCopy.peekLast().getMemStoreSize();
  }

  /** @return the accumulated memstore size of all segments in the current read-only copy */
  public MemStoreSize getPipelineSize() {
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    for (Segment segment : localCopy) {
      memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
    }
    return memStoreSizing.getMemStoreSize();
  }

  /**
   * Removes the given suffix segments from the pipeline, appends the replacement segment (when
   * it is not null) and optionally closes the removed segments.
   * Need to be called only within synchronized block.
   */
  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
      boolean closeSegmentsInSuffix) {
    pipeline.removeAll(suffix);
    if (segment != null) {
      pipeline.addLast(segment);
    }
    // During index merge we won't be closing the segments undergoing the merge. Segment#close()
    // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
    // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
    // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
    // created for the result segment. So we can release the chunks associated with the compacted
    // segments.
    if (closeSegmentsInSuffix) {
      for (Segment itemInSuffix : suffix) {
        itemInSuffix.close();
      }
    }
  }

  // replacing one segment in the pipeline with a new one exactly at the same index
  // need to be called only within synchronized block
  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
    pipeline.set(idx, newSegment);
    readOnlyCopy = new LinkedList<>(pipeline);
  }

  /**
   * @return the last segment in the current read-only copy of the pipeline, or null if it is
   *         empty
   */
  public Segment getTail() {
    List<? extends Segment> localCopy = getSegments();
    if (localCopy.isEmpty()) {
      return null;
    }
    return localCopy.get(localCopy.size() - 1);
  }

  // adds the segment at the head of the pipeline; always returns true
  // need to be called only within synchronized block
  private boolean addFirst(ImmutableSegment segment) {
    pipeline.addFirst(segment);
    return true;
  }

  // debug method
  // checks, by reference identity and walking both lists backwards, that the given list is a
  // suffix of the pipeline
  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
    if (suffix.isEmpty()) {
      // empty suffix is always valid
      return true;
    }
    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
    ImmutableSegment suffixCurrent;
    ImmutableSegment pipelineCurrent;
    while (suffixBackwardIterator.hasNext()) {
      if (!pipelineBackwardIterator.hasNext()) {
        // a suffix longer than pipeline is invalid
        return false;
      }
      suffixCurrent = suffixBackwardIterator.next();
      pipelineCurrent = pipelineBackwardIterator.next();
      if (suffixCurrent != pipelineCurrent) {
        // non-matching suffix
        return false;
      }
    }
    // suffix matches pipeline suffix
    return true;
  }

}