001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertThrows; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.List; 029import java.util.Set; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellComparatorImpl; 033import org.apache.hadoop.hbase.ExtendedCell; 034import org.apache.hadoop.hbase.KeyValue; 035import org.apache.hadoop.hbase.testclassification.RegionServerTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.hbase.util.CollectionBackedScanner; 039import org.junit.jupiter.api.Tag; 040import org.junit.jupiter.api.Test; 041 042@Tag(RegionServerTests.TAG) 043@Tag(SmallTests.TAG) 044public class TestKeyValueHeap { 045 046 private byte[] row1 = Bytes.toBytes("row1"); 047 private byte[] fam1 = Bytes.toBytes("fam1"); 048 private byte[] col1 = Bytes.toBytes("col1"); 049 private byte[] data = Bytes.toBytes("data"); 050 051 private byte[] row2 = Bytes.toBytes("row2"); 052 private byte[] fam2 = Bytes.toBytes("fam2"); 053 private byte[] col2 = Bytes.toBytes("col2"); 054 055 private byte[] col3 = Bytes.toBytes("col3"); 056 private byte[] col4 = Bytes.toBytes("col4"); 057 private byte[] col5 = Bytes.toBytes("col5"); 058 059 // Variable name encoding. kv<row#><fam#><col#> 060 ExtendedCell kv111 = new KeyValue(row1, fam1, col1, data); 061 ExtendedCell kv112 = new KeyValue(row1, fam1, col2, data); 062 ExtendedCell kv113 = new KeyValue(row1, fam1, col3, data); 063 ExtendedCell kv114 = new KeyValue(row1, fam1, col4, data); 064 ExtendedCell kv115 = new KeyValue(row1, fam1, col5, data); 065 ExtendedCell kv121 = new KeyValue(row1, fam2, col1, data); 066 ExtendedCell kv122 = new KeyValue(row1, fam2, col2, data); 067 ExtendedCell kv211 = new KeyValue(row2, fam1, col1, data); 068 ExtendedCell kv212 = new KeyValue(row2, fam1, col2, data); 069 ExtendedCell kv213 = new KeyValue(row2, fam1, col3, data); 070 071 TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212)); 072 TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112)); 073 TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 074 075 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3)); 076 077 /* 078 * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned Cells 079 * are same as {@code expected}. 080 * @return List of Cells returned from scanners. 081 */ 082 public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners) 083 throws IOException { 084 // Creating KeyValueHeap 085 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 086 List<Cell> actual = new ArrayList<>(); 087 while (kvh.peek() != null) { 088 actual.add(kvh.next()); 089 } 090 091 assertEquals(expected, actual); 092 return actual; 093 } 094 } 095 096 @Test 097 public void testSorted() throws IOException { 098 // Cases that need to be checked are: 099 // 1. The "smallest" Cell is in the same scanners as current 100 // 2. Current scanner gets empty 101 102 List<Cell> expected = 103 Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213); 104 105 List<Cell> actual = assertCells(expected, scanners); 106 107 // Check if result is sorted according to Comparator 108 for (int i = 0; i < actual.size() - 1; i++) { 109 int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1)); 110 assertTrue(ret < 0); 111 } 112 } 113 114 @Test 115 public void testSeek() throws IOException { 116 // Cases: 117 // 1. Seek Cell that is not in scanner 118 // 2. Check that smallest that is returned from a seek is correct 119 List<Cell> expected = Arrays.asList(kv211); 120 121 // Creating KeyValueHeap 122 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 123 ExtendedCell seekKv = new KeyValue(row2, fam1, null, null); 124 kvh.seek(seekKv); 125 126 List<Cell> actual = Arrays.asList(kvh.peek()); 127 128 assertEquals(expected, actual, "Expected = " + Arrays.toString(expected.toArray()) 129 + "\n Actual = " + Arrays.toString(actual.toArray())); 130 } 131 } 132 133 @Test 134 public void testScannerLeak() throws IOException { 135 // Test for unclosed scanners (HBASE-1927) 136 137 TestScanner s4 = new TestScanner(new ArrayList<>()); 138 scanners.add(s4); 139 140 // Creating KeyValueHeap 141 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 142 for (;;) { 143 if (kvh.next() == null) { 144 break; 145 } 146 } 147 // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority 148 // queue and added to a Set for lazy close. The actual close will happen only on 149 // KVHeap#close() 150 assertEquals(4, kvh.scannersForDelayedClose.size()); 151 assertTrue(kvh.scannersForDelayedClose.contains(s1)); 152 assertTrue(kvh.scannersForDelayedClose.contains(s2)); 153 assertTrue(kvh.scannersForDelayedClose.contains(s3)); 154 assertTrue(kvh.scannersForDelayedClose.contains(s4)); 155 } 156 157 for (KeyValueScanner scanner : scanners) { 158 assertTrue(((TestScanner) scanner).isClosed()); 159 } 160 } 161 162 @Test 163 public void testScannerException() throws IOException { 164 // Test for NPE issue when exception happens in scanners (HBASE-13835) 165 166 TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212)); 167 TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112)); 168 TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 169 TestScanner s4 = new SeekTestScanner(new ArrayList<>()); 170 171 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4)); 172 173 // Creating KeyValueHeap 174 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 175 for (KeyValueScanner scanner : scanners) { 176 ((SeekTestScanner) scanner).setRealSeekDone(false); 177 } 178 // The pollRealKV should throw IOE. 179 assertThrows(IOException.class, () -> { 180 for (;;) { 181 if (kvh.next() == null) { 182 break; 183 } 184 } 185 }); 186 } 187 // It implies there is no NPE thrown from kvh.close() if getting here 188 for (KeyValueScanner scanner : scanners) { 189 // Verify that close is called and only called once for each scanner 190 assertTrue(((SeekTestScanner) scanner).isClosed()); 191 assertEquals(1, ((SeekTestScanner) scanner).getClosedNum()); 192 } 193 } 194 195 @Test 196 public void testPriorityId() throws IOException { 197 ExtendedCell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa")); 198 ExtendedCell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb")); 199 TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1); 200 TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2); 201 List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A); 202 assertCells(expected, Arrays.asList(scan1, scan2)); 203 204 scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2); 205 scan2 = new TestScanner(Arrays.asList(kv113B), 1); 206 expected = Arrays.asList(kv111, kv112, kv113A, kv113B); 207 assertCells(expected, Arrays.asList(scan1, scan2)); 208 } 209 210 @Test 211 public void testGetFilesRead() throws IOException { 212 // Create test scanners with file paths 213 Path file1 = new Path("/test/file1"); 214 Path file2 = new Path("/test/file2"); 215 Path file3 = new Path("/test/file3"); 216 217 FileTrackingScanner scanner1 = 218 new FileTrackingScanner(Arrays.asList(kv115, kv211, kv212), file1); 219 FileTrackingScanner scanner2 = new FileTrackingScanner(Arrays.asList(kv111, kv112), file2); 220 FileTrackingScanner scanner3 = 221 new FileTrackingScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213), file3); 222 223 // Add a non-file-based scanner (e.g., memstore scanner) that doesn't return files 224 TestScanner memStoreScanner = new TestScanner(Arrays.asList(kv114)); 225 226 List<KeyValueScanner> scanners = 227 new ArrayList<>(Arrays.asList(scanner1, scanner2, scanner3, memStoreScanner)); 228 229 // Create KeyValueHeap and scan through all cells 230 KeyValueHeap keyValueHeap = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); 231 232 // Before closing, should return empty set even after scanning 233 // Scan through all cells first 234 while (keyValueHeap.peek() != null) { 235 keyValueHeap.next(); 236 } 237 238 // Verify that before closing, files are not returned 239 Set<Path> filesReadBeforeClose = keyValueHeap.getFilesRead(); 240 assertTrue(filesReadBeforeClose.isEmpty(), "Should return empty set before closing heap"); 241 assertEquals(0, filesReadBeforeClose.size(), "Should have 0 files before closing"); 242 243 // Now close the heap 244 keyValueHeap.close(); 245 246 // After closing, should return all files from file-based scanners only 247 // Non-file-based scanners (like memstore) should not contribute files 248 Set<Path> filesReadAfterClose = keyValueHeap.getFilesRead(); 249 assertEquals(3, filesReadAfterClose.size(), 250 "Should return set with 3 file paths after closing (excluding non-file scanner)"); 251 assertTrue(filesReadAfterClose.contains(file1), "Should contain file1"); 252 assertTrue(filesReadAfterClose.contains(file2), "Should contain file2"); 253 assertTrue(filesReadAfterClose.contains(file3), "Should contain file3"); 254 255 // Verify that non-file-based scanner doesn't contribute any files 256 // (memStoreScanner.getFilesRead() should return empty set) 257 Set<Path> memStoreFiles = memStoreScanner.getFilesRead(); 258 assertTrue(memStoreFiles.isEmpty(), "Non-file-based scanner should return empty set"); 259 } 260 261 private static class TestScanner extends CollectionBackedScanner { 262 private boolean closed = false; 263 private long scannerOrder = 0; 264 265 public TestScanner(List<ExtendedCell> list) { 266 super(list); 267 } 268 269 public TestScanner(List<ExtendedCell> list, long scannerOrder) { 270 this(list); 271 this.scannerOrder = scannerOrder; 272 } 273 274 @Override 275 public long getScannerOrder() { 276 return scannerOrder; 277 } 278 279 @Override 280 public void close() { 281 closed = true; 282 } 283 284 public boolean isClosed() { 285 return closed; 286 } 287 } 288 289 private static class SeekTestScanner extends TestScanner { 290 private int closedNum = 0; 291 private boolean realSeekDone = true; 292 293 public SeekTestScanner(List<ExtendedCell> list) { 294 super(list); 295 } 296 297 @Override 298 public void close() { 299 super.close(); 300 closedNum++; 301 } 302 303 public int getClosedNum() { 304 return closedNum; 305 } 306 307 @Override 308 public boolean realSeekDone() { 309 return realSeekDone; 310 } 311 312 public void setRealSeekDone(boolean done) { 313 realSeekDone = done; 314 } 315 316 @Override 317 public void enforceSeek() throws IOException { 318 throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner"); 319 } 320 } 321 322 private static class FileTrackingScanner extends TestScanner { 323 private final Path filePath; 324 private boolean closed = false; 325 326 public FileTrackingScanner(List<ExtendedCell> list, Path filePath) { 327 super(list); 328 this.filePath = filePath; 329 } 330 331 @Override 332 public void close() { 333 super.close(); 334 closed = true; 335 } 336 337 @Override 338 public Set<Path> getFilesRead() { 339 // Only return the file path after the scanner is closed 340 return closed ? Collections.singleton(filePath) : Collections.emptySet(); 341 } 342 } 343}