001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver; 021 022import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Comparator; 027import java.util.List; 028import java.util.PriorityQueue; 029 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellComparator; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 036 037/** 038 * Implements a heap merge across any number of KeyValueScanners. 039 * <p> 040 * Implements KeyValueScanner itself. 041 * <p> 042 * This class is used at the Region level to merge across Stores 043 * and at the Store level to merge across the memstore and StoreFiles. 044 * <p> 045 * In the Region case, we also need InternalScanner.next(List), so this class 046 * also implements InternalScanner. WARNING: As is, if you try to use this 047 * as an InternalScanner at the Store level, you will get runtime exceptions. 048 */ 049@InterfaceAudience.Private 050public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner 051 implements KeyValueScanner, InternalScanner { 052 private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class); 053 protected PriorityQueue<KeyValueScanner> heap = null; 054 // Holds the scanners when a ever a eager close() happens. All such eagerly closed 055 // scans are collected and when the final scanner.close() happens will perform the 056 // actual close. 057 protected List<KeyValueScanner> scannersForDelayedClose = null; 058 059 /** 060 * The current sub-scanner, i.e. the one that contains the next key/value 061 * to return to the client. This scanner is NOT included in {@link #heap} 062 * (but we frequently add it back to the heap and pull the new winner out). 063 * We maintain an invariant that the current sub-scanner has already done 064 * a real seek, and that current.peek() is always a real key/value (or null) 065 * except for the fake last-key-on-row-column supplied by the multi-column 066 * Bloom filter optimization, which is OK to propagate to StoreScanner. In 067 * order to ensure that, always use {@link #pollRealKV()} to update current. 068 */ 069 protected KeyValueScanner current = null; 070 071 protected KVScannerComparator comparator; 072 073 /** 074 * Constructor. This KeyValueHeap will handle closing of passed in 075 * KeyValueScanners. 076 * @param scanners 077 * @param comparator 078 */ 079 public KeyValueHeap(List<? extends KeyValueScanner> scanners, 080 CellComparator comparator) throws IOException { 081 this(scanners, new KVScannerComparator(comparator)); 082 } 083 084 /** 085 * Constructor. 086 * @param scanners 087 * @param comparator 088 * @throws IOException 089 */ 090 KeyValueHeap(List<? extends KeyValueScanner> scanners, 091 KVScannerComparator comparator) throws IOException { 092 this.comparator = comparator; 093 this.scannersForDelayedClose = new ArrayList<>(scanners.size()); 094 if (!scanners.isEmpty()) { 095 this.heap = new PriorityQueue<>(scanners.size(), this.comparator); 096 for (KeyValueScanner scanner : scanners) { 097 if (scanner.peek() != null) { 098 this.heap.add(scanner); 099 } else { 100 this.scannersForDelayedClose.add(scanner); 101 } 102 } 103 this.current = pollRealKV(); 104 } 105 } 106 107 @Override 108 public Cell peek() { 109 if (this.current == null) { 110 return null; 111 } 112 return this.current.peek(); 113 } 114 115 boolean isLatestCellFromMemstore() { 116 return !this.current.isFileScanner(); 117 } 118 119 @Override 120 public Cell next() throws IOException { 121 if(this.current == null) { 122 return null; 123 } 124 Cell kvReturn = this.current.next(); 125 Cell kvNext = this.current.peek(); 126 if (kvNext == null) { 127 this.scannersForDelayedClose.add(this.current); 128 this.current = null; 129 this.current = pollRealKV(); 130 } else { 131 KeyValueScanner topScanner = this.heap.peek(); 132 // no need to add current back to the heap if it is the only scanner left 133 if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) { 134 this.heap.add(this.current); 135 this.current = null; 136 this.current = pollRealKV(); 137 } 138 } 139 return kvReturn; 140 } 141 142 /** 143 * Gets the next row of keys from the top-most scanner. 144 * <p> 145 * This method takes care of updating the heap. 146 * <p> 147 * This can ONLY be called when you are using Scanners that implement InternalScanner as well as 148 * KeyValueScanner (a {@link StoreScanner}). 149 * @return true if more rows exist after this one, false if scanner is done 150 */ 151 @Override 152 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 153 if (this.current == null) { 154 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 155 } 156 InternalScanner currentAsInternal = (InternalScanner)this.current; 157 boolean moreCells = currentAsInternal.next(result, scannerContext); 158 Cell pee = this.current.peek(); 159 160 /* 161 * By definition, any InternalScanner must return false only when it has no 162 * further rows to be fetched. So, we can close a scanner if it returns 163 * false. All existing implementations seem to be fine with this. It is much 164 * more efficient to close scanners which are not needed than keep them in 165 * the heap. This is also required for certain optimizations. 166 */ 167 168 if (pee == null || !moreCells) { 169 // add the scanner that is to be closed 170 this.scannersForDelayedClose.add(this.current); 171 } else { 172 this.heap.add(this.current); 173 } 174 this.current = null; 175 this.current = pollRealKV(); 176 if (this.current == null) { 177 moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 178 } 179 return moreCells; 180 } 181 182 protected static class KVScannerComparator implements Comparator<KeyValueScanner> { 183 protected CellComparator kvComparator; 184 /** 185 * Constructor 186 * @param kvComparator 187 */ 188 public KVScannerComparator(CellComparator kvComparator) { 189 this.kvComparator = kvComparator; 190 } 191 192 @Override 193 public int compare(KeyValueScanner left, KeyValueScanner right) { 194 int comparison = compare(left.peek(), right.peek()); 195 if (comparison != 0) { 196 return comparison; 197 } else { 198 // Since both the keys are exactly the same, we break the tie in favor of higher ordered 199 // scanner since it'll have newer data. Since higher value should come first, we reverse 200 // sort here. 201 return Long.compare(right.getScannerOrder(), left.getScannerOrder()); 202 } 203 } 204 /** 205 * Compares two KeyValue 206 * @param left 207 * @param right 208 * @return less than 0 if left is smaller, 0 if equal etc.. 209 */ 210 public int compare(Cell left, Cell right) { 211 return this.kvComparator.compare(left, right); 212 } 213 /** 214 * @return KVComparator 215 */ 216 public CellComparator getComparator() { 217 return this.kvComparator; 218 } 219 } 220 221 @Override 222 public void close() { 223 for (KeyValueScanner scanner : this.scannersForDelayedClose) { 224 scanner.close(); 225 } 226 this.scannersForDelayedClose.clear(); 227 if (this.current != null) { 228 this.current.close(); 229 } 230 if (this.heap != null) { 231 // Order of closing the scanners shouldn't matter here, so simply iterate and close them. 232 for (KeyValueScanner scanner : heap) { 233 scanner.close(); 234 } 235 } 236 } 237 238 /** 239 * Seeks all scanners at or below the specified seek key. If we earlied-out 240 * of a row, we may end up skipping values that were never reached yet. 241 * Rather than iterating down, we want to give the opportunity to re-seek. 242 * <p> 243 * As individual scanners may run past their ends, those scanners are 244 * automatically closed and removed from the heap. 245 * <p> 246 * This function (and {@link #reseek(Cell)}) does not do multi-column 247 * Bloom filter and lazy-seek optimizations. To enable those, call 248 * {@link #requestSeek(Cell, boolean, boolean)}. 249 * @param seekKey KeyValue to seek at or after 250 * @return true if KeyValues exist at or after specified key, false if not 251 * @throws IOException 252 */ 253 @Override 254 public boolean seek(Cell seekKey) throws IOException { 255 return generalizedSeek(false, // This is not a lazy seek 256 seekKey, 257 false, // forward (false: this is not a reseek) 258 false); // Not using Bloom filters 259 } 260 261 /** 262 * This function is identical to the {@link #seek(Cell)} function except 263 * that scanner.seek(seekKey) is changed to scanner.reseek(seekKey). 264 */ 265 @Override 266 public boolean reseek(Cell seekKey) throws IOException { 267 return generalizedSeek(false, // This is not a lazy seek 268 seekKey, 269 true, // forward (true because this is reseek) 270 false); // Not using Bloom filters 271 } 272 273 /** 274 * {@inheritDoc} 275 */ 276 @Override 277 public boolean requestSeek(Cell key, boolean forward, 278 boolean useBloom) throws IOException { 279 return generalizedSeek(true, key, forward, useBloom); 280 } 281 282 /** 283 * @param isLazy whether we are trying to seek to exactly the given row/col. 284 * Enables Bloom filter and most-recent-file-first optimizations for 285 * multi-column get/scan queries. 286 * @param seekKey key to seek to 287 * @param forward whether to seek forward (also known as reseek) 288 * @param useBloom whether to optimize seeks using Bloom filters 289 */ 290 private boolean generalizedSeek(boolean isLazy, Cell seekKey, 291 boolean forward, boolean useBloom) throws IOException { 292 if (!isLazy && useBloom) { 293 throw new IllegalArgumentException("Multi-column Bloom filter " + 294 "optimization requires a lazy seek"); 295 } 296 297 if (current == null) { 298 return false; 299 } 300 301 KeyValueScanner scanner = current; 302 try { 303 while (scanner != null) { 304 Cell topKey = scanner.peek(); 305 if (comparator.getComparator().compare(seekKey, topKey) <= 0) { 306 // Top KeyValue is at-or-after Seek KeyValue. We only know that all 307 // scanners are at or after seekKey (because fake keys of 308 // scanners where a lazy-seek operation has been done are not greater 309 // than their real next keys) but we still need to enforce our 310 // invariant that the top scanner has done a real seek. This way 311 // StoreScanner and RegionScanner do not have to worry about fake 312 // keys. 313 heap.add(scanner); 314 scanner = null; 315 current = pollRealKV(); 316 return current != null; 317 } 318 319 boolean seekResult; 320 if (isLazy && heap.size() > 0) { 321 // If there is only one scanner left, we don't do lazy seek. 322 seekResult = scanner.requestSeek(seekKey, forward, useBloom); 323 } else { 324 seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, 325 forward); 326 } 327 328 if (!seekResult) { 329 this.scannersForDelayedClose.add(scanner); 330 } else { 331 heap.add(scanner); 332 } 333 scanner = heap.poll(); 334 if (scanner == null) { 335 current = null; 336 } 337 } 338 } catch (Exception e) { 339 if (scanner != null) { 340 try { 341 scanner.close(); 342 } catch (Exception ce) { 343 LOG.warn("close KeyValueScanner error", ce); 344 } 345 } 346 throw e; 347 } 348 349 // Heap is returning empty, scanner is done 350 return false; 351 } 352 353 /** 354 * Fetches the top sub-scanner from the priority queue, ensuring that a real 355 * seek has been done on it. Works by fetching the top sub-scanner, and if it 356 * has not done a real seek, making it do so (which will modify its top KV), 357 * putting it back, and repeating this until success. Relies on the fact that 358 * on a lazy seek we set the current key of a StoreFileScanner to a KV that 359 * is not greater than the real next KV to be read from that file, so the 360 * scanner that bubbles up to the top of the heap will have global next KV in 361 * this scanner heap if (1) it has done a real seek and (2) its KV is the top 362 * among all top KVs (some of which are fake) in the scanner heap. 363 */ 364 protected KeyValueScanner pollRealKV() throws IOException { 365 KeyValueScanner kvScanner = heap.poll(); 366 if (kvScanner == null) { 367 return null; 368 } 369 370 while (kvScanner != null && !kvScanner.realSeekDone()) { 371 if (kvScanner.peek() != null) { 372 try { 373 kvScanner.enforceSeek(); 374 } catch (IOException ioe) { 375 // Add the item to delayed close set in case it is leak from close 376 this.scannersForDelayedClose.add(kvScanner); 377 throw ioe; 378 } 379 Cell curKV = kvScanner.peek(); 380 if (curKV != null) { 381 KeyValueScanner nextEarliestScanner = heap.peek(); 382 if (nextEarliestScanner == null) { 383 // The heap is empty. Return the only possible scanner. 384 return kvScanner; 385 } 386 387 // Compare the current scanner to the next scanner. We try to avoid 388 // putting the current one back into the heap if possible. 389 Cell nextKV = nextEarliestScanner.peek(); 390 if (nextKV == null || comparator.compare(curKV, nextKV) < 0) { 391 // We already have the scanner with the earliest KV, so return it. 392 return kvScanner; 393 } 394 395 // Otherwise, put the scanner back into the heap and let it compete 396 // against all other scanners (both those that have done a "real 397 // seek" and a "lazy seek"). 398 heap.add(kvScanner); 399 } else { 400 // Close the scanner because we did a real seek and found out there 401 // are no more KVs. 402 this.scannersForDelayedClose.add(kvScanner); 403 } 404 } else { 405 // Close the scanner because it has already run out of KVs even before 406 // we had to do a real seek on it. 407 this.scannersForDelayedClose.add(kvScanner); 408 } 409 kvScanner = heap.poll(); 410 } 411 412 return kvScanner; 413 } 414 415 /** 416 * @return the current Heap 417 */ 418 public PriorityQueue<KeyValueScanner> getHeap() { 419 return this.heap; 420 } 421 422 423 @VisibleForTesting 424 KeyValueScanner getCurrentForTesting() { 425 return current; 426 } 427 428 @Override 429 public Cell getNextIndexedKey() { 430 // here we return the next index key from the top scanner 431 return current == null ? null : current.getNextIndexedKey(); 432 } 433 434 @Override 435 public void shipped() throws IOException { 436 for (KeyValueScanner scanner : this.scannersForDelayedClose) { 437 scanner.close(); // There wont be further fetch of Cells from these scanners. Just close. 438 } 439 this.scannersForDelayedClose.clear(); 440 if (this.current != null) { 441 this.current.shipped(); 442 } 443 if (this.heap != null) { 444 for (KeyValueScanner scanner : this.heap) { 445 scanner.shipped(); 446 } 447 } 448 } 449}