/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
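
/**
 * Tests that data blocks referenced by in-flight RPCs (gets, multi-gets and scans) are pinned in
 * the block cache: their RPC reference count stays non-zero while the operations are outstanding
 * and drops back to zero, making the blocks evictable, once the operations complete. This is
 * exercised across flushes, compactions, splits and scans that fail with exceptions.
 */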
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();
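
  /**
   * Starts a mini cluster backed by an off-heap BucketCache. Client retries are disabled so that
   * failures surface immediately instead of being masked by retried RPCs.
   */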
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400); // 400 MB bucket cache
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  @After
  public void tearDown() throws Exception {
    // Release any latches the test may still be holding so blocked threads can exit
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // The data is still in the memstore, so no cache changes are expected yet.
      // Flush the data; this should create one HFile with 2 blocks
      region.flush(true);
      // Load the cache: start three concurrent scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Read the data and expect the same blocks: one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Insert a second column and read the row: no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Flush; one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Compact; net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Read the row; this should be a cache miss because data blocks written
      // by a compaction are not cached
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check whether get() releases its blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // Flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Start three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scans should also have released their blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check whether get() releases its blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // Each flush above created a separate HFile, so the row's cells sit in different files
      System.out.println("Flushing cache");
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO: check how the block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check whether get() releases its blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The flushes above spread the row's cells across several HFiles
      System.out.println("Flushing cache");
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads using an explicit column tracker
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS gets
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referenced
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check whether get() releases its blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with 10 column families and a block size of 1024 bytes
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table =
        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The flushes above spread the row's cells across several HFiles
      System.out.println("Flushing cache");
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads across multiple column families
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS gets
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referenced
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      HTableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
      // This test expects the RPC refcount of cached data blocks to be 0 after the split. After a
      // split, two daughter regions are opened and a compaction is scheduled to get rid of the
      // references to the parent region's hfiles. That compaction would increase the refcount of
      // cached data blocks by 1 and, since it can kick in at any time, would make the test flaky;
      // to avoid this, the table is created with compaction disabled.
      desc.setCompactionEnabled(false);
      table = TEST_UTIL.createTable(desc, FAMILIES_1, null, BloomType.ROW, 1024, null);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
        regionCount);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for the split to finish
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
      for (HRegion r : regions) {
        LOG.info("{}", r.getCompactionState());
        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
      }
      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split created HalfStoreFileReaders, the first-key and last-key scanners should
      // be closed in order to release those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check whether get() releases its blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // Each flush above created a separate HFile
      System.out.println("Flushing cache");
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three multi-get threads
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // Use a fresh iterator here; the loop above exhausted the old one
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check whether the scans release their blocks on close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with 10 column families and a block size of 1024 bytes
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table =
        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The flushes above spread the row's cells across several HFiles
      System.out.println("Flushing cache");
      // Start three scan threads (reversed)
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referenced
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // give some time for the block refcounts to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
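
  /**
   * Enables cache-on-write and evict-on-close on every store of the region and returns the
   * region's block cache (all stores share the same cache instance).
   */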
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check whether get() releases its blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      insertData(table);
      // Flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Start three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      // For the scan case the block refcount would already have been decremented, because the
      // scanner was wrapped before even the postNext hook got executed.
      // give some time for the block refcounts to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // Insert a second column into the first row
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // Flush; one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after the compaction is done, some blocks cannot be evicted because the scans are
      // still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still held with a refcount of NO_OF_THREADS as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      // Should not throw an exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
    throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // Insert a second column into the first row
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // Flush; one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // Flush; one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after the compaction is done, some blocks cannot be evicted because the scans are
      // still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still held with a refcount of NO_OF_THREADS as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      // Should not throw an exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // By this time all blocks should have been evicted. Since a flush and a compaction happened
      // after the scan started, we need to ensure that all the original blocks of the compacted
      // file are also removed.
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024 bytes
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // Insert data; two rows are added
      insertData(table);
      // Flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // For the scan case the block refcount would already have been decremented, because the
      // scanner was wrapped before even the postNext hook got executed.
      // give some time for the block refcounts to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // We should always see a ref count of 0, since after HBASE-16604 the scanner is always
      // recreated on exceptions
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
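
  /**
   * Walks the given cache iterator and asserts that every block that supports RPC reference
   * counting (BucketCache / CombinedBlockCache) has a refcount of zero, i.e. no in-flight RPC
   * still holds it.
   */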
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        LOG.info("BucketCache {} {}", cacheKey, refCount);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }
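
  // Note: not part of the original test. A minimal sketch of a helper that the repeated
  // instanceof dispatches in the refcount-checking loops above could share; the name
  // getRpcRefCount and the -1 sentinel are illustrative assumptions, not an existing API of this
  // class.
  private static int getRpcRefCount(BlockCache cache, BlockCacheKey cacheKey) {
    if (cache instanceof BucketCache) {
      return ((BucketCache) cache).getRpcRefCount(cacheKey);
    } else if (cache instanceof CombinedBlockCache) {
      return ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
    }
    return -1; // this cache implementation does not track RPC refcounts
  }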
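
  /**
   * Inserts three cells: QUALIFIER into ROW and ROW1, plus the larger QUALIFIER2/data2 cell into
   * ROW again, so the two rows span multiple blocks.
   */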
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }
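
  /**
   * Starts NO_OF_THREADS concurrent {@link ScanThread}s, optionally scanning in reverse.
   */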
  private ScanThread[] initiateScan(Table table, boolean reverse)
    throws IOException, InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }
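
  /**
   * Starts NO_OF_THREADS concurrent {@link GetThread}s. With {@code tracker} set, the gets name
   * explicit qualifiers (an explicit column tracker), optionally across multiple column families.
   */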
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
    throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }
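
  /**
   * Starts NO_OF_THREADS concurrent {@link MultiGetThread}s, each issuing one batch get for ROW
   * and ROW1.
   */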
  private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }
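
  /**
   * Waits for all get or scan threads to reach the observer hook, then walks the block cache and
   * verifies the RPC refcount of every block: zero everywhere when {@code expectOnlyZero} is set,
   * otherwise a count consistent with how many gets/scans still hold the block (depending on
   * {@code getClosed}). Finally counts down the observer latch so the blocked threads can proceed.
   */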
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
    throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // The scans would also have touched these blocks, and they would have touched all 3
          if (getClosed) {
            // If the gets have closed, only the scans' references remain
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // The gets would also have touched these blocks, but only 2 of them additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }
1326
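  /** Reader thread that issues one batch get covering ROW and ROW1. */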
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }

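  /**
   * Reader thread that gets ROW. With {@code tracker} set it requests explicit columns, including
   * one qualifier that does not exist, from one or several column families.
   */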
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Request explicit columns so the column tracker is exercised.
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      LOG.info("Get result: {}", r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
            data2));
        }
      }
    }
  }

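  /** Reader thread that runs a full (optionally reversed) scan and verifies the row order. */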
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = reverse ? ROWS.length - 1 : 0;
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        LOG.info("Scan result: {}", result);
        // Rows must come back in the expected (forward or reverse) order.
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i += reverse ? -1 : 1;
      }
      assertTrue(resultFound);
    }
  }

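  /**
   * Polls until the store reaches the expected number of store files or the timeout elapses, then
   * asserts the count.
   */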
  private void waitForStoreFileCount(HStore store, int count, int timeout)
    throws InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    while (
      start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count
    ) {
      Thread.sleep(100);
    }
    LOG.info("start={}, now={}, storefiles={}", start, EnvironmentEdgeManager.currentTime(),
      store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

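  /**
   * RegionScanner wrapper that delegates to the real scanner but can park inside
   * {@link #nextRaw(List, ScannerContext)} on the compaction and exception latches, letting the
   * test control when a scan RPC makes progress.
   */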
  private static class CustomScanner implements RegionScanner {

    private final RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          // Restore the interrupt status so callers can observe the interruption.
          Thread.currentThread().interrupt();
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            // Restore the interrupt status so callers can observe the interruption.
            Thread.currentThread().interrupt();
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Not delegated; this stub always reports that no reseek happened.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

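  /** Observer that wraps every scanner opened on the region in a {@link CustomScanner}. */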
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
      RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

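  /**
   * Region observer that counts gets and scanner-next calls and parks the server-side handlers on
   * a shared CountDownLatch, so the test can hold RPCs open while it inspects block reference
   * counts.
   */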
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl =
      new AtomicReference<>(new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          // Restore the interrupt status so callers can observe the interruption.
          Thread.currentThread().interrupt();
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
      List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
      boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        LOG.info("Latch count is {}, isGet={}", latch.getCount(), isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the CountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // Bounded wait so the test can still finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Timed out waiting for the latch to be released");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}