001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertThrows; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.List; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparatorImpl; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.testclassification.RegionServerTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.CollectionBackedScanner; 036import org.junit.ClassRule; 037import org.junit.Test; 038import org.junit.experimental.categories.Category; 039 040@Category({ RegionServerTests.class, SmallTests.class }) 041public class TestKeyValueHeap { 042 043 @ClassRule 044 public static final HBaseClassTestRule CLASS_RULE = 045 HBaseClassTestRule.forClass(TestKeyValueHeap.class); 046 047 private byte[] row1 = Bytes.toBytes("row1"); 048 private byte[] fam1 = Bytes.toBytes("fam1"); 049 private byte[] col1 = Bytes.toBytes("col1"); 050 private byte[] data = Bytes.toBytes("data"); 051 052 private byte[] row2 = Bytes.toBytes("row2"); 053 private byte[] fam2 = Bytes.toBytes("fam2"); 054 private byte[] col2 = Bytes.toBytes("col2"); 055 056 private byte[] col3 = Bytes.toBytes("col3"); 057 private byte[] col4 = Bytes.toBytes("col4"); 058 private byte[] col5 = Bytes.toBytes("col5"); 059 060 // Variable name encoding. kv<row#><fam#><col#> 061 Cell kv111 = new KeyValue(row1, fam1, col1, data); 062 Cell kv112 = new KeyValue(row1, fam1, col2, data); 063 Cell kv113 = new KeyValue(row1, fam1, col3, data); 064 Cell kv114 = new KeyValue(row1, fam1, col4, data); 065 Cell kv115 = new KeyValue(row1, fam1, col5, data); 066 Cell kv121 = new KeyValue(row1, fam2, col1, data); 067 Cell kv122 = new KeyValue(row1, fam2, col2, data); 068 Cell kv211 = new KeyValue(row2, fam1, col1, data); 069 Cell kv212 = new KeyValue(row2, fam1, col2, data); 070 Cell kv213 = new KeyValue(row2, fam1, col3, data); 071 072 TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212)); 073 TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112)); 074 TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 075 076 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3)); 077 078 /* 079 * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned Cells 080 * are same as {@code expected}. 081 * @return List of Cells returned from scanners. 082 */ 083 public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners) 084 throws IOException { 085 // Creating KeyValueHeap 086 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 087 List<Cell> actual = new ArrayList<>(); 088 while (kvh.peek() != null) { 089 actual.add(kvh.next()); 090 } 091 092 assertEquals(expected, actual); 093 return actual; 094 } 095 } 096 097 @Test 098 public void testSorted() throws IOException { 099 // Cases that need to be checked are: 100 // 1. The "smallest" Cell is in the same scanners as current 101 // 2. Current scanner gets empty 102 103 List<Cell> expected = 104 Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213); 105 106 List<Cell> actual = assertCells(expected, scanners); 107 108 // Check if result is sorted according to Comparator 109 for (int i = 0; i < actual.size() - 1; i++) { 110 int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1)); 111 assertTrue(ret < 0); 112 } 113 } 114 115 @Test 116 public void testSeek() throws IOException { 117 // Cases: 118 // 1. Seek Cell that is not in scanner 119 // 2. Check that smallest that is returned from a seek is correct 120 List<Cell> expected = Arrays.asList(kv211); 121 122 // Creating KeyValueHeap 123 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 124 Cell seekKv = new KeyValue(row2, fam1, null, null); 125 kvh.seek(seekKv); 126 127 List<Cell> actual = Arrays.asList(kvh.peek()); 128 129 assertEquals("Expected = " + Arrays.toString(expected.toArray()) + "\n Actual = " 130 + Arrays.toString(actual.toArray()), expected, actual); 131 } 132 } 133 134 @Test 135 public void testScannerLeak() throws IOException { 136 // Test for unclosed scanners (HBASE-1927) 137 138 TestScanner s4 = new TestScanner(new ArrayList<>()); 139 scanners.add(s4); 140 141 // Creating KeyValueHeap 142 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 143 for (;;) { 144 if (kvh.next() == null) { 145 break; 146 } 147 } 148 // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority 149 // queue and added to a Set for lazy close. The actual close will happen only on 150 // KVHeap#close() 151 assertEquals(4, kvh.scannersForDelayedClose.size()); 152 assertTrue(kvh.scannersForDelayedClose.contains(s1)); 153 assertTrue(kvh.scannersForDelayedClose.contains(s2)); 154 assertTrue(kvh.scannersForDelayedClose.contains(s3)); 155 assertTrue(kvh.scannersForDelayedClose.contains(s4)); 156 } 157 158 for (KeyValueScanner scanner : scanners) { 159 assertTrue(((TestScanner) scanner).isClosed()); 160 } 161 } 162 163 @Test 164 public void testScannerException() throws IOException { 165 // Test for NPE issue when exception happens in scanners (HBASE-13835) 166 167 TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212)); 168 TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112)); 169 TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 170 TestScanner s4 = new SeekTestScanner(new ArrayList<>()); 171 172 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4)); 173 174 // Creating KeyValueHeap 175 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 176 for (KeyValueScanner scanner : scanners) { 177 ((SeekTestScanner) scanner).setRealSeekDone(false); 178 } 179 // The pollRealKV should throw IOE. 180 assertThrows(IOException.class, () -> { 181 for (;;) { 182 if (kvh.next() == null) { 183 break; 184 } 185 } 186 }); 187 } 188 // It implies there is no NPE thrown from kvh.close() if getting here 189 for (KeyValueScanner scanner : scanners) { 190 // Verify that close is called and only called once for each scanner 191 assertTrue(((SeekTestScanner) scanner).isClosed()); 192 assertEquals(1, ((SeekTestScanner) scanner).getClosedNum()); 193 } 194 } 195 196 @Test 197 public void testPriorityId() throws IOException { 198 Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa")); 199 Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb")); 200 TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1); 201 TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2); 202 List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A); 203 assertCells(expected, Arrays.asList(scan1, scan2)); 204 205 scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2); 206 scan2 = new TestScanner(Arrays.asList(kv113B), 1); 207 expected = Arrays.asList(kv111, kv112, kv113A, kv113B); 208 assertCells(expected, Arrays.asList(scan1, scan2)); 209 } 210 211 private static class TestScanner extends CollectionBackedScanner { 212 private boolean closed = false; 213 private long scannerOrder = 0; 214 215 public TestScanner(List<Cell> list) { 216 super(list); 217 } 218 219 public TestScanner(List<Cell> list, long scannerOrder) { 220 this(list); 221 this.scannerOrder = scannerOrder; 222 } 223 224 @Override 225 public long getScannerOrder() { 226 return scannerOrder; 227 } 228 229 @Override 230 public void close() { 231 closed = true; 232 } 233 234 public boolean isClosed() { 235 return closed; 236 } 237 } 238 239 private static class SeekTestScanner extends TestScanner { 240 private int closedNum = 0; 241 private boolean realSeekDone = true; 242 243 public SeekTestScanner(List<Cell> list) { 244 super(list); 245 } 246 247 @Override 248 public void close() { 249 super.close(); 250 closedNum++; 251 } 252 253 public int getClosedNum() { 254 return closedNum; 255 } 256 257 @Override 258 public boolean realSeekDone() { 259 return realSeekDone; 260 } 261 262 public void setRealSeekDone(boolean done) { 263 realSeekDone = done; 264 } 265 266 @Override 267 public void enforceSeek() throws IOException { 268 throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner"); 269 } 270 } 271}