001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellComparatorImpl;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestCase;
028import org.apache.hadoop.hbase.KeyValue;
029import org.apache.hadoop.hbase.testclassification.RegionServerTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.CollectionBackedScanner;
033import org.junit.Before;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037
038@Category({RegionServerTests.class, SmallTests.class})
039public class TestKeyValueHeap extends HBaseTestCase {
040
041  @ClassRule
042  public static final HBaseClassTestRule CLASS_RULE =
043      HBaseClassTestRule.forClass(TestKeyValueHeap.class);
044
045  private byte[] row1 = Bytes.toBytes("row1");
046  private byte[] fam1 = Bytes.toBytes("fam1");
047  private byte[] col1 = Bytes.toBytes("col1");
048  private byte[] data = Bytes.toBytes("data");
049
050  private byte[] row2 = Bytes.toBytes("row2");
051  private byte[] fam2 = Bytes.toBytes("fam2");
052  private byte[] col2 = Bytes.toBytes("col2");
053
054  private byte[] col3 = Bytes.toBytes("col3");
055  private byte[] col4 = Bytes.toBytes("col4");
056  private byte[] col5 = Bytes.toBytes("col5");
057
058  // Variable name encoding. kv<row#><fam#><col#>
059  Cell kv111 = new KeyValue(row1, fam1, col1, data);
060  Cell kv112 = new KeyValue(row1, fam1, col2, data);
061  Cell kv113 = new KeyValue(row1, fam1, col3, data);
062  Cell kv114 = new KeyValue(row1, fam1, col4, data);
063  Cell kv115 = new KeyValue(row1, fam1, col5, data);
064  Cell kv121 = new KeyValue(row1, fam2, col1, data);
065  Cell kv122 = new KeyValue(row1, fam2, col2, data);
066  Cell kv211 = new KeyValue(row2, fam1, col1, data);
067  Cell kv212 = new KeyValue(row2, fam1, col2, data);
068  Cell kv213 = new KeyValue(row2, fam1, col3, data);
069
070  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
071  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
072  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
073
074  List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3));
075
076  /*
077   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned
078   * Cells are same as {@code expected}.
079   * @return List of Cells returned from scanners.
080   */
081  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
082      throws IOException {
083    //Creating KeyValueHeap
084    KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
085
086    List<Cell> actual = new ArrayList<>();
087    while(kvh.peek() != null){
088      actual.add(kvh.next());
089    }
090
091    assertEquals(expected, actual);
092    return actual;
093  }
094
095  @Override
096  @Before
097  public void setUp() throws Exception {
098    super.setUp();
099  }
100
101  @Test
102  public void testSorted() throws IOException{
103    //Cases that need to be checked are:
104    //1. The "smallest" Cell is in the same scanners as current
105    //2. Current scanner gets empty
106
107    List<Cell> expected = Arrays.asList(
108        kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
109
110    List<Cell> actual = assertCells(expected, scanners);
111
112    //Check if result is sorted according to Comparator
113    for(int i=0; i<actual.size()-1; i++){
114      int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i+1));
115      assertTrue(ret < 0);
116    }
117  }
118
119  @Test
120  public void testSeek() throws IOException {
121    //Cases:
122    //1. Seek Cell that is not in scanner
123    //2. Check that smallest that is returned from a seek is correct
124
125    List<Cell> expected = Arrays.asList(kv211);
126
127    //Creating KeyValueHeap
128    KeyValueHeap kvh =
129      new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
130
131    Cell seekKv = new KeyValue(row2, fam1, null, null);
132    kvh.seek(seekKv);
133
134    List<Cell> actual = Arrays.asList(kvh.peek());
135
136    assertEquals("Expected = " + Arrays.toString(expected.toArray())
137        + "\n Actual = " + Arrays.toString(actual.toArray()), expected, actual);
138  }
139
140  @Test
141  public void testScannerLeak() throws IOException {
142    // Test for unclosed scanners (HBASE-1927)
143
144    TestScanner s4 = new TestScanner(new ArrayList<>());
145    scanners.add(s4);
146
147    //Creating KeyValueHeap
148    KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
149
150    while(kvh.next() != null);
151    // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
152    // queue and added to a Set for lazy close. The actual close will happen only on KVHeap#close()
153    assertEquals(4, kvh.scannersForDelayedClose.size());
154    assertTrue(kvh.scannersForDelayedClose.contains(s1));
155    assertTrue(kvh.scannersForDelayedClose.contains(s2));
156    assertTrue(kvh.scannersForDelayedClose.contains(s3));
157    assertTrue(kvh.scannersForDelayedClose.contains(s4));
158    kvh.close();
159    for(KeyValueScanner scanner : scanners) {
160      assertTrue(((TestScanner)scanner).isClosed());
161    }
162  }
163
164  @Test
165  public void testScannerException() throws IOException {
166    // Test for NPE issue when exception happens in scanners (HBASE-13835)
167
168    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
169    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
170    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
171    TestScanner s4 = new SeekTestScanner(new ArrayList<>());
172
173    List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4));
174
175    // Creating KeyValueHeap
176    KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
177
178    try {
179      for (KeyValueScanner scanner : scanners) {
180        ((SeekTestScanner) scanner).setRealSeekDone(false);
181      }
182      while (kvh.next() != null);
183      // The pollRealKV should throw IOE.
184      assertTrue(false);
185    } catch (IOException ioe) {
186      kvh.close();
187    }
188
189    // It implies there is no NPE thrown from kvh.close() if getting here
190    for (KeyValueScanner scanner : scanners) {
191      // Verify that close is called and only called once for each scanner
192      assertTrue(((SeekTestScanner) scanner).isClosed());
193      assertEquals(1, ((SeekTestScanner) scanner).getClosedNum());
194    }
195  }
196
197  @Test
198  public void testPriorityId() throws IOException {
199    Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
200    Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
201    {
202      TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
203      TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
204      List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
205      assertCells(expected, new ArrayList<>(Arrays.asList(scan1, scan2)));
206    }
207    {
208      TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
209      TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1);
210      List<Cell> expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
211      assertCells(expected, new ArrayList<>(Arrays.asList(scan1, scan2)));
212    }
213  }
214
215  private static class TestScanner extends CollectionBackedScanner {
216    private boolean closed = false;
217    private long scannerOrder = 0;
218
219    public TestScanner(List<Cell> list) {
220      super(list);
221    }
222
223    public TestScanner(List<Cell> list, long scannerOrder) {
224      this(list);
225      this.scannerOrder = scannerOrder;
226    }
227
228    @Override
229    public long getScannerOrder() {
230      return scannerOrder;
231    }
232
233    @Override
234    public void close(){
235      closed = true;
236    }
237
238    public boolean isClosed() {
239      return closed;
240    }
241  }
242
243  private static class SeekTestScanner extends TestScanner {
244    private int closedNum = 0;
245    private boolean realSeekDone = true;
246
247    public SeekTestScanner(List<Cell> list) {
248      super(list);
249    }
250
251    @Override
252    public void close() {
253      super.close();
254      closedNum++;
255    }
256
257    public int getClosedNum() {
258      return closedNum;
259    }
260
261    @Override
262    public boolean realSeekDone() {
263      return realSeekDone;
264    }
265
266    public void setRealSeekDone(boolean done) {
267      realSeekDone = done;
268    }
269
270    @Override
271    public void enforceSeek() throws IOException {
272      throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner");
273    }
274  }
275}