001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Comparator; 024import java.util.HashSet; 025import java.util.List; 026import java.util.PriorityQueue; 027import java.util.Set; 028import java.util.function.IntConsumer; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellComparator; 032import org.apache.hadoop.hbase.ExtendedCell; 033import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Implements a heap merge across any number of KeyValueScanners. 040 * <p> 041 * Implements KeyValueScanner itself. 042 * <p> 043 * This class is used at the Region level to merge across Stores and at the Store level to merge 044 * across the memstore and StoreFiles. 045 * <p> 046 * In the Region case, we also need InternalScanner.next(List), so this class also implements 047 * InternalScanner. WARNING: As is, if you try to use this as an InternalScanner at the Store level, 048 * you will get runtime exceptions. 049 */ 050@InterfaceAudience.Private 051public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner 052 implements KeyValueScanner, InternalScanner { 053 private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class); 054 protected PriorityQueue<KeyValueScanner> heap = null; 055 // Holds the scanners when a ever a eager close() happens. All such eagerly closed 056 // scans are collected and when the final scanner.close() happens will perform the 057 // actual close. 058 protected List<KeyValueScanner> scannersForDelayedClose = null; 059 060 /** 061 * The current sub-scanner, i.e. the one that contains the next key/value to return to the client. 062 * This scanner is NOT included in {@link #heap} (but we frequently add it back to the heap and 063 * pull the new winner out). We maintain an invariant that the current sub-scanner has already 064 * done a real seek, and that current.peek() is always a real key/value (or null) except for the 065 * fake last-key-on-row-column supplied by the multi-column Bloom filter optimization, which is OK 066 * to propagate to StoreScanner. In order to ensure that, always use {@link #pollRealKV()} to 067 * update current. 068 */ 069 protected KeyValueScanner current = null; 070 071 protected KVScannerComparator comparator; 072 073 private final Set<Path> filesRead = new HashSet<>(); 074 075 /** 076 * Constructor. This KeyValueHeap will handle closing of passed in KeyValueScanners. 077 */ 078 public KeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 079 throws IOException { 080 this(scanners, new KVScannerComparator(comparator)); 081 } 082 083 /** 084 * Constructor. 085 */ 086 KeyValueHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator) 087 throws IOException { 088 this.comparator = comparator; 089 this.scannersForDelayedClose = new ArrayList<>(scanners.size()); 090 if (!scanners.isEmpty()) { 091 this.heap = new PriorityQueue<>(scanners.size(), this.comparator); 092 for (KeyValueScanner scanner : scanners) { 093 if (scanner.peek() != null) { 094 this.heap.add(scanner); 095 } else { 096 this.scannersForDelayedClose.add(scanner); 097 } 098 } 099 this.current = pollRealKV(); 100 } 101 } 102 103 @Override 104 public ExtendedCell peek() { 105 if (this.current == null) { 106 return null; 107 } 108 return this.current.peek(); 109 } 110 111 boolean isLatestCellFromMemstore() { 112 return !this.current.isFileScanner(); 113 } 114 115 @Override 116 public void recordBlockSize(IntConsumer blockSizeConsumer) { 117 this.current.recordBlockSize(blockSizeConsumer); 118 } 119 120 @Override 121 public ExtendedCell next() throws IOException { 122 if (this.current == null) { 123 return null; 124 } 125 ExtendedCell kvReturn = this.current.next(); 126 ExtendedCell kvNext = this.current.peek(); 127 if (kvNext == null) { 128 this.scannersForDelayedClose.add(this.current); 129 this.current = null; 130 this.current = pollRealKV(); 131 } else { 132 KeyValueScanner topScanner = this.heap.peek(); 133 // no need to add current back to the heap if it is the only scanner left 134 if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) { 135 this.heap.add(this.current); 136 this.current = null; 137 this.current = pollRealKV(); 138 } 139 } 140 return kvReturn; 141 } 142 143 /** 144 * Gets the next row of keys from the top-most scanner. 145 * <p> 146 * This method takes care of updating the heap. 147 * <p> 148 * This can ONLY be called when you are using Scanners that implement InternalScanner as well as 149 * KeyValueScanner (a {@link StoreScanner}). 150 * @return true if more rows exist after this one, false if scanner is done 151 */ 152 @Override 153 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 154 throws IOException { 155 if (this.current == null) { 156 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 157 } 158 InternalScanner currentAsInternal = (InternalScanner) this.current; 159 boolean moreCells = currentAsInternal.next(result, scannerContext); 160 Cell pee = this.current.peek(); 161 162 /* 163 * By definition, any InternalScanner must return false only when it has no further rows to be 164 * fetched. So, we can close a scanner if it returns false. All existing implementations seem to 165 * be fine with this. It is much more efficient to close scanners which are not needed than keep 166 * them in the heap. This is also required for certain optimizations. 167 */ 168 169 if (pee == null || !moreCells) { 170 // add the scanner that is to be closed 171 this.scannersForDelayedClose.add(this.current); 172 } else { 173 this.heap.add(this.current); 174 } 175 this.current = null; 176 this.current = pollRealKV(); 177 if (this.current == null) { 178 moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 179 } 180 return moreCells; 181 } 182 183 protected static class KVScannerComparator implements Comparator<KeyValueScanner> { 184 protected CellComparator kvComparator; 185 186 /** 187 * Constructor 188 */ 189 public KVScannerComparator(CellComparator kvComparator) { 190 this.kvComparator = kvComparator; 191 } 192 193 @Override 194 public int compare(KeyValueScanner left, KeyValueScanner right) { 195 int comparison = compare(left.peek(), right.peek()); 196 if (comparison != 0) { 197 return comparison; 198 } else { 199 // Since both the keys are exactly the same, we break the tie in favor of higher ordered 200 // scanner since it'll have newer data. Since higher value should come first, we reverse 201 // sort here. 202 return Long.compare(right.getScannerOrder(), left.getScannerOrder()); 203 } 204 } 205 206 /** 207 * Compares two KeyValue 208 * @return less than 0 if left is smaller, 0 if equal etc.. 209 */ 210 public int compare(Cell left, Cell right) { 211 return this.kvComparator.compare(left, right); 212 } 213 214 /** 215 * */ 216 public CellComparator getComparator() { 217 return this.kvComparator; 218 } 219 } 220 221 @Override 222 public void close() { 223 for (KeyValueScanner scanner : this.scannersForDelayedClose) { 224 scanner.close(); 225 filesRead.addAll(scanner.getFilesRead()); 226 } 227 this.scannersForDelayedClose.clear(); 228 if (this.current != null) { 229 this.current.close(); 230 filesRead.addAll(this.current.getFilesRead()); 231 } 232 if (this.heap != null) { 233 // Order of closing the scanners shouldn't matter here, so simply iterate and close them. 234 for (KeyValueScanner scanner : heap) { 235 scanner.close(); 236 filesRead.addAll(scanner.getFilesRead()); 237 } 238 } 239 } 240 241 /** 242 * Returns the set of store file paths successfully read by the scanners in this heap. Populated 243 * as each scanner is closed (e.g. in close() or shipped()). 244 */ 245 @Override 246 public Set<Path> getFilesRead() { 247 return Collections.unmodifiableSet(filesRead); 248 } 249 250 /** 251 * Seeks all scanners at or below the specified seek key. If we earlied-out of a row, we may end 252 * up skipping values that were never reached yet. Rather than iterating down, we want to give the 253 * opportunity to re-seek. 254 * <p> 255 * As individual scanners may run past their ends, those scanners are automatically closed and 256 * removed from the heap. 257 * <p> 258 * This function (and {@link #reseek(ExtendedCell)}) does not do multi-column Bloom filter and 259 * lazy-seek optimizations. To enable those, call 260 * {@link #requestSeek(ExtendedCell, boolean, boolean)}. 261 * @param seekKey KeyValue to seek at or after 262 * @return true if KeyValues exist at or after specified key, false if not 263 */ 264 @Override 265 public boolean seek(ExtendedCell seekKey) throws IOException { 266 return generalizedSeek(false, // This is not a lazy seek 267 seekKey, false, // forward (false: this is not a reseek) 268 false); // Not using Bloom filters 269 } 270 271 /** 272 * This function is identical to the {@link #seek(ExtendedCell)} function except that 273 * scanner.seek(seekKey) is changed to scanner.reseek(seekKey). 274 */ 275 @Override 276 public boolean reseek(ExtendedCell seekKey) throws IOException { 277 return generalizedSeek(false, // This is not a lazy seek 278 seekKey, true, // forward (true because this is reseek) 279 false); // Not using Bloom filters 280 } 281 282 /** 283 * {@inheritDoc} 284 */ 285 @Override 286 public boolean requestSeek(ExtendedCell key, boolean forward, boolean useBloom) 287 throws IOException { 288 return generalizedSeek(true, key, forward, useBloom); 289 } 290 291 /** 292 * @param isLazy whether we are trying to seek to exactly the given row/col. Enables Bloom 293 * filter and most-recent-file-first optimizations for multi-column get/scan 294 * queries. 295 * @param seekKey key to seek to 296 * @param forward whether to seek forward (also known as reseek) 297 * @param useBloom whether to optimize seeks using Bloom filters 298 */ 299 private boolean generalizedSeek(boolean isLazy, ExtendedCell seekKey, boolean forward, 300 boolean useBloom) throws IOException { 301 if (!isLazy && useBloom) { 302 throw new IllegalArgumentException( 303 "Multi-column Bloom filter " + "optimization requires a lazy seek"); 304 } 305 306 if (current == null) { 307 return false; 308 } 309 310 KeyValueScanner scanner = current; 311 try { 312 while (scanner != null) { 313 Cell topKey = scanner.peek(); 314 if (comparator.getComparator().compare(seekKey, topKey) <= 0) { 315 // Top KeyValue is at-or-after Seek KeyValue. We only know that all 316 // scanners are at or after seekKey (because fake keys of 317 // scanners where a lazy-seek operation has been done are not greater 318 // than their real next keys) but we still need to enforce our 319 // invariant that the top scanner has done a real seek. This way 320 // StoreScanner and RegionScanner do not have to worry about fake 321 // keys. 322 heap.add(scanner); 323 scanner = null; 324 current = pollRealKV(); 325 return current != null; 326 } 327 328 boolean seekResult; 329 if (isLazy && heap.size() > 0) { 330 // If there is only one scanner left, we don't do lazy seek. 331 seekResult = scanner.requestSeek(seekKey, forward, useBloom); 332 } else { 333 seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, forward); 334 } 335 336 if (!seekResult) { 337 this.scannersForDelayedClose.add(scanner); 338 } else { 339 heap.add(scanner); 340 } 341 scanner = heap.poll(); 342 if (scanner == null) { 343 current = null; 344 } 345 } 346 } catch (Exception e) { 347 if (scanner != null) { 348 try { 349 scanner.close(); 350 } catch (Exception ce) { 351 LOG.warn("close KeyValueScanner error", ce); 352 } 353 } 354 throw e; 355 } 356 357 // Heap is returning empty, scanner is done 358 return false; 359 } 360 361 /** 362 * Fetches the top sub-scanner from the priority queue, ensuring that a real seek has been done on 363 * it. Works by fetching the top sub-scanner, and if it has not done a real seek, making it do so 364 * (which will modify its top KV), putting it back, and repeating this until success. Relies on 365 * the fact that on a lazy seek we set the current key of a StoreFileScanner to a KV that is not 366 * greater than the real next KV to be read from that file, so the scanner that bubbles up to the 367 * top of the heap will have global next KV in this scanner heap if (1) it has done a real seek 368 * and (2) its KV is the top among all top KVs (some of which are fake) in the scanner heap. 369 */ 370 protected KeyValueScanner pollRealKV() throws IOException { 371 KeyValueScanner kvScanner = heap.poll(); 372 if (kvScanner == null) { 373 return null; 374 } 375 376 while (kvScanner != null && !kvScanner.realSeekDone()) { 377 if (kvScanner.peek() != null) { 378 try { 379 kvScanner.enforceSeek(); 380 } catch (IOException ioe) { 381 // Add the item to delayed close set in case it is leak from close 382 this.scannersForDelayedClose.add(kvScanner); 383 throw ioe; 384 } 385 Cell curKV = kvScanner.peek(); 386 if (curKV != null) { 387 KeyValueScanner nextEarliestScanner = heap.peek(); 388 if (nextEarliestScanner == null) { 389 // The heap is empty. Return the only possible scanner. 390 return kvScanner; 391 } 392 393 // Compare the current scanner to the next scanner. We try to avoid 394 // putting the current one back into the heap if possible. 395 Cell nextKV = nextEarliestScanner.peek(); 396 if (nextKV == null || comparator.compare(curKV, nextKV) < 0) { 397 // We already have the scanner with the earliest KV, so return it. 398 return kvScanner; 399 } 400 401 // Otherwise, put the scanner back into the heap and let it compete 402 // against all other scanners (both those that have done a "real 403 // seek" and a "lazy seek"). 404 heap.add(kvScanner); 405 } else { 406 // Close the scanner because we did a real seek and found out there 407 // are no more KVs. 408 this.scannersForDelayedClose.add(kvScanner); 409 } 410 } else { 411 // Close the scanner because it has already run out of KVs even before 412 // we had to do a real seek on it. 413 this.scannersForDelayedClose.add(kvScanner); 414 } 415 kvScanner = heap.poll(); 416 } 417 418 return kvScanner; 419 } 420 421 /** Returns the current Heap */ 422 public PriorityQueue<KeyValueScanner> getHeap() { 423 return this.heap; 424 } 425 426 KeyValueScanner getCurrentForTesting() { 427 return current; 428 } 429 430 @Override 431 public ExtendedCell getNextIndexedKey() { 432 // here we return the next index key from the top scanner 433 return current == null ? null : current.getNextIndexedKey(); 434 } 435 436 @Override 437 public void shipped() throws IOException { 438 for (KeyValueScanner scanner : this.scannersForDelayedClose) { 439 scanner.close(); // There wont be further fetch of Cells from these scanners. Just close. 440 filesRead.addAll(scanner.getFilesRead()); 441 } 442 this.scannersForDelayedClose.clear(); 443 if (this.current != null) { 444 this.current.shipped(); 445 } 446 if (this.heap != null) { 447 for (KeyValueScanner scanner : this.heap) { 448 scanner.shipped(); 449 } 450 } 451 } 452}