001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.ByteBufferKeyValue;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Durability;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.HStore;
044import org.apache.hadoop.hbase.regionserver.InternalScanner;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
049import org.junit.After;
050import org.junit.ClassRule;
051import org.junit.Rule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.junit.rules.TestName;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058@Category({ RegionServerTests.class, SmallTests.class })
059public class TestScannerFromBucketCache {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestScannerFromBucketCache.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestScannerFromBucketCache.class);
066  @Rule
067  public TestName name = new TestName();
068
069  HRegion region = null;
070  private HBaseTestingUtility test_util;
071  private Configuration conf;
072  private final int MAX_VERSIONS = 2;
073  byte[] val = new byte[512 * 1024];
074
075  // Test names
076  private TableName tableName;
077
078  private void setUp(boolean useBucketCache) throws IOException {
079    test_util = HBaseTestingUtility.createLocalHTU();
080    conf = test_util.getConfiguration();
081    if (useBucketCache) {
082      conf.setInt("hbase.bucketcache.size", 400);
083      conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
084      conf.setInt("hbase.bucketcache.writer.threads", 10);
085      conf.setFloat("hfile.block.cache.size", 0.2f);
086      conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
087    }
088    tableName = TableName.valueOf(name.getMethodName());
089  }
090
091  @After
092  public void tearDown() throws Exception {
093    EnvironmentEdgeManagerTestHelper.reset();
094    LOG.info("Cleaning test directory: " + test_util.getDataTestDir());
095    test_util.cleanupTestDir();
096  }
097
098  String getName() {
099    return name.getMethodName();
100  }
101
102  @Test
103  public void testBasicScanWithLRUCache() throws IOException {
104    setUp(false);
105    byte[] row1 = Bytes.toBytes("row1");
106    byte[] qf1 = Bytes.toBytes("qualifier1");
107    byte[] qf2 = Bytes.toBytes("qualifier2");
108    byte[] fam1 = Bytes.toBytes("lrucache");
109
110    long ts1 = 1; // System.currentTimeMillis();
111    long ts2 = ts1 + 1;
112    long ts3 = ts1 + 2;
113
114    // Setting up region
115    String method = this.getName();
116    this.region = initHRegion(tableName, method, conf, test_util, fam1);
117    try {
118      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
119
120      List<Cell> actual = performScan(row1, fam1);
121      // Verify result
122      for (int i = 0; i < expected.size(); i++) {
123        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
124        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
125      }
126      // do the scan again and verify. This time it should be from the lru cache
127      actual = performScan(row1, fam1);
128      // Verify result
129      for (int i = 0; i < expected.size(); i++) {
130        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
131        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
132      }
133
134    } finally {
135      HBaseTestingUtility.closeRegionAndWAL(this.region);
136      this.region = null;
137    }
138  }
139
140  @Test
141  public void testBasicScanWithOffheapBucketCache() throws IOException {
142    setUp(true);
143    byte[] row1 = Bytes.toBytes("row1offheap");
144    byte[] qf1 = Bytes.toBytes("qualifier1");
145    byte[] qf2 = Bytes.toBytes("qualifier2");
146    byte[] fam1 = Bytes.toBytes("famoffheap");
147
148    long ts1 = 1; // System.currentTimeMillis();
149    long ts2 = ts1 + 1;
150    long ts3 = ts1 + 2;
151
152    // Setting up region
153    String method = this.getName();
154    this.region = initHRegion(tableName, method, conf, test_util, fam1);
155    try {
156      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
157
158      List<Cell> actual = performScan(row1, fam1);
159      // Verify result
160      for (int i = 0; i < expected.size(); i++) {
161        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
162        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
163      }
164      // Wait for the bucket cache threads to move the data to offheap
165      Thread.sleep(500);
166      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
167      actual = performScan(row1, fam1);
168      // Verify result
169      for (int i = 0; i < expected.size(); i++) {
170        assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
171        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
172      }
173
174    } catch (InterruptedException e) {
175    } finally {
176      HBaseTestingUtility.closeRegionAndWAL(this.region);
177      this.region = null;
178    }
179  }
180
181  @Test
182  public void testBasicScanWithOffheapBucketCacheWithMBB() throws IOException {
183    setUp(true);
184    byte[] row1 = Bytes.toBytes("row1offheap");
185    byte[] qf1 = Bytes.toBytes("qualifier1");
186    byte[] qf2 = Bytes.toBytes("qualifier2");
187    byte[] fam1 = Bytes.toBytes("famoffheap");
188
189    long ts1 = 1; // System.currentTimeMillis();
190    long ts2 = ts1 + 1;
191    long ts3 = ts1 + 2;
192
193    // Setting up region
194    String method = this.getName();
195    this.region = initHRegion(tableName, method, conf, test_util, fam1);
196    try {
197      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, true);
198
199      List<Cell> actual = performScan(row1, fam1);
200      // Verify result
201      for (int i = 0; i < expected.size(); i++) {
202        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
203        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
204      }
205      // Wait for the bucket cache threads to move the data to offheap
206      Thread.sleep(500);
207      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
208      // but one of the cell will be copied due to the asSubByteBuff call
209      Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(10);
210      actual = new ArrayList<>();
211      InternalScanner scanner = region.getScanner(scan);
212
213      boolean hasNext = scanner.next(actual);
214      assertEquals(false, hasNext);
215      // Verify result
216      for (int i = 0; i < expected.size(); i++) {
217        if (i != 5) {
218          // the last cell fetched will be of type shareable but not offheap because
219          // the MBB is copied to form a single cell
220          assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
221        }
222      }
223
224    } catch (InterruptedException e) {
225    } finally {
226      HBaseTestingUtility.closeRegionAndWAL(this.region);
227      this.region = null;
228    }
229  }
230
231  private List<Cell> insertData(byte[] row1, byte[] qf1, byte[] qf2, byte[] fam1, long ts1,
232      long ts2, long ts3, boolean withVal) throws IOException {
233    // Putting data in Region
234    Put put = null;
235    KeyValue kv13 = null;
236    KeyValue kv12 = null;
237    KeyValue kv11 = null;
238
239    KeyValue kv23 = null;
240    KeyValue kv22 = null;
241    KeyValue kv21 = null;
242    if (!withVal) {
243      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
244      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
245      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
246
247      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
248      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
249      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
250    } else {
251      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, val);
252      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, val);
253      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, val);
254
255      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, val);
256      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, val);
257      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, val);
258    }
259
260    put = new Put(row1);
261    put.add(kv13);
262    put.add(kv12);
263    put.add(kv11);
264    put.add(kv23);
265    put.add(kv22);
266    put.add(kv21);
267    region.put(put);
268    region.flush(true);
269    HStore store = region.getStore(fam1);
270    while (store.getStorefilesCount() <= 0) {
271      try {
272        Thread.sleep(20);
273      } catch (InterruptedException e) {
274      }
275    }
276
277    // Expected
278    List<Cell> expected = new ArrayList<>();
279    expected.add(kv13);
280    expected.add(kv12);
281    expected.add(kv23);
282    expected.add(kv22);
283    return expected;
284  }
285
286  private List<Cell> performScan(byte[] row1, byte[] fam1) throws IOException {
287    Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(MAX_VERSIONS);
288    List<Cell> actual = new ArrayList<>();
289    InternalScanner scanner = region.getScanner(scan);
290
291    boolean hasNext = scanner.next(actual);
292    assertEquals(false, hasNext);
293    return actual;
294  }
295
296  private static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
297      HBaseTestingUtility test_util, byte[]... families) throws IOException {
298    return initHRegion(tableName, null, null, callingMethod, conf, test_util, false, families);
299  }
300
301  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
302      String callingMethod, Configuration conf, HBaseTestingUtility testUtil, boolean isReadOnly,
303      byte[]... families) throws IOException {
304    RegionInfo regionInfo =
305        RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
306    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
307    builder.setReadOnly(isReadOnly).setDurability(Durability.SYNC_WAL);
308    for (byte[] family : families) {
309      builder.setColumnFamily(
310          ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(Integer.MAX_VALUE)
311              .build());
312    }
313    return HBaseTestingUtility
314        .createRegionAndWAL(regionInfo, testUtil.getDataTestDir(callingMethod), conf,
315            builder.build(), BlockCacheFactory.createBlockCache(conf));
316  }
317}