001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024import org.apache.hadoop.hbase.Cell; 025import org.apache.hadoop.hbase.CellComparatorImpl; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseTestCase; 028import org.apache.hadoop.hbase.KeyValue; 029import org.apache.hadoop.hbase.testclassification.RegionServerTests; 030import org.apache.hadoop.hbase.testclassification.SmallTests; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.CollectionBackedScanner; 033import org.junit.Before; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037 038@Category({RegionServerTests.class, SmallTests.class}) 039public class TestKeyValueHeap extends HBaseTestCase { 040 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestKeyValueHeap.class); 044 045 private byte[] row1 = Bytes.toBytes("row1"); 046 private byte[] fam1 = Bytes.toBytes("fam1"); 047 private byte[] col1 = Bytes.toBytes("col1"); 048 private byte[] data = Bytes.toBytes("data"); 049 050 private byte[] row2 = Bytes.toBytes("row2"); 051 private byte[] fam2 = Bytes.toBytes("fam2"); 052 private byte[] col2 = Bytes.toBytes("col2"); 053 054 private byte[] col3 = Bytes.toBytes("col3"); 055 private byte[] col4 = Bytes.toBytes("col4"); 056 private byte[] col5 = Bytes.toBytes("col5"); 057 058 // Variable name encoding. kv<row#><fam#><col#> 059 Cell kv111 = new KeyValue(row1, fam1, col1, data); 060 Cell kv112 = new KeyValue(row1, fam1, col2, data); 061 Cell kv113 = new KeyValue(row1, fam1, col3, data); 062 Cell kv114 = new KeyValue(row1, fam1, col4, data); 063 Cell kv115 = new KeyValue(row1, fam1, col5, data); 064 Cell kv121 = new KeyValue(row1, fam2, col1, data); 065 Cell kv122 = new KeyValue(row1, fam2, col2, data); 066 Cell kv211 = new KeyValue(row2, fam1, col1, data); 067 Cell kv212 = new KeyValue(row2, fam1, col2, data); 068 Cell kv213 = new KeyValue(row2, fam1, col3, data); 069 070 TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212)); 071 TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112)); 072 TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 073 074 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3)); 075 076 /* 077 * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned 078 * Cells are same as {@code expected}. 079 * @return List of Cells returned from scanners. 080 */ 081 public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners) 082 throws IOException { 083 //Creating KeyValueHeap 084 KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); 085 086 List<Cell> actual = new ArrayList<>(); 087 while(kvh.peek() != null){ 088 actual.add(kvh.next()); 089 } 090 091 assertEquals(expected, actual); 092 return actual; 093 } 094 095 @Override 096 @Before 097 public void setUp() throws Exception { 098 super.setUp(); 099 } 100 101 @Test 102 public void testSorted() throws IOException{ 103 //Cases that need to be checked are: 104 //1. The "smallest" Cell is in the same scanners as current 105 //2. Current scanner gets empty 106 107 List<Cell> expected = Arrays.asList( 108 kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213); 109 110 List<Cell> actual = assertCells(expected, scanners); 111 112 //Check if result is sorted according to Comparator 113 for(int i=0; i<actual.size()-1; i++){ 114 int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i+1)); 115 assertTrue(ret < 0); 116 } 117 } 118 119 @Test 120 public void testSeek() throws IOException { 121 //Cases: 122 //1. Seek Cell that is not in scanner 123 //2. Check that smallest that is returned from a seek is correct 124 125 List<Cell> expected = Arrays.asList(kv211); 126 127 //Creating KeyValueHeap 128 KeyValueHeap kvh = 129 new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); 130 131 Cell seekKv = new KeyValue(row2, fam1, null, null); 132 kvh.seek(seekKv); 133 134 List<Cell> actual = Arrays.asList(kvh.peek()); 135 136 assertEquals("Expected = " + Arrays.toString(expected.toArray()) 137 + "\n Actual = " + Arrays.toString(actual.toArray()), expected, actual); 138 } 139 140 @Test 141 public void testScannerLeak() throws IOException { 142 // Test for unclosed scanners (HBASE-1927) 143 144 TestScanner s4 = new TestScanner(new ArrayList<>()); 145 scanners.add(s4); 146 147 //Creating KeyValueHeap 148 KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); 149 150 while(kvh.next() != null); 151 // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority 152 // queue and added to a Set for lazy close. The actual close will happen only on KVHeap#close() 153 assertEquals(4, kvh.scannersForDelayedClose.size()); 154 assertTrue(kvh.scannersForDelayedClose.contains(s1)); 155 assertTrue(kvh.scannersForDelayedClose.contains(s2)); 156 assertTrue(kvh.scannersForDelayedClose.contains(s3)); 157 assertTrue(kvh.scannersForDelayedClose.contains(s4)); 158 kvh.close(); 159 for(KeyValueScanner scanner : scanners) { 160 assertTrue(((TestScanner)scanner).isClosed()); 161 } 162 } 163 164 @Test 165 public void testScannerException() throws IOException { 166 // Test for NPE issue when exception happens in scanners (HBASE-13835) 167 168 TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212)); 169 TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112)); 170 TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 171 TestScanner s4 = new SeekTestScanner(new ArrayList<>()); 172 173 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4)); 174 175 // Creating KeyValueHeap 176 KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); 177 178 try { 179 for (KeyValueScanner scanner : scanners) { 180 ((SeekTestScanner) scanner).setRealSeekDone(false); 181 } 182 while (kvh.next() != null); 183 // The pollRealKV should throw IOE. 184 assertTrue(false); 185 } catch (IOException ioe) { 186 kvh.close(); 187 } 188 189 // It implies there is no NPE thrown from kvh.close() if getting here 190 for (KeyValueScanner scanner : scanners) { 191 // Verify that close is called and only called once for each scanner 192 assertTrue(((SeekTestScanner) scanner).isClosed()); 193 assertEquals(1, ((SeekTestScanner) scanner).getClosedNum()); 194 } 195 } 196 197 @Test 198 public void testPriorityId() throws IOException { 199 Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa")); 200 Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb")); 201 { 202 TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1); 203 TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2); 204 List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A); 205 assertCells(expected, new ArrayList<>(Arrays.asList(scan1, scan2))); 206 } 207 { 208 TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2); 209 TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1); 210 List<Cell> expected = Arrays.asList(kv111, kv112, kv113A, kv113B); 211 assertCells(expected, new ArrayList<>(Arrays.asList(scan1, scan2))); 212 } 213 } 214 215 private static class TestScanner extends CollectionBackedScanner { 216 private boolean closed = false; 217 private long scannerOrder = 0; 218 219 public TestScanner(List<Cell> list) { 220 super(list); 221 } 222 223 public TestScanner(List<Cell> list, long scannerOrder) { 224 this(list); 225 this.scannerOrder = scannerOrder; 226 } 227 228 @Override 229 public long getScannerOrder() { 230 return scannerOrder; 231 } 232 233 @Override 234 public void close(){ 235 closed = true; 236 } 237 238 public boolean isClosed() { 239 return closed; 240 } 241 } 242 243 private static class SeekTestScanner extends TestScanner { 244 private int closedNum = 0; 245 private boolean realSeekDone = true; 246 247 public SeekTestScanner(List<Cell> list) { 248 super(list); 249 } 250 251 @Override 252 public void close() { 253 super.close(); 254 closedNum++; 255 } 256 257 public int getClosedNum() { 258 return closedNum; 259 } 260 261 @Override 262 public boolean realSeekDone() { 263 return realSeekDone; 264 } 265 266 public void setRealSeekDone(boolean done) { 267 realSeekDone = done; 268 } 269 270 @Override 271 public void enforceSeek() throws IOException { 272 throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner"); 273 } 274 } 275}