001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.ListIterator; 025import org.apache.hadoop.hbase.util.Bytes; 026import org.apache.hadoop.hbase.util.ClassSize; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. It supports 033 * pushing a segment at the head of the pipeline and removing a segment from the tail when it is 034 * flushed to disk. It also supports swap method to allow the in-memory compaction swap a subset of 035 * the segments at the tail of the pipeline with a new (compacted) one. This swap succeeds only if 036 * the version number passed with the list of segments to swap is the same as the current version of 037 * the pipeline. Essentially, there are two methods which can change the structure of the pipeline: 038 * pushHead() and swap(), the later is used both by a flush to disk and by an in-memory compaction. 039 * The pipeline version is updated by swap(); it allows to identify conflicting operations at the 040 * suffix of the pipeline. The synchronization model is copy-on-write. Methods which change the 041 * structure of the pipeline (pushHead() and swap()) apply their changes in the context of a lock. 042 * They also make a read-only copy of the pipeline's list. Read methods read from a read-only copy. 043 * If a read method accesses the read-only copy more than once it makes a local copy of it to ensure 044 * it accesses the same copy. The methods getVersionedList(), getVersionedTail(), and 045 * flattenOneSegment() are also protected by a lock since they need to have a consistent (atomic) 046 * view of the pipeline list and version number. 047 */ 048@InterfaceAudience.Private 049public class CompactionPipeline { 050 private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class); 051 052 public final static long FIXED_OVERHEAD = 053 ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); 054 public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST); 055 056 private final RegionServicesForStores region; 057 private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>(); 058 // The list is volatile to avoid reading a new allocated reference before the c'tor is executed 059 private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>(); 060 /** 061 * <pre> 062 * Version is volatile to ensure it is atomically read when not using a lock. 063 * To indicate whether the suffix of pipeline changes: 064 * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only 065 * added at Head, {@link #version} not change. 066 * 2.for {@link CompactionPipeline#swap},{@link #version} increase. 067 * 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase. 068 * </pre> 069 */ 070 private volatile long version = 0; 071 072 public CompactionPipeline(RegionServicesForStores region) { 073 this.region = region; 074 } 075 076 public boolean pushHead(MutableSegment segment) { 077 // Record the ImmutableSegment' heap overhead when initialing 078 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 079 ImmutableSegment immutableSegment = 080 SegmentFactory.instance().createImmutableSegment(segment, memstoreAccounting); 081 if (region != null) { 082 region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(), 083 memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount()); 084 } 085 synchronized (pipeline) { 086 boolean res = addFirst(immutableSegment); 087 readOnlyCopy = new LinkedList<>(pipeline); 088 return res; 089 } 090 } 091 092 public VersionedSegmentsList getVersionedList() { 093 synchronized (pipeline) { 094 return new VersionedSegmentsList(readOnlyCopy, version); 095 } 096 } 097 098 public VersionedSegmentsList getVersionedTail() { 099 synchronized (pipeline) { 100 ArrayList<ImmutableSegment> segmentList = new ArrayList<>(); 101 if (!pipeline.isEmpty()) { 102 segmentList.add(0, pipeline.getLast()); 103 } 104 return new VersionedSegmentsList(segmentList, version); 105 } 106 } 107 108 /** 109 * Swaps the versioned list at the tail of the pipeline with a new segment. Swapping only if there 110 * were no changes to the suffix of the list since the version list was created. 111 * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline 112 * @param segment new segment to replace the suffix. Can be null if the suffix just needs 113 * to be removed. 114 * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it 115 * out During index merge op this will be false and for compaction it will 116 * be true. 117 * @param updateRegionSize whether to update the region size. Update the region size, when the 118 * pipeline is swapped as part of in-memory-flush and further 119 * merge/compaction. Don't update the region size when the swap is result 120 * of the snapshot (flush-to-disk). 121 * @return true iff swapped tail with new segment 122 */ 123 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 124 justification = "Increment is done under a synchronize block so safe") 125 public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, 126 boolean closeSuffix, boolean updateRegionSize) { 127 if (versionedList.getVersion() != version) { 128 return false; 129 } 130 List<ImmutableSegment> suffix; 131 synchronized (pipeline) { 132 if (versionedList.getVersion() != version) { 133 return false; 134 } 135 suffix = versionedList.getStoreSegments(); 136 LOG.debug("Swapping pipeline suffix; before={}, new segment={}", 137 versionedList.getStoreSegments().size(), segment); 138 swapSuffix(suffix, segment, closeSuffix); 139 readOnlyCopy = new LinkedList<>(pipeline); 140 version++; 141 } 142 if (updateRegionSize && region != null) { 143 // update the global memstore size counter 144 long suffixDataSize = getSegmentsKeySize(suffix); 145 long suffixHeapSize = getSegmentsHeapSize(suffix); 146 long suffixOffHeapSize = getSegmentsOffHeapSize(suffix); 147 int suffixCellsCount = getSegmentsCellsCount(suffix); 148 long newDataSize = 0; 149 long newHeapSize = 0; 150 long newOffHeapSize = 0; 151 int newCellsCount = 0; 152 if (segment != null) { 153 newDataSize = segment.getDataSize(); 154 newHeapSize = segment.getHeapSize(); 155 newOffHeapSize = segment.getOffHeapSize(); 156 newCellsCount = segment.getCellsCount(); 157 } 158 long dataSizeDelta = suffixDataSize - newDataSize; 159 long heapSizeDelta = suffixHeapSize - newHeapSize; 160 long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize; 161 int cellsCountDelta = suffixCellsCount - newCellsCount; 162 region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta); 163 LOG.debug( 164 "Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap " 165 + "size={}  suffix off heap size={}, new segment off heap size={}, suffix cells " 166 + "count={}, new segment cells count={}", 167 suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize, 168 suffixCellsCount, newCellsCount); 169 } 170 return true; 171 } 172 173 private static long getSegmentsHeapSize(List<? extends Segment> list) { 174 long res = 0; 175 for (Segment segment : list) { 176 res += segment.getHeapSize(); 177 } 178 return res; 179 } 180 181 private static long getSegmentsOffHeapSize(List<? extends Segment> list) { 182 long res = 0; 183 for (Segment segment : list) { 184 res += segment.getOffHeapSize(); 185 } 186 return res; 187 } 188 189 private static long getSegmentsKeySize(List<? extends Segment> list) { 190 long res = 0; 191 for (Segment segment : list) { 192 res += segment.getDataSize(); 193 } 194 return res; 195 } 196 197 private static int getSegmentsCellsCount(List<? extends Segment> list) { 198 int res = 0; 199 for (Segment segment : list) { 200 res += segment.getCellsCount(); 201 } 202 return res; 203 } 204 205 /** 206 * If the caller holds the current version, go over the the pipeline and try to flatten each 207 * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based. 208 * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect. 209 * Return after one segment is successfully flatten. 210 * @return true iff a segment was successfully flattened 211 */ 212 public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType, 213 MemStoreCompactionStrategy.Action action) { 214 215 if (requesterVersion != version) { 216 LOG.warn("Segment flattening failed, because versions do not match. Requester version: " 217 + requesterVersion + ", actual version: " + version); 218 return false; 219 } 220 221 synchronized (pipeline) { 222 if (requesterVersion != version) { 223 LOG.warn("Segment flattening failed, because versions do not match"); 224 return false; 225 } 226 int i = -1; 227 for (ImmutableSegment s : pipeline) { 228 i++; 229 if (s.canBeFlattened()) { 230 s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed 231 if (s.isEmpty()) { 232 // after s.waitForUpdates() is called, there is no updates pending,if no cells in s, 233 // we can skip it. 234 continue; 235 } 236 // size to be updated 237 MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing(); 238 ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( 239 (CSLMImmutableSegment) s, idxType, newMemstoreAccounting, action); 240 replaceAtIndex(i, newS); 241 if (region != null) { 242 // Update the global memstore size counter upon flattening there is no change in the 243 // data size 244 MemStoreSize mss = newMemstoreAccounting.getMemStoreSize(); 245 region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 246 mss.getCellsCount()); 247 } 248 LOG.debug("Compaction pipeline segment {} flattened", s); 249 return true; 250 } 251 } 252 } 253 // do not update the global memstore size counter and do not increase the version, 254 // because all the cells remain in place 255 return false; 256 } 257 258 public boolean isEmpty() { 259 return readOnlyCopy.isEmpty(); 260 } 261 262 public List<? extends Segment> getSegments() { 263 return readOnlyCopy; 264 } 265 266 public long size() { 267 return readOnlyCopy.size(); 268 } 269 270 public long getMinSequenceId() { 271 long minSequenceId = Long.MAX_VALUE; 272 LinkedList<? extends Segment> localCopy = readOnlyCopy; 273 if (!localCopy.isEmpty()) { 274 minSequenceId = localCopy.getLast().getMinSequenceId(); 275 } 276 return minSequenceId; 277 } 278 279 public MemStoreSize getTailSize() { 280 LinkedList<? extends Segment> localCopy = readOnlyCopy; 281 return localCopy.isEmpty() ? new MemStoreSize() : localCopy.peekLast().getMemStoreSize(); 282 } 283 284 public MemStoreSize getPipelineSize() { 285 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 286 LinkedList<? extends Segment> localCopy = readOnlyCopy; 287 for (Segment segment : localCopy) { 288 memStoreSizing.incMemStoreSize(segment.getMemStoreSize()); 289 } 290 return memStoreSizing.getMemStoreSize(); 291 } 292 293 /** 294 * Must be called under the {@link CompactionPipeline#pipeline} Lock. 295 */ 296 private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, 297 boolean closeSegmentsInSuffix) { 298 matchAndRemoveSuffixFromPipeline(suffix); 299 if (segment != null) { 300 pipeline.addLast(segment); 301 } 302 // During index merge we won't be closing the segments undergoing the merge. Segment#close() 303 // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy 304 // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data 305 // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk 306 // created for the result segment. So we can release the chunks associated with the compacted 307 // segments. 308 if (closeSegmentsInSuffix) { 309 for (Segment itemInSuffix : suffix) { 310 itemInSuffix.close(); 311 } 312 } 313 } 314 315 /** 316 * Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in 317 * {@link CompactionPipeline#pipeline} one by one from the last element to the first element of 318 * suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/> 319 * Must be called under the {@link CompactionPipeline#pipeline} Lock. 320 */ 321 private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) { 322 if (suffix.isEmpty()) { 323 return; 324 } 325 if (pipeline.size() < suffix.size()) { 326 throw new IllegalStateException( 327 "CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size() 328 + "],pipeline size must greater than or equals suffix size"); 329 } 330 331 ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size()); 332 ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size()); 333 int count = 0; 334 while (suffixIterator.hasPrevious()) { 335 Segment suffixSegment = suffixIterator.previous(); 336 Segment pipelineSegment = pipelineIterator.previous(); 337 if (suffixSegment != pipelineSegment) { 338 throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment 339 + " is not pipleline segment:[" + pipelineSegment + "]"); 340 } 341 count++; 342 } 343 344 for (int index = 1; index <= count; index++) { 345 pipeline.pollLast(); 346 } 347 348 } 349 350 // replacing one segment in the pipeline with a new one exactly at the same index 351 // need to be called only within synchronized block 352 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 353 justification = "replaceAtIndex is invoked under a synchronize block so safe") 354 private void replaceAtIndex(int idx, ImmutableSegment newSegment) { 355 pipeline.set(idx, newSegment); 356 readOnlyCopy = new LinkedList<>(pipeline); 357 // the version increment is indeed needed, because the swap uses removeAll() method of the 358 // linked-list that compares the objects to find what to remove. 359 // The flattening changes the segment object completely (creation pattern) and so 360 // swap will not proceed correctly after concurrent flattening. 361 version++; 362 } 363 364 public Segment getTail() { 365 List<? extends Segment> localCopy = getSegments(); 366 if (localCopy.isEmpty()) { 367 return null; 368 } 369 return localCopy.get(localCopy.size() - 1); 370 } 371 372 private boolean addFirst(ImmutableSegment segment) { 373 pipeline.addFirst(segment); 374 return true; 375 } 376 377 // debug method 378 private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) { 379 if (suffix.isEmpty()) { 380 // empty suffix is always valid 381 return true; 382 } 383 Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator(); 384 Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator(); 385 ImmutableSegment suffixCurrent; 386 ImmutableSegment pipelineCurrent; 387 for (; suffixBackwardIterator.hasNext();) { 388 if (!pipelineBackwardIterator.hasNext()) { 389 // a suffix longer than pipeline is invalid 390 return false; 391 } 392 suffixCurrent = suffixBackwardIterator.next(); 393 pipelineCurrent = pipelineBackwardIterator.next(); 394 if (suffixCurrent != pipelineCurrent) { 395 // non-matching suffix 396 return false; 397 } 398 } 399 // suffix matches pipeline suffix 400 return true; 401 } 402 403}