001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ByteBufferKeyValue;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Durability;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.HStore;
044import org.apache.hadoop.hbase.regionserver.InternalScanner;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
049import org.junit.jupiter.api.AfterEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.junit.jupiter.api.TestInfo;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056@Tag(RegionServerTests.TAG)
057@Tag(SmallTests.TAG)
058public class TestScannerFromBucketCache {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestScannerFromBucketCache.class);
061
062  HRegion region = null;
063  private HBaseTestingUtil test_util;
064  private Configuration conf;
065  private final int MAX_VERSIONS = 2;
066  byte[] val = new byte[512 * 1024];
067
068  // Test names
069  private TableName tableName;
070
071  private void setUp(boolean useBucketCache, TestInfo testInfo) throws IOException {
072    test_util = new HBaseTestingUtil();
073    conf = test_util.getConfiguration();
074    if (useBucketCache) {
075      conf.setInt("hbase.bucketcache.size", 400);
076      conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
077      conf.setInt("hbase.bucketcache.writer.threads", 10);
078      conf.setFloat("hfile.block.cache.size", 0.2f);
079      conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
080    }
081    tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
082  }
083
084  @AfterEach
085  public void tearDown() throws Exception {
086    EnvironmentEdgeManagerTestHelper.reset();
087    LOG.info("Cleaning test directory: " + test_util.getDataTestDir());
088    test_util.cleanupTestDir();
089  }
090
091  String getName(TestInfo testInfo) {
092    return testInfo.getTestMethod().get().getName();
093  }
094
095  @Test
096  public void testBasicScanWithLRUCache(TestInfo testInfo) throws IOException {
097    setUp(false, testInfo);
098    byte[] row1 = Bytes.toBytes("row1");
099    byte[] qf1 = Bytes.toBytes("qualifier1");
100    byte[] qf2 = Bytes.toBytes("qualifier2");
101    byte[] fam1 = Bytes.toBytes("lrucache");
102
103    long ts1 = 1;
104    long ts2 = ts1 + 1;
105    long ts3 = ts1 + 2;
106
107    // Setting up region
108    String method = this.getName(testInfo);
109    this.region = initHRegion(tableName, method, conf, test_util, fam1);
110    try {
111      List<ExtendedCell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
112
113      List<ExtendedCell> actual = performScan(row1, fam1);
114      // Verify result
115      for (int i = 0; i < expected.size(); i++) {
116        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
117        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
118      }
119      // do the scan again and verify. This time it should be from the lru cache
120      actual = performScan(row1, fam1);
121      // Verify result
122      for (int i = 0; i < expected.size(); i++) {
123        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
124        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
125      }
126
127    } finally {
128      HBaseTestingUtil.closeRegionAndWAL(this.region);
129      this.region = null;
130    }
131  }
132
133  @Test
134  public void testBasicScanWithOffheapBucketCache(TestInfo testInfo) throws IOException {
135    setUp(true, testInfo);
136    byte[] row1 = Bytes.toBytes("row1offheap");
137    byte[] qf1 = Bytes.toBytes("qualifier1");
138    byte[] qf2 = Bytes.toBytes("qualifier2");
139    byte[] fam1 = Bytes.toBytes("famoffheap");
140
141    long ts1 = 1;
142    long ts2 = ts1 + 1;
143    long ts3 = ts1 + 2;
144
145    // Setting up region
146    String method = this.getName(testInfo);
147    this.region = initHRegion(tableName, method, conf, test_util, fam1);
148    try {
149      List<ExtendedCell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
150
151      List<ExtendedCell> actual = performScan(row1, fam1);
152      // Verify result
153      for (int i = 0; i < expected.size(); i++) {
154        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
155        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
156      }
157      // Wait for the bucket cache threads to move the data to offheap
158      Thread.sleep(500);
159      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
160      actual = performScan(row1, fam1);
161      // Verify result
162      for (int i = 0; i < expected.size(); i++) {
163        assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
164        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
165      }
166
167    } catch (InterruptedException e) {
168    } finally {
169      HBaseTestingUtil.closeRegionAndWAL(this.region);
170      this.region = null;
171    }
172  }
173
174  @Test
175  public void testBasicScanWithOffheapBucketCacheWithMBB(TestInfo testInfo) throws IOException {
176    setUp(true, testInfo);
177    byte[] row1 = Bytes.toBytes("row1offheap");
178    byte[] qf1 = Bytes.toBytes("qualifier1");
179    byte[] qf2 = Bytes.toBytes("qualifier2");
180    byte[] fam1 = Bytes.toBytes("famoffheap");
181
182    long ts1 = 1;
183    long ts2 = ts1 + 1;
184    long ts3 = ts1 + 2;
185
186    // Setting up region
187    String method = this.getName(testInfo);
188    this.region = initHRegion(tableName, method, conf, test_util, fam1);
189    try {
190      List<ExtendedCell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, true);
191
192      List<ExtendedCell> actual = performScan(row1, fam1);
193      // Verify result
194      for (int i = 0; i < expected.size(); i++) {
195        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
196        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
197      }
198      // Wait for the bucket cache threads to move the data to offheap
199      Thread.sleep(500);
200      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
201      // but one of the cell will be copied due to the asSubByteBuff call
202      Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(10);
203      actual = new ArrayList<>();
204      InternalScanner scanner = region.getScanner(scan);
205
206      boolean hasNext = scanner.next(actual);
207      assertEquals(false, hasNext);
208      // Verify result
209      for (int i = 0; i < expected.size(); i++) {
210        if (i != 5) {
211          // the last cell fetched will be of type shareable but not offheap because
212          // the MBB is copied to form a single cell
213          assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
214        }
215      }
216
217    } catch (InterruptedException e) {
218    } finally {
219      HBaseTestingUtil.closeRegionAndWAL(this.region);
220      this.region = null;
221    }
222  }
223
224  private List<ExtendedCell> insertData(byte[] row1, byte[] qf1, byte[] qf2, byte[] fam1, long ts1,
225    long ts2, long ts3, boolean withVal) throws IOException {
226    // Putting data in Region
227    Put put = null;
228    KeyValue kv13 = null;
229    KeyValue kv12 = null;
230    KeyValue kv11 = null;
231
232    KeyValue kv23 = null;
233    KeyValue kv22 = null;
234    KeyValue kv21 = null;
235    if (!withVal) {
236      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
237      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
238      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
239
240      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
241      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
242      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
243    } else {
244      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, val);
245      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, val);
246      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, val);
247
248      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, val);
249      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, val);
250      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, val);
251    }
252
253    put = new Put(row1);
254    put.add(kv13);
255    put.add(kv12);
256    put.add(kv11);
257    put.add(kv23);
258    put.add(kv22);
259    put.add(kv21);
260    region.put(put);
261    region.flush(true);
262    HStore store = region.getStore(fam1);
263    while (store.getStorefilesCount() <= 0) {
264      try {
265        Thread.sleep(20);
266      } catch (InterruptedException e) {
267      }
268    }
269
270    // Expected
271    List<ExtendedCell> expected = new ArrayList<>();
272    expected.add(kv13);
273    expected.add(kv12);
274    expected.add(kv23);
275    expected.add(kv22);
276    return expected;
277  }
278
279  private List<ExtendedCell> performScan(byte[] row1, byte[] fam1) throws IOException {
280    Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(MAX_VERSIONS);
281    List<ExtendedCell> actual = new ArrayList<>();
282    InternalScanner scanner = region.getScanner(scan);
283
284    boolean hasNext = scanner.next(actual);
285    assertEquals(false, hasNext);
286    return actual;
287  }
288
289  private static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
290    HBaseTestingUtil test_util, byte[]... families) throws IOException {
291    return initHRegion(tableName, null, null, callingMethod, conf, test_util, false, families);
292  }
293
294  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
295    String callingMethod, Configuration conf, HBaseTestingUtil testUtil, boolean isReadOnly,
296    byte[]... families) throws IOException {
297    RegionInfo regionInfo =
298      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
299    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
300    builder.setReadOnly(isReadOnly).setDurability(Durability.SYNC_WAL);
301    for (byte[] family : families) {
302      builder.setColumnFamily(
303        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(Integer.MAX_VALUE).build());
304    }
305    return HBaseTestingUtil.createRegionAndWAL(regionInfo, testUtil.getDataTestDir(callingMethod),
306      conf, builder.build(), BlockCacheFactory.createBlockCache(conf));
307  }
308}