001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ByteBufferKeyValue;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.KeyValue;
034import org.apache.hadoop.hbase.PrivateCellUtil;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Durability;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HStore;
045import org.apache.hadoop.hbase.regionserver.InternalScanner;
046import org.apache.hadoop.hbase.testclassification.RegionServerTests;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
050import org.junit.After;
051import org.junit.ClassRule;
052import org.junit.Rule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.rules.TestName;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059@Category({ RegionServerTests.class, SmallTests.class })
060public class TestScannerFromBucketCache {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestScannerFromBucketCache.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestScannerFromBucketCache.class);
067  @Rule
068  public TestName name = new TestName();
069
070  HRegion region = null;
071  private HBaseTestingUtil test_util;
072  private Configuration conf;
073  private final int MAX_VERSIONS = 2;
074  byte[] val = new byte[512 * 1024];
075
076  // Test names
077  private TableName tableName;
078
079  private void setUp(boolean useBucketCache) throws IOException {
080    test_util = new HBaseTestingUtil();
081    conf = test_util.getConfiguration();
082    if (useBucketCache) {
083      conf.setInt("hbase.bucketcache.size", 400);
084      conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
085      conf.setInt("hbase.bucketcache.writer.threads", 10);
086      conf.setFloat("hfile.block.cache.size", 0.2f);
087      conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
088    }
089    tableName = TableName.valueOf(name.getMethodName());
090  }
091
092  @After
093  public void tearDown() throws Exception {
094    EnvironmentEdgeManagerTestHelper.reset();
095    LOG.info("Cleaning test directory: " + test_util.getDataTestDir());
096    test_util.cleanupTestDir();
097  }
098
099  String getName() {
100    return name.getMethodName();
101  }
102
103  @Test
104  public void testBasicScanWithLRUCache() throws IOException {
105    setUp(false);
106    byte[] row1 = Bytes.toBytes("row1");
107    byte[] qf1 = Bytes.toBytes("qualifier1");
108    byte[] qf2 = Bytes.toBytes("qualifier2");
109    byte[] fam1 = Bytes.toBytes("lrucache");
110
111    long ts1 = 1;
112    long ts2 = ts1 + 1;
113    long ts3 = ts1 + 2;
114
115    // Setting up region
116    String method = this.getName();
117    this.region = initHRegion(tableName, method, conf, test_util, fam1);
118    try {
119      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
120
121      List<Cell> actual = performScan(row1, fam1);
122      // Verify result
123      for (int i = 0; i < expected.size(); i++) {
124        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
125        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
126      }
127      // do the scan again and verify. This time it should be from the lru cache
128      actual = performScan(row1, fam1);
129      // Verify result
130      for (int i = 0; i < expected.size(); i++) {
131        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
132        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
133      }
134
135    } finally {
136      HBaseTestingUtil.closeRegionAndWAL(this.region);
137      this.region = null;
138    }
139  }
140
141  @Test
142  public void testBasicScanWithOffheapBucketCache() throws IOException {
143    setUp(true);
144    byte[] row1 = Bytes.toBytes("row1offheap");
145    byte[] qf1 = Bytes.toBytes("qualifier1");
146    byte[] qf2 = Bytes.toBytes("qualifier2");
147    byte[] fam1 = Bytes.toBytes("famoffheap");
148
149    long ts1 = 1;
150    long ts2 = ts1 + 1;
151    long ts3 = ts1 + 2;
152
153    // Setting up region
154    String method = this.getName();
155    this.region = initHRegion(tableName, method, conf, test_util, fam1);
156    try {
157      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
158
159      List<Cell> actual = performScan(row1, fam1);
160      // Verify result
161      for (int i = 0; i < expected.size(); i++) {
162        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
163        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
164      }
165      // Wait for the bucket cache threads to move the data to offheap
166      Thread.sleep(500);
167      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
168      actual = performScan(row1, fam1);
169      // Verify result
170      for (int i = 0; i < expected.size(); i++) {
171        assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
172        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
173      }
174
175    } catch (InterruptedException e) {
176    } finally {
177      HBaseTestingUtil.closeRegionAndWAL(this.region);
178      this.region = null;
179    }
180  }
181
182  @Test
183  public void testBasicScanWithOffheapBucketCacheWithMBB() throws IOException {
184    setUp(true);
185    byte[] row1 = Bytes.toBytes("row1offheap");
186    byte[] qf1 = Bytes.toBytes("qualifier1");
187    byte[] qf2 = Bytes.toBytes("qualifier2");
188    byte[] fam1 = Bytes.toBytes("famoffheap");
189
190    long ts1 = 1;
191    long ts2 = ts1 + 1;
192    long ts3 = ts1 + 2;
193
194    // Setting up region
195    String method = this.getName();
196    this.region = initHRegion(tableName, method, conf, test_util, fam1);
197    try {
198      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, true);
199
200      List<Cell> actual = performScan(row1, fam1);
201      // Verify result
202      for (int i = 0; i < expected.size(); i++) {
203        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
204        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
205      }
206      // Wait for the bucket cache threads to move the data to offheap
207      Thread.sleep(500);
208      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
209      // but one of the cell will be copied due to the asSubByteBuff call
210      Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(10);
211      actual = new ArrayList<>();
212      InternalScanner scanner = region.getScanner(scan);
213
214      boolean hasNext = scanner.next(actual);
215      assertEquals(false, hasNext);
216      // Verify result
217      for (int i = 0; i < expected.size(); i++) {
218        if (i != 5) {
219          // the last cell fetched will be of type shareable but not offheap because
220          // the MBB is copied to form a single cell
221          assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
222        }
223      }
224
225    } catch (InterruptedException e) {
226    } finally {
227      HBaseTestingUtil.closeRegionAndWAL(this.region);
228      this.region = null;
229    }
230  }
231
232  private List<Cell> insertData(byte[] row1, byte[] qf1, byte[] qf2, byte[] fam1, long ts1,
233    long ts2, long ts3, boolean withVal) throws IOException {
234    // Putting data in Region
235    Put put = null;
236    KeyValue kv13 = null;
237    KeyValue kv12 = null;
238    KeyValue kv11 = null;
239
240    KeyValue kv23 = null;
241    KeyValue kv22 = null;
242    KeyValue kv21 = null;
243    if (!withVal) {
244      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
245      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
246      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
247
248      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
249      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
250      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
251    } else {
252      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, val);
253      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, val);
254      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, val);
255
256      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, val);
257      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, val);
258      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, val);
259    }
260
261    put = new Put(row1);
262    put.add(kv13);
263    put.add(kv12);
264    put.add(kv11);
265    put.add(kv23);
266    put.add(kv22);
267    put.add(kv21);
268    region.put(put);
269    region.flush(true);
270    HStore store = region.getStore(fam1);
271    while (store.getStorefilesCount() <= 0) {
272      try {
273        Thread.sleep(20);
274      } catch (InterruptedException e) {
275      }
276    }
277
278    // Expected
279    List<Cell> expected = new ArrayList<>();
280    expected.add(kv13);
281    expected.add(kv12);
282    expected.add(kv23);
283    expected.add(kv22);
284    return expected;
285  }
286
287  private List<Cell> performScan(byte[] row1, byte[] fam1) throws IOException {
288    Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(MAX_VERSIONS);
289    List<Cell> actual = new ArrayList<>();
290    InternalScanner scanner = region.getScanner(scan);
291
292    boolean hasNext = scanner.next(actual);
293    assertEquals(false, hasNext);
294    return actual;
295  }
296
297  private static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
298    HBaseTestingUtil test_util, byte[]... families) throws IOException {
299    return initHRegion(tableName, null, null, callingMethod, conf, test_util, false, families);
300  }
301
302  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
303    String callingMethod, Configuration conf, HBaseTestingUtil testUtil, boolean isReadOnly,
304    byte[]... families) throws IOException {
305    RegionInfo regionInfo =
306      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
307    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
308    builder.setReadOnly(isReadOnly).setDurability(Durability.SYNC_WAL);
309    for (byte[] family : families) {
310      builder.setColumnFamily(
311        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(Integer.MAX_VALUE).build());
312    }
313    return HBaseTestingUtil.createRegionAndWAL(regionInfo, testUtil.getDataTestDir(callingMethod),
314      conf, builder.build(), BlockCacheFactory.createBlockCache(conf));
315  }
316}