001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertThrows;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparatorImpl;
030import org.apache.hadoop.hbase.ExtendedCell;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.testclassification.RegionServerTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.CollectionBackedScanner;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041@Category({ RegionServerTests.class, SmallTests.class })
042public class TestKeyValueHeap {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046    HBaseClassTestRule.forClass(TestKeyValueHeap.class);
047
048  private byte[] row1 = Bytes.toBytes("row1");
049  private byte[] fam1 = Bytes.toBytes("fam1");
050  private byte[] col1 = Bytes.toBytes("col1");
051  private byte[] data = Bytes.toBytes("data");
052
053  private byte[] row2 = Bytes.toBytes("row2");
054  private byte[] fam2 = Bytes.toBytes("fam2");
055  private byte[] col2 = Bytes.toBytes("col2");
056
057  private byte[] col3 = Bytes.toBytes("col3");
058  private byte[] col4 = Bytes.toBytes("col4");
059  private byte[] col5 = Bytes.toBytes("col5");
060
061  // Variable name encoding. kv<row#><fam#><col#>
062  ExtendedCell kv111 = new KeyValue(row1, fam1, col1, data);
063  ExtendedCell kv112 = new KeyValue(row1, fam1, col2, data);
064  ExtendedCell kv113 = new KeyValue(row1, fam1, col3, data);
065  ExtendedCell kv114 = new KeyValue(row1, fam1, col4, data);
066  ExtendedCell kv115 = new KeyValue(row1, fam1, col5, data);
067  ExtendedCell kv121 = new KeyValue(row1, fam2, col1, data);
068  ExtendedCell kv122 = new KeyValue(row1, fam2, col2, data);
069  ExtendedCell kv211 = new KeyValue(row2, fam1, col1, data);
070  ExtendedCell kv212 = new KeyValue(row2, fam1, col2, data);
071  ExtendedCell kv213 = new KeyValue(row2, fam1, col3, data);
072
073  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
074  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
075  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
076
077  List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3));
078
079  /*
080   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned Cells
081   * are same as {@code expected}.
082   * @return List of Cells returned from scanners.
083   */
084  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
085    throws IOException {
086    // Creating KeyValueHeap
087    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
088      List<Cell> actual = new ArrayList<>();
089      while (kvh.peek() != null) {
090        actual.add(kvh.next());
091      }
092
093      assertEquals(expected, actual);
094      return actual;
095    }
096  }
097
098  @Test
099  public void testSorted() throws IOException {
100    // Cases that need to be checked are:
101    // 1. The "smallest" Cell is in the same scanners as current
102    // 2. Current scanner gets empty
103
104    List<Cell> expected =
105      Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
106
107    List<Cell> actual = assertCells(expected, scanners);
108
109    // Check if result is sorted according to Comparator
110    for (int i = 0; i < actual.size() - 1; i++) {
111      int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1));
112      assertTrue(ret < 0);
113    }
114  }
115
116  @Test
117  public void testSeek() throws IOException {
118    // Cases:
119    // 1. Seek Cell that is not in scanner
120    // 2. Check that smallest that is returned from a seek is correct
121    List<Cell> expected = Arrays.asList(kv211);
122
123    // Creating KeyValueHeap
124    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
125      ExtendedCell seekKv = new KeyValue(row2, fam1, null, null);
126      kvh.seek(seekKv);
127
128      List<Cell> actual = Arrays.asList(kvh.peek());
129
130      assertEquals("Expected = " + Arrays.toString(expected.toArray()) + "\n Actual = "
131        + Arrays.toString(actual.toArray()), expected, actual);
132    }
133  }
134
135  @Test
136  public void testScannerLeak() throws IOException {
137    // Test for unclosed scanners (HBASE-1927)
138
139    TestScanner s4 = new TestScanner(new ArrayList<>());
140    scanners.add(s4);
141
142    // Creating KeyValueHeap
143    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
144      for (;;) {
145        if (kvh.next() == null) {
146          break;
147        }
148      }
149      // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
150      // queue and added to a Set for lazy close. The actual close will happen only on
151      // KVHeap#close()
152      assertEquals(4, kvh.scannersForDelayedClose.size());
153      assertTrue(kvh.scannersForDelayedClose.contains(s1));
154      assertTrue(kvh.scannersForDelayedClose.contains(s2));
155      assertTrue(kvh.scannersForDelayedClose.contains(s3));
156      assertTrue(kvh.scannersForDelayedClose.contains(s4));
157    }
158
159    for (KeyValueScanner scanner : scanners) {
160      assertTrue(((TestScanner) scanner).isClosed());
161    }
162  }
163
164  @Test
165  public void testScannerException() throws IOException {
166    // Test for NPE issue when exception happens in scanners (HBASE-13835)
167
168    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
169    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
170    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
171    TestScanner s4 = new SeekTestScanner(new ArrayList<>());
172
173    List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4));
174
175    // Creating KeyValueHeap
176    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
177      for (KeyValueScanner scanner : scanners) {
178        ((SeekTestScanner) scanner).setRealSeekDone(false);
179      }
180      // The pollRealKV should throw IOE.
181      assertThrows(IOException.class, () -> {
182        for (;;) {
183          if (kvh.next() == null) {
184            break;
185          }
186        }
187      });
188    }
189    // It implies there is no NPE thrown from kvh.close() if getting here
190    for (KeyValueScanner scanner : scanners) {
191      // Verify that close is called and only called once for each scanner
192      assertTrue(((SeekTestScanner) scanner).isClosed());
193      assertEquals(1, ((SeekTestScanner) scanner).getClosedNum());
194    }
195  }
196
197  @Test
198  public void testPriorityId() throws IOException {
199    ExtendedCell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
200    ExtendedCell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
201    TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
202    TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
203    List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
204    assertCells(expected, Arrays.asList(scan1, scan2));
205
206    scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
207    scan2 = new TestScanner(Arrays.asList(kv113B), 1);
208    expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
209    assertCells(expected, Arrays.asList(scan1, scan2));
210  }
211
212  private static class TestScanner extends CollectionBackedScanner {
213    private boolean closed = false;
214    private long scannerOrder = 0;
215
216    public TestScanner(List<ExtendedCell> list) {
217      super(list);
218    }
219
220    public TestScanner(List<ExtendedCell> list, long scannerOrder) {
221      this(list);
222      this.scannerOrder = scannerOrder;
223    }
224
225    @Override
226    public long getScannerOrder() {
227      return scannerOrder;
228    }
229
230    @Override
231    public void close() {
232      closed = true;
233    }
234
235    public boolean isClosed() {
236      return closed;
237    }
238  }
239
240  private static class SeekTestScanner extends TestScanner {
241    private int closedNum = 0;
242    private boolean realSeekDone = true;
243
244    public SeekTestScanner(List<ExtendedCell> list) {
245      super(list);
246    }
247
248    @Override
249    public void close() {
250      super.close();
251      closedNum++;
252    }
253
254    public int getClosedNum() {
255      return closedNum;
256    }
257
258    @Override
259    public boolean realSeekDone() {
260      return realSeekDone;
261    }
262
263    public void setRealSeekDone(boolean done) {
264      realSeekDone = done;
265    }
266
267    @Override
268    public void enforceSeek() throws IOException {
269      throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner");
270    }
271  }
272}