/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link KeyValueHeap}: ordering of the merged output, seeking, closing of the
 * underlying scanners, tie-breaking by scanner order and reporting of the files read.
 */
@Category({ RegionServerTests.class, SmallTests.class })
public class TestKeyValueHeap {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestKeyValueHeap.class);

  private final byte[] row1 = Bytes.toBytes("row1");
  private final byte[] fam1 = Bytes.toBytes("fam1");
  private final byte[] col1 = Bytes.toBytes("col1");
  private final byte[] data = Bytes.toBytes("data");

  private final byte[] row2 = Bytes.toBytes("row2");
  private final byte[] fam2 = Bytes.toBytes("fam2");
  private final byte[] col2 = Bytes.toBytes("col2");

  private final byte[] col3 = Bytes.toBytes("col3");
  private final byte[] col4 = Bytes.toBytes("col4");
  private final byte[] col5 = Bytes.toBytes("col5");

  // Variable name encoding. kv<row#><fam#><col#>
  ExtendedCell kv111 = new KeyValue(row1, fam1, col1, data);
  ExtendedCell kv112 = new KeyValue(row1, fam1, col2, data);
  ExtendedCell kv113 = new KeyValue(row1, fam1, col3, data);
  ExtendedCell kv114 = new KeyValue(row1, fam1, col4, data);
  ExtendedCell kv115 = new KeyValue(row1, fam1, col5, data);
  ExtendedCell kv121 = new KeyValue(row1, fam2, col1, data);
  ExtendedCell kv122 = new KeyValue(row1, fam2, col2, data);
  ExtendedCell kv211 = new KeyValue(row2, fam1, col1, data);
  ExtendedCell kv212 = new KeyValue(row2, fam1, col2, data);
  ExtendedCell kv213 = new KeyValue(row2, fam1, col3, data);

  // Three scanners whose cells interleave, so the heap has to switch between them while merging.
  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));

  // Mutable on purpose: testScannerLeak appends a fourth scanner.
  List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3));

  /**
   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned
   * Cells are same as {@code expected}.
   * @param expected the Cells the heap is expected to return, in order
   * @param scanners the scanners the heap is built from
   * @return List of Cells returned from scanners.
   * @throws IOException if building or reading the heap fails
   */
  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
    throws IOException {
    // Creating KeyValueHeap
    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
      List<Cell> actual = new ArrayList<>();
      while (kvh.peek() != null) {
        actual.add(kvh.next());
      }

      assertEquals(expected, actual);
      return actual;
    }
  }

  @Test
  public void testSorted() throws IOException {
    // Cases that need to be checked are:
    // 1. The "smallest" Cell is in the same scanners as current
    // 2. Current scanner gets empty

    List<Cell> expected =
      Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);

    List<Cell> actual = assertCells(expected, scanners);

    // Check if result is sorted according to Comparator
    for (int i = 0; i < actual.size() - 1; i++) {
      int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1));
      assertTrue(ret < 0);
    }
  }

  @Test
  public void testSeek() throws IOException {
    // Cases:
    // 1. Seek Cell that is not in scanner
    // 2. Check that smallest that is returned from a seek is correct
    List<Cell> expected = Arrays.asList(kv211);

    // Creating KeyValueHeap
    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
      ExtendedCell seekKv = new KeyValue(row2, fam1, null, null);
      kvh.seek(seekKv);

      List<Cell> actual = Arrays.asList(kvh.peek());

      assertEquals("Expected = " + Arrays.toString(expected.toArray()) + "\n Actual = "
        + Arrays.toString(actual.toArray()), expected, actual);
    }
  }

  @Test
  public void testScannerLeak() throws IOException {
    // Test for unclosed scanners (HBASE-1927)

    TestScanner s4 = new TestScanner(new ArrayList<>());
    scanners.add(s4);

    // Creating KeyValueHeap
    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
      for (;;) {
        if (kvh.next() == null) {
          break;
        }
      }
      // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
      // queue and added to a Set for lazy close. The actual close will happen only on
      // KVHeap#close()
      assertEquals(4, kvh.scannersForDelayedClose.size());
      assertTrue(kvh.scannersForDelayedClose.contains(s1));
      assertTrue(kvh.scannersForDelayedClose.contains(s2));
      assertTrue(kvh.scannersForDelayedClose.contains(s3));
      assertTrue(kvh.scannersForDelayedClose.contains(s4));
    }

    // Leaving the try block closed the heap, which must have closed every scanner.
    for (KeyValueScanner scanner : scanners) {
      assertTrue(((TestScanner) scanner).isClosed());
    }
  }

  @Test
  public void testScannerException() throws IOException {
    // Test for NPE issue when exception happens in scanners (HBASE-13835)

    // These locals intentionally shadow the fields: this test needs SeekTestScanner instances.
    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
    TestScanner s4 = new SeekTestScanner(new ArrayList<>());

    List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4));

    // Creating KeyValueHeap
    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
      // Report the real seek as pending only after the heap has been built.
      for (KeyValueScanner scanner : scanners) {
        ((SeekTestScanner) scanner).setRealSeekDone(false);
      }
      // The pollRealKV should throw IOE.
      assertThrows(IOException.class, () -> {
        for (;;) {
          if (kvh.next() == null) {
            break;
          }
        }
      });
    }
    // It implies there is no NPE thrown from kvh.close() if getting here
    for (KeyValueScanner scanner : scanners) {
      // Verify that close is called and only called once for each scanner
      assertTrue(((SeekTestScanner) scanner).isClosed());
      assertEquals(1, ((SeekTestScanner) scanner).getClosedNum());
    }
  }

  @Test
  public void testPriorityId() throws IOException {
    // kv113A and kv113B share row, family and qualifier and differ only in value; the expected
    // lists below put the cell from the scanner with the larger scanner order first.
    ExtendedCell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
    ExtendedCell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
    TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
    TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
    List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
    assertCells(expected, Arrays.asList(scan1, scan2));

    // Swap the scanner orders and expect the two colliding cells to swap as well.
    scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
    scan2 = new TestScanner(Arrays.asList(kv113B), 1);
    expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
    assertCells(expected, Arrays.asList(scan1, scan2));
  }

  @Test
  public void testGetFilesRead() throws IOException {
    // Create test scanners with file paths
    Path file1 = new Path("/test/file1");
    Path file2 = new Path("/test/file2");
    Path file3 = new Path("/test/file3");

    FileTrackingScanner scanner1 =
      new FileTrackingScanner(Arrays.asList(kv115, kv211, kv212), file1);
    FileTrackingScanner scanner2 = new FileTrackingScanner(Arrays.asList(kv111, kv112), file2);
    FileTrackingScanner scanner3 =
      new FileTrackingScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213), file3);

    // Add a non-file-based scanner (e.g., memstore scanner) that doesn't return files
    TestScanner memStoreScanner = new TestScanner(Arrays.asList(kv114));

    List<KeyValueScanner> scanners =
      new ArrayList<>(Arrays.asList(scanner1, scanner2, scanner3, memStoreScanner));

    // The heap is declared outside the try block because it is queried again after being closed.
    KeyValueHeap keyValueHeap = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);

    // Binding the heap to a resource variable closes it exactly once, including when one of the
    // pre-close assertions fails.
    try (KeyValueHeap heapToClose = keyValueHeap) {
      // Before closing, should return empty set even after scanning
      // Scan through all cells first
      while (heapToClose.peek() != null) {
        heapToClose.next();
      }

      // Verify that before closing, files are not returned
      Set<Path> filesReadBeforeClose = heapToClose.getFilesRead();
      assertTrue("Should return empty set before closing heap", filesReadBeforeClose.isEmpty());
      assertEquals("Should have 0 files before closing", 0, filesReadBeforeClose.size());
    }

    // The heap is closed at this point.
    // After closing, should return all files from file-based scanners only
    // Non-file-based scanners (like memstore) should not contribute files
    Set<Path> filesReadAfterClose = keyValueHeap.getFilesRead();
    assertEquals("Should return set with 3 file paths after closing (excluding non-file scanner)",
      3, filesReadAfterClose.size());
    assertTrue("Should contain file1", filesReadAfterClose.contains(file1));
    assertTrue("Should contain file2", filesReadAfterClose.contains(file2));
    assertTrue("Should contain file3", filesReadAfterClose.contains(file3));

    // Verify that non-file-based scanner doesn't contribute any files
    // (memStoreScanner.getFilesRead() should return empty set)
    Set<Path> memStoreFiles = memStoreScanner.getFilesRead();
    assertTrue("Non-file-based scanner should return empty set", memStoreFiles.isEmpty());
  }

  /**
   * Collection-backed scanner that records whether {@link #close()} was called and reports a
   * configurable scanner order (0 unless set through the two-argument constructor).
   */
  private static class TestScanner extends CollectionBackedScanner {
    private boolean closed = false;
    private long scannerOrder = 0;

    public TestScanner(List<ExtendedCell> list) {
      super(list);
    }

    public TestScanner(List<ExtendedCell> list, long scannerOrder) {
      this(list);
      this.scannerOrder = scannerOrder;
    }

    @Override
    public long getScannerOrder() {
      return scannerOrder;
    }

    @Override
    public void close() {
      closed = true;
    }

    public boolean isClosed() {
      return closed;
    }
  }

  /**
   * Scanner that counts calls to {@link #close()}, lets the test choose what
   * {@link #realSeekDone()} reports, and always throws from {@link #enforceSeek()}.
   */
  private static class SeekTestScanner extends TestScanner {
    private int closedNum = 0;
    private boolean realSeekDone = true;

    public SeekTestScanner(List<ExtendedCell> list) {
      super(list);
    }

    @Override
    public void close() {
      super.close();
      closedNum++;
    }

    public int getClosedNum() {
      return closedNum;
    }

    @Override
    public boolean realSeekDone() {
      return realSeekDone;
    }

    public void setRealSeekDone(boolean done) {
      realSeekDone = done;
    }

    @Override
    public void enforceSeek() throws IOException {
      throw new IOException("enforceSeek must not be called on a non-lazy scanner");
    }
  }

  /**
   * Scanner associated with a single file path, which it reports from {@link #getFilesRead()}
   * only once it has been closed.
   */
  private static class FileTrackingScanner extends TestScanner {
    private final Path filePath;

    public FileTrackingScanner(List<ExtendedCell> list, Path filePath) {
      super(list);
      this.filePath = filePath;
    }

    @Override
    public Set<Path> getFilesRead() {
      // Only return the file path after the scanner is closed. The closed state is the one
      // tracked by TestScanner, so no second flag is kept here.
      return isClosed() ? Collections.singleton(filePath) : Collections.emptySet();
    }
  }
}