/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  static byte[][] ROWS = new byte[2][];
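  // Number of concurrent reader threads. The refcount assertions below expect each block that is
  // still held by RPC handlers to be pinned exactly once per reader thread.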
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
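  // Latches shared with the CustomInnerRegionObserver test coprocessor so the test thread can
  // park scans, gets and compactions inside coprocessor hooks while it inspects block refcounts.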
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
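    // "hbase.bucketcache.size" above 1.0 is read as megabytes, so with the "offheap" IO engine
    // this sets up a 400 MB off-heap BucketCache inside a CombinedBlockCache; those are the two
    // cache implementations whose RPC reference counts the assertions below rely on.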
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
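      // Cache data blocks as they are written and evict them when a store file's reader closes,
      // so the cache contents after each flush/compaction are deterministic for the assertions.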
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Load cache
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
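      // With evictOnClose enabled, the blocks of the compacted-away files are dropped from the
      // cache as soon as their readers close, which keeps the block counts below predictable.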
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // The row's cells are now spread across different HFiles
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO: check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // The row's cells are now spread across several HFiles
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
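      // While the GetThreads are parked on the coprocessor latch, every data block they have read
      // stays pinned in the cache with an RPC reference count equal to the number of readers.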
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = Bytes.toBytes("testFamily" + i);
      }
      table =
        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // The cells are now spread across several HFiles and column families
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
      // This test expects the RPC refcount of cached data blocks to be 0 after the split. After a
      // split, the two daughter regions are opened and a compaction is scheduled to get rid of the
      // references to the parent region's HFiles. That compaction would bump the refcount of the
      // cached data blocks by 1 and can kick in at any time, making the test flaky, so the table
      // is created with compactions disabled.
      table = TEST_UTIL.createTable(
        TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
        null, BloomType.ROW, 1024, null);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
        regionCount);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for splits
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
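      // Wait until no region on this server still reports an in-progress compaction before
      // inspecting the cache, otherwise compaction readers could still be pinning blocks.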
      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
      for (HRegion r : regions) {
        LOG.info("{}", r.getCompactionState());
        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
      }
      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split created HalfStoreFileReaders, the first-key and last-key scanners they
      // opened should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // The rows' cells are now spread across three HFiles
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = Bytes.toBytes("testFamily" + i);
      }
      table =
        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // The cells are now spread across several HFiles
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private BlockCache setCacheProperties(HRegion region) {
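    // All stores on a region server share the same global BlockCache instance, so it does not
    // matter which store's CacheConfig we read it from; what matters is flipping the
    // cache-on-write/evict-on-close flags on every store.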
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block count would already have been decremented for the scan case, since the scanner
      // was wrapped before even the postNext hook got executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted, because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
    throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted, because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and a compaction happened after the scan started, we need to ensure that
      // all the original blocks of the compacted files are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block count would already have been decremented for the scan case, since the scanner
      // was wrapped before even the postNext hook got executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // you should always see a 0 ref count, since after HBASE-16604 we always recreate the
      // scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

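  // Asserts that no block left in the cache is still pinned by an RPC handler. Both BucketCache
  // and CombinedBlockCache expose the per-block RPC reference count; other cache implementations
  // are simply skipped.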
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        LOG.info("BucketCache {} {}", cacheKey, refCount);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

  private ScanThread[] initiateScan(Table table, boolean reverse)
    throws IOException, InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
    throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

  private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

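  /**
   * Waits until all reader threads have reached the coprocessor hook and then walks the block
   * cache, asserting that every block's RPC reference count is consistent with the number of
   * get/scan threads that may still be holding it.
   */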
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
    throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
1311          // Because the scan would have also touched up on these blocks but
1312          // it
1313          // would have touched
1314          // all 3
1315          if (getClosed) {
1316            // If get has closed only the scan's blocks would be available
1317            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
1318          } else {
1319            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
1320          }
1321        }
1322      } else {
1323        // Because the get would have also touched up on these blocks but it
1324        // would have touched
1325        // upon only 2 additionally
1326        if (expectOnlyZero) {
1327          assertTrue(refCount == 0);
1328        }
1329        if (refCount != 0) {
1330          if (getLatch == null) {
1331            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
1332          } else {
1333            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
1334          }
1335        }
1336      }
1337    }
1338    CustomInnerRegionObserver.getCdl().get().countDown();
1339  }
1340
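  /** Reader thread that fetches ROW and ROW1 in one batched multi-get while the latch is held. */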
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        LOG.error("MultiGet failed", e);
      }
    }
  }

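  /**
   * Reader thread that issues a single Get, optionally through the explicit-column tracker path
   * (and optionally across multiple column families), and verifies the returned values.
   */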
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        LOG.error("Get failed", e);
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Request a few known qualifiers plus one key that does not exist.
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      LOG.info("Get result: {}", r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
            data2));
        }
      }
    }
  }

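  /** Reader thread that scans the whole table, optionally reversed, and checks row order. */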
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        LOG.error("Scan failed", e);
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        LOG.info("Scan result: {}", result);
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i = reverse ? i - 1 : i + 1;
      }
      assertTrue(resultFound);
    }
  }

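  /** Polls until the store reaches the expected file count, then asserts it got there in time. */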
  private void waitForStoreFileCount(HStore store, int count, int timeout)
    throws InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    while (
      start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count
    ) {
      Thread.sleep(100);
    }
    LOG.info("start={}, now={}, cur={}", start, EnvironmentEdgeManager.currentTime(),
      store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

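  /**
   * RegionScanner wrapper that can stall inside nextRaw() on the compaction latch, or throw an
   * injected IOException, so tests can keep blocks referenced while a scan is mid-flight.
   */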
  private static class CustomScanner implements RegionScanner {

    private final RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          // Hold the scanner open until the test releases the compaction latch.
          compactionLatch.await();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          throw new IOException("Injected exception from CustomScanner.nextRaw");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Not delegated; these tests never reseek through this wrapper.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

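  /** Variant of CustomInnerRegionObserver that wraps each opened scanner in a CustomScanner. */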
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
      RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

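  /**
   * Coprocessor that counts gets and scanner nexts and can park them on a shared latch, letting
   * the tests inspect block reference counts while the RPCs are still in flight.
   */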
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl =
      new AtomicReference<>(new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          Thread.currentThread().interrupt();
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
      List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

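    /**
     * Bumps the get/next counter and parks the calling RPC on the shared latch, bounded at two
     * minutes so a wedged test still terminates.
     */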
    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
      boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        LOG.info("Latch count is {} (isGet={})", latch.getCount(), isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the CountDownLatch to be released");
          latch.await(2, TimeUnit.MINUTES); // Bounded wait to help the tests finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Timed out waiting for the CountDownLatch");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}