001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertThrows;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparatorImpl;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.testclassification.RegionServerTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.CollectionBackedScanner;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039
040@Category({ RegionServerTests.class, SmallTests.class })
041public class TestKeyValueHeap {
042
043  @ClassRule
044  public static final HBaseClassTestRule CLASS_RULE =
045    HBaseClassTestRule.forClass(TestKeyValueHeap.class);
046
047  private byte[] row1 = Bytes.toBytes("row1");
048  private byte[] fam1 = Bytes.toBytes("fam1");
049  private byte[] col1 = Bytes.toBytes("col1");
050  private byte[] data = Bytes.toBytes("data");
051
052  private byte[] row2 = Bytes.toBytes("row2");
053  private byte[] fam2 = Bytes.toBytes("fam2");
054  private byte[] col2 = Bytes.toBytes("col2");
055
056  private byte[] col3 = Bytes.toBytes("col3");
057  private byte[] col4 = Bytes.toBytes("col4");
058  private byte[] col5 = Bytes.toBytes("col5");
059
060  // Variable name encoding. kv<row#><fam#><col#>
061  Cell kv111 = new KeyValue(row1, fam1, col1, data);
062  Cell kv112 = new KeyValue(row1, fam1, col2, data);
063  Cell kv113 = new KeyValue(row1, fam1, col3, data);
064  Cell kv114 = new KeyValue(row1, fam1, col4, data);
065  Cell kv115 = new KeyValue(row1, fam1, col5, data);
066  Cell kv121 = new KeyValue(row1, fam2, col1, data);
067  Cell kv122 = new KeyValue(row1, fam2, col2, data);
068  Cell kv211 = new KeyValue(row2, fam1, col1, data);
069  Cell kv212 = new KeyValue(row2, fam1, col2, data);
070  Cell kv213 = new KeyValue(row2, fam1, col3, data);
071
072  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
073  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
074  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
075
076  List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3));
077
078  /*
079   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned Cells
080   * are same as {@code expected}.
081   * @return List of Cells returned from scanners.
082   */
083  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
084    throws IOException {
085    // Creating KeyValueHeap
086    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
087      List<Cell> actual = new ArrayList<>();
088      while (kvh.peek() != null) {
089        actual.add(kvh.next());
090      }
091
092      assertEquals(expected, actual);
093      return actual;
094    }
095  }
096
097  @Test
098  public void testSorted() throws IOException {
099    // Cases that need to be checked are:
100    // 1. The "smallest" Cell is in the same scanners as current
101    // 2. Current scanner gets empty
102
103    List<Cell> expected =
104      Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
105
106    List<Cell> actual = assertCells(expected, scanners);
107
108    // Check if result is sorted according to Comparator
109    for (int i = 0; i < actual.size() - 1; i++) {
110      int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1));
111      assertTrue(ret < 0);
112    }
113  }
114
115  @Test
116  public void testSeek() throws IOException {
117    // Cases:
118    // 1. Seek Cell that is not in scanner
119    // 2. Check that smallest that is returned from a seek is correct
120    List<Cell> expected = Arrays.asList(kv211);
121
122    // Creating KeyValueHeap
123    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
124      Cell seekKv = new KeyValue(row2, fam1, null, null);
125      kvh.seek(seekKv);
126
127      List<Cell> actual = Arrays.asList(kvh.peek());
128
129      assertEquals("Expected = " + Arrays.toString(expected.toArray()) + "\n Actual = "
130        + Arrays.toString(actual.toArray()), expected, actual);
131    }
132  }
133
134  @Test
135  public void testScannerLeak() throws IOException {
136    // Test for unclosed scanners (HBASE-1927)
137
138    TestScanner s4 = new TestScanner(new ArrayList<>());
139    scanners.add(s4);
140
141    // Creating KeyValueHeap
142    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
143      for (;;) {
144        if (kvh.next() == null) {
145          break;
146        }
147      }
148      // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
149      // queue and added to a Set for lazy close. The actual close will happen only on
150      // KVHeap#close()
151      assertEquals(4, kvh.scannersForDelayedClose.size());
152      assertTrue(kvh.scannersForDelayedClose.contains(s1));
153      assertTrue(kvh.scannersForDelayedClose.contains(s2));
154      assertTrue(kvh.scannersForDelayedClose.contains(s3));
155      assertTrue(kvh.scannersForDelayedClose.contains(s4));
156    }
157
158    for (KeyValueScanner scanner : scanners) {
159      assertTrue(((TestScanner) scanner).isClosed());
160    }
161  }
162
163  @Test
164  public void testScannerException() throws IOException {
165    // Test for NPE issue when exception happens in scanners (HBASE-13835)
166
167    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
168    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
169    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
170    TestScanner s4 = new SeekTestScanner(new ArrayList<>());
171
172    List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4));
173
174    // Creating KeyValueHeap
175    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
176      for (KeyValueScanner scanner : scanners) {
177        ((SeekTestScanner) scanner).setRealSeekDone(false);
178      }
179      // The pollRealKV should throw IOE.
180      assertThrows(IOException.class, () -> {
181        for (;;) {
182          if (kvh.next() == null) {
183            break;
184          }
185        }
186      });
187    }
188    // It implies there is no NPE thrown from kvh.close() if getting here
189    for (KeyValueScanner scanner : scanners) {
190      // Verify that close is called and only called once for each scanner
191      assertTrue(((SeekTestScanner) scanner).isClosed());
192      assertEquals(1, ((SeekTestScanner) scanner).getClosedNum());
193    }
194  }
195
196  @Test
197  public void testPriorityId() throws IOException {
198    Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
199    Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
200    TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
201    TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
202    List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
203    assertCells(expected, Arrays.asList(scan1, scan2));
204
205    scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
206    scan2 = new TestScanner(Arrays.asList(kv113B), 1);
207    expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
208    assertCells(expected, Arrays.asList(scan1, scan2));
209  }
210
211  private static class TestScanner extends CollectionBackedScanner {
212    private boolean closed = false;
213    private long scannerOrder = 0;
214
215    public TestScanner(List<Cell> list) {
216      super(list);
217    }
218
219    public TestScanner(List<Cell> list, long scannerOrder) {
220      this(list);
221      this.scannerOrder = scannerOrder;
222    }
223
224    @Override
225    public long getScannerOrder() {
226      return scannerOrder;
227    }
228
229    @Override
230    public void close() {
231      closed = true;
232    }
233
234    public boolean isClosed() {
235      return closed;
236    }
237  }
238
239  private static class SeekTestScanner extends TestScanner {
240    private int closedNum = 0;
241    private boolean realSeekDone = true;
242
243    public SeekTestScanner(List<Cell> list) {
244      super(list);
245    }
246
247    @Override
248    public void close() {
249      super.close();
250      closedNum++;
251    }
252
253    public int getClosedNum() {
254      return closedNum;
255    }
256
257    @Override
258    public boolean realSeekDone() {
259      return realSeekDone;
260    }
261
262    public void setRealSeekDone(boolean done) {
263      realSeekDone = done;
264    }
265
266    @Override
267    public void enforceSeek() throws IOException {
268      throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner");
269    }
270  }
271}