001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.util.ArrayList; 022import java.util.Iterator; 023import java.util.LinkedList; 024import java.util.List; 025 026import org.apache.hadoop.hbase.util.Bytes; 027import org.apache.hadoop.hbase.util.ClassSize; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. 034 * It supports pushing a segment at the head of the pipeline and removing a segment from the 035 * tail when it is flushed to disk. 036 * It also supports swap method to allow the in-memory compaction swap a subset of the segments 037 * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version 038 * number passed with the list of segments to swap is the same as the current version of the 039 * pipeline. 040 * Essentially, there are two methods which can change the structure of the pipeline: pushHead() 041 * and swap(), the later is used both by a flush to disk and by an in-memory compaction. 042 * The pipeline version is updated by swap(); it allows to identify conflicting operations at the 043 * suffix of the pipeline. 044 * 045 * The synchronization model is copy-on-write. Methods which change the structure of the 046 * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make 047 * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read 048 * method accesses the read-only copy more than once it makes a local copy of it 049 * to ensure it accesses the same copy. 050 * 051 * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also 052 * protected by a lock since they need to have a consistent (atomic) view of the pipeline list 053 * and version number. 054 */ 055@InterfaceAudience.Private 056public class CompactionPipeline { 057 private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class); 058 059 public final static long FIXED_OVERHEAD = ClassSize 060 .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); 061 public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST); 062 063 private final RegionServicesForStores region; 064 private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>(); 065 // The list is volatile to avoid reading a new allocated reference before the c'tor is executed 066 private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>(); 067 // Version is volatile to ensure it is atomically read when not using a lock 068 private volatile long version = 0; 069 070 public CompactionPipeline(RegionServicesForStores region) { 071 this.region = region; 072 } 073 074 public boolean pushHead(MutableSegment segment) { 075 // Record the ImmutableSegment' heap overhead when initialing 076 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 077 ImmutableSegment immutableSegment = SegmentFactory.instance(). 078 createImmutableSegment(segment, memstoreAccounting); 079 if (region != null) { 080 region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(), 081 memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount()); 082 } 083 synchronized (pipeline){ 084 boolean res = addFirst(immutableSegment); 085 readOnlyCopy = new LinkedList<>(pipeline); 086 return res; 087 } 088 } 089 090 public VersionedSegmentsList getVersionedList() { 091 synchronized (pipeline){ 092 return new VersionedSegmentsList(readOnlyCopy, version); 093 } 094 } 095 096 public VersionedSegmentsList getVersionedTail() { 097 synchronized (pipeline){ 098 List<ImmutableSegment> segmentList = new ArrayList<>(); 099 if(!pipeline.isEmpty()) { 100 segmentList.add(0, pipeline.getLast()); 101 } 102 return new VersionedSegmentsList(segmentList, version); 103 } 104 } 105 106 /** 107 * Swaps the versioned list at the tail of the pipeline with a new segment. 108 * Swapping only if there were no changes to the suffix of the list since the version list was 109 * created. 110 * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline 111 * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be 112 * removed. 113 * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out 114 * During index merge op this will be false and for compaction it will be true. 115 * @param updateRegionSize whether to update the region size. Update the region size, 116 * when the pipeline is swapped as part of in-memory-flush and 117 * further merge/compaction. Don't update the region size when the 118 * swap is result of the snapshot (flush-to-disk). 119 * @return true iff swapped tail with new segment 120 */ 121 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", 122 justification="Increment is done under a synchronize block so safe") 123 public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, 124 boolean closeSuffix, boolean updateRegionSize) { 125 if (versionedList.getVersion() != version) { 126 return false; 127 } 128 List<ImmutableSegment> suffix; 129 synchronized (pipeline){ 130 if(versionedList.getVersion() != version) { 131 return false; 132 } 133 suffix = versionedList.getStoreSegments(); 134 LOG.debug("Swapping pipeline suffix; before={}, new segment={}", 135 versionedList.getStoreSegments().size(), segment); 136 swapSuffix(suffix, segment, closeSuffix); 137 readOnlyCopy = new LinkedList<>(pipeline); 138 version++; 139 } 140 if (updateRegionSize && region != null) { 141 // update the global memstore size counter 142 long suffixDataSize = getSegmentsKeySize(suffix); 143 long suffixHeapSize = getSegmentsHeapSize(suffix); 144 long suffixOffHeapSize = getSegmentsOffHeapSize(suffix); 145 int suffixCellsCount = getSegmentsCellsCount(suffix); 146 long newDataSize = 0; 147 long newHeapSize = 0; 148 long newOffHeapSize = 0; 149 int newCellsCount = 0; 150 if (segment != null) { 151 newDataSize = segment.getDataSize(); 152 newHeapSize = segment.getHeapSize(); 153 newOffHeapSize = segment.getOffHeapSize(); 154 newCellsCount = segment.getCellsCount(); 155 } 156 long dataSizeDelta = suffixDataSize - newDataSize; 157 long heapSizeDelta = suffixHeapSize - newHeapSize; 158 long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize; 159 int cellsCountDelta = suffixCellsCount - newCellsCount; 160 region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta); 161 LOG.debug( 162 "Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap " 163 + "size={}  suffix off heap size={}, new segment off heap size={}, suffix cells " 164 + "count={}, new segment cells count={}", 165 suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize, 166 suffixCellsCount, newCellsCount); 167 } 168 return true; 169 } 170 171 private static long getSegmentsHeapSize(List<? extends Segment> list) { 172 long res = 0; 173 for (Segment segment : list) { 174 res += segment.getHeapSize(); 175 } 176 return res; 177 } 178 179 private static long getSegmentsOffHeapSize(List<? extends Segment> list) { 180 long res = 0; 181 for (Segment segment : list) { 182 res += segment.getOffHeapSize(); 183 } 184 return res; 185 } 186 187 private static long getSegmentsKeySize(List<? extends Segment> list) { 188 long res = 0; 189 for (Segment segment : list) { 190 res += segment.getDataSize(); 191 } 192 return res; 193 } 194 195 private static int getSegmentsCellsCount(List<? extends Segment> list) { 196 int res = 0; 197 for (Segment segment : list) { 198 res += segment.getCellsCount(); 199 } 200 return res; 201 } 202 203 /** 204 * If the caller holds the current version, go over the the pipeline and try to flatten each 205 * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based. 206 * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect. 207 * Return after one segment is successfully flatten. 208 * 209 * @return true iff a segment was successfully flattened 210 */ 211 public boolean flattenOneSegment(long requesterVersion, 212 CompactingMemStore.IndexType idxType, 213 MemStoreCompactionStrategy.Action action) { 214 215 if(requesterVersion != version) { 216 LOG.warn("Segment flattening failed, because versions do not match. Requester version: " 217 + requesterVersion + ", actual version: " + version); 218 return false; 219 } 220 221 synchronized (pipeline){ 222 if(requesterVersion != version) { 223 LOG.warn("Segment flattening failed, because versions do not match"); 224 return false; 225 } 226 int i = 0; 227 for (ImmutableSegment s : pipeline) { 228 if ( s.canBeFlattened() ) { 229 s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed 230 // size to be updated 231 MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing(); 232 ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( 233 (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action); 234 replaceAtIndex(i,newS); 235 if (region != null) { 236 // Update the global memstore size counter upon flattening there is no change in the 237 // data size 238 MemStoreSize mss = newMemstoreAccounting.getMemStoreSize(); 239 region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 240 mss.getCellsCount()); 241 } 242 LOG.debug("Compaction pipeline segment {} flattened", s); 243 return true; 244 } 245 i++; 246 } 247 248 } 249 // do not update the global memstore size counter and do not increase the version, 250 // because all the cells remain in place 251 return false; 252 } 253 254 public boolean isEmpty() { 255 return readOnlyCopy.isEmpty(); 256 } 257 258 public List<? extends Segment> getSegments() { 259 return readOnlyCopy; 260 } 261 262 public long size() { 263 return readOnlyCopy.size(); 264 } 265 266 public long getMinSequenceId() { 267 long minSequenceId = Long.MAX_VALUE; 268 LinkedList<? extends Segment> localCopy = readOnlyCopy; 269 if (!localCopy.isEmpty()) { 270 minSequenceId = localCopy.getLast().getMinSequenceId(); 271 } 272 return minSequenceId; 273 } 274 275 public MemStoreSize getTailSize() { 276 LinkedList<? extends Segment> localCopy = readOnlyCopy; 277 return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize(); 278 } 279 280 public MemStoreSize getPipelineSize() { 281 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 282 LinkedList<? extends Segment> localCopy = readOnlyCopy; 283 for (Segment segment : localCopy) { 284 memStoreSizing.incMemStoreSize(segment.getMemStoreSize()); 285 } 286 return memStoreSizing.getMemStoreSize(); 287 } 288 289 private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, 290 boolean closeSegmentsInSuffix) { 291 pipeline.removeAll(suffix); 292 if(segment != null) pipeline.addLast(segment); 293 // During index merge we won't be closing the segments undergoing the merge. Segment#close() 294 // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy 295 // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data 296 // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk 297 // created for the result segment. So we can release the chunks associated with the compacted 298 // segments. 299 if (closeSegmentsInSuffix) { 300 for (Segment itemInSuffix : suffix) { 301 itemInSuffix.close(); 302 } 303 } 304 } 305 306 // replacing one segment in the pipeline with a new one exactly at the same index 307 // need to be called only within synchronized block 308 private void replaceAtIndex(int idx, ImmutableSegment newSegment) { 309 pipeline.set(idx, newSegment); 310 readOnlyCopy = new LinkedList<>(pipeline); 311 } 312 313 public Segment getTail() { 314 List<? extends Segment> localCopy = getSegments(); 315 if(localCopy.isEmpty()) { 316 return null; 317 } 318 return localCopy.get(localCopy.size() - 1); 319 } 320 321 private boolean addFirst(ImmutableSegment segment) { 322 pipeline.addFirst(segment); 323 return true; 324 } 325 326 // debug method 327 private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) { 328 if(suffix.isEmpty()) { 329 // empty suffix is always valid 330 return true; 331 } 332 Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator(); 333 Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator(); 334 ImmutableSegment suffixCurrent; 335 ImmutableSegment pipelineCurrent; 336 for( ; suffixBackwardIterator.hasNext(); ) { 337 if(!pipelineBackwardIterator.hasNext()) { 338 // a suffix longer than pipeline is invalid 339 return false; 340 } 341 suffixCurrent = suffixBackwardIterator.next(); 342 pipelineCurrent = pipelineBackwardIterator.next(); 343 if(suffixCurrent != pipelineCurrent) { 344 // non-matching suffix 345 return false; 346 } 347 } 348 // suffix matches pipeline suffix 349 return true; 350 } 351 352}