001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicReference;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
045import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
046import org.apache.hadoop.hbase.coprocessor.ObserverContext;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.coprocessor.RegionObserver;
050import org.apache.hadoop.hbase.io.hfile.BlockCache;
051import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
052import org.apache.hadoop.hbase.io.hfile.CacheConfig;
053import org.apache.hadoop.hbase.io.hfile.CachedBlock;
054import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
056import org.apache.hadoop.hbase.regionserver.BloomType;
057import org.apache.hadoop.hbase.regionserver.HRegion;
058import org.apache.hadoop.hbase.regionserver.HStore;
059import org.apache.hadoop.hbase.regionserver.InternalScanner;
060import org.apache.hadoop.hbase.regionserver.RegionScanner;
061import org.apache.hadoop.hbase.regionserver.ScannerContext;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.junit.After;
067import org.junit.AfterClass;
068import org.junit.Before;
069import org.junit.BeforeClass;
070import org.junit.ClassRule;
071import org.junit.Rule;
072import org.junit.Test;
073import org.junit.experimental.categories.Category;
074import org.junit.rules.TestName;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
079
080@Category({ LargeTests.class, ClientTests.class })
081@SuppressWarnings("deprecation")
082public class TestBlockEvictionFromClient {
083
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Bundles ROW and ROW1; populated in setUpBeforeClass().
  static byte[][] ROWS = new byte[2][];
  // Number of concurrent Get/Scan/MultiGet threads each test spawns.
  private static int NO_OF_THREADS = 3;
  // Row keys used across the tests.
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  // Single-family array; FAMILIES_1[0] is set to FAMILY in setUpBeforeClass().
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  // Double-length variants built from QUALIFIER/data; several tests use them
  // to force values that span more than one 1024-byte block.
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  // Latches coordinating test threads with the coprocessor hooks; drained and
  // nulled out in tearDown() so no server-side thread stays blocked.
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();
110
111  /**
112   * @throws java.lang.Exception
113   */
114  @BeforeClass
115  public static void setUpBeforeClass() throws Exception {
116    ROWS[0] = ROW;
117    ROWS[1] = ROW1;
118    Configuration conf = TEST_UTIL.getConfiguration();
119    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
120      MultiRowMutationEndpoint.class.getName());
121    conf.setInt("hbase.regionserver.handler.count", 20);
122    conf.setInt("hbase.bucketcache.size", 400);
123    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
124    conf.setFloat("hfile.block.cache.size", 0.2f);
125    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
126    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
127    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
128    FAMILIES_1[0] = FAMILY;
129    TEST_UTIL.startMiniCluster(SLAVES);
130  }
131
132  /**
133   * @throws java.lang.Exception
134   */
135  @AfterClass
136  public static void tearDownAfterClass() throws Exception {
137    TEST_UTIL.shutdownMiniCluster();
138  }
139
140  /**
141   * @throws java.lang.Exception
142   */
143  @Before
144  public void setUp() throws Exception {
145    CustomInnerRegionObserver.waitForGets.set(false);
146    CustomInnerRegionObserver.countOfNext.set(0);
147    CustomInnerRegionObserver.countOfGets.set(0);
148  }
149
150  /**
151   * @throws java.lang.Exception
152   */
153  @After
154  public void tearDown() throws Exception {
155    if (latch != null) {
156      while (latch.getCount() > 0) {
157        latch.countDown();
158      }
159    }
160    if (getLatch != null) {
161      getLatch.countDown();
162    }
163    if (compactionLatch != null) {
164      compactionLatch.countDown();
165    }
166    if (exceptionLatch != null) {
167      exceptionLatch.countDown();
168    }
169    latch = null;
170    getLatch = null;
171    compactionLatch = null;
172    exceptionLatch = null;
173    CustomInnerRegionObserver.throwException.set(false);
174    // Clean up the tables for every test case
175    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
176    for (TableName tableName : listTableNames) {
177      if (!tableName.isSystemTable()) {
178        TEST_UTIL.getAdmin().disableTable(tableName);
179        TEST_UTIL.getAdmin().deleteTable(tableName);
180      }
181    }
182  }
183
184  @Test
185  public void testBlockEvictionWithParallelScans() throws Exception {
186    Table table = null;
187    try {
188      latch = new CountDownLatch(1);
189      final TableName tableName = TableName.valueOf(name.getMethodName());
190      // Create a table with block size as 1024
191      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
192        CustomInnerRegionObserver.class.getName());
193      // get the block cache and region
194      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
195      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
196      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
197      HStore store = region.getStores().iterator().next();
198      CacheConfig cacheConf = store.getCacheConfig();
199      cacheConf.setCacheDataOnWrite(true);
200      cacheConf.setEvictOnClose(true);
201      BlockCache cache = cacheConf.getBlockCache().get();
202
203      // insert data. 2 Rows are added
204      Put put = new Put(ROW);
205      put.addColumn(FAMILY, QUALIFIER, data);
206      table.put(put);
207      put = new Put(ROW1);
208      put.addColumn(FAMILY, QUALIFIER, data);
209      table.put(put);
210      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
211      // data was in memstore so don't expect any changes
212      // flush the data
213      // Should create one Hfile with 2 blocks
214      region.flush(true);
215      // Load cache
216      // Create three sets of scan
217      ScanThread[] scanThreads = initiateScan(table, false);
218      Thread.sleep(100);
219      checkForBlockEviction(cache, false, false);
220      for (ScanThread thread : scanThreads) {
221        thread.join();
222      }
223      // CustomInnerRegionObserver.sleepTime.set(0);
224      Iterator<CachedBlock> iterator = cache.iterator();
225      iterateBlockCache(cache, iterator);
226      // read the data and expect same blocks, one new hit, no misses
227      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
228      iterator = cache.iterator();
229      iterateBlockCache(cache, iterator);
230      // Check how this miss is happening
231      // insert a second column, read the row, no new blocks, 3 new hits
232      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
233      byte[] data2 = Bytes.add(data, data);
234      put = new Put(ROW);
235      put.addColumn(FAMILY, QUALIFIER2, data2);
236      table.put(put);
237      Result r = table.get(new Get(ROW));
238      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
239      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
240      iterator = cache.iterator();
241      iterateBlockCache(cache, iterator);
242      // flush, one new block
243      System.out.println("Flushing cache");
244      region.flush(true);
245      iterator = cache.iterator();
246      iterateBlockCache(cache, iterator);
247      // compact, net minus two blocks, two hits, no misses
248      System.out.println("Compacting");
249      assertEquals(2, store.getStorefilesCount());
250      store.triggerMajorCompaction();
251      region.compact(true);
252      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
253      assertEquals(1, store.getStorefilesCount());
254      iterator = cache.iterator();
255      iterateBlockCache(cache, iterator);
256      // read the row, this should be a cache miss because we don't cache data
257      // blocks on compaction
258      r = table.get(new Get(ROW));
259      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
260      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
261      iterator = cache.iterator();
262      iterateBlockCache(cache, iterator);
263    } finally {
264      if (table != null) {
265        table.close();
266      }
267    }
268  }
269
270  @Test
271  public void testParallelGetsAndScans() throws IOException, InterruptedException {
272    Table table = null;
273    try {
274      latch = new CountDownLatch(2);
275      // Check if get() returns blocks on its close() itself
276      getLatch = new CountDownLatch(1);
277      final TableName tableName = TableName.valueOf(name.getMethodName());
278      // Create KV that will give you two blocks
279      // Create a table with block size as 1024
280      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
281        CustomInnerRegionObserver.class.getName());
282      // get the block cache and region
283      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
284      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
285      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
286      HStore store = region.getStores().iterator().next();
287      CacheConfig cacheConf = store.getCacheConfig();
288      cacheConf.setCacheDataOnWrite(true);
289      cacheConf.setEvictOnClose(true);
290      BlockCache cache = cacheConf.getBlockCache().get();
291
292      insertData(table);
293      // flush the data
294      System.out.println("Flushing cache");
295      // Should create one Hfile with 2 blocks
296      region.flush(true);
297      // Create three sets of scan
298      CustomInnerRegionObserver.waitForGets.set(true);
299      ScanThread[] scanThreads = initiateScan(table, false);
300      // Create three sets of gets
301      GetThread[] getThreads = initiateGet(table, false, false);
302      checkForBlockEviction(cache, false, false);
303      CustomInnerRegionObserver.waitForGets.set(false);
304      checkForBlockEviction(cache, false, false);
305      for (GetThread thread : getThreads) {
306        thread.join();
307      }
308      // Verify whether the gets have returned the blocks that it had
309      CustomInnerRegionObserver.waitForGets.set(true);
310      // giving some time for the block to be decremented
311      checkForBlockEviction(cache, true, false);
312      getLatch.countDown();
313      for (ScanThread thread : scanThreads) {
314        thread.join();
315      }
316      System.out.println("Scans should have returned the bloks");
317      // Check with either true or false
318      CustomInnerRegionObserver.waitForGets.set(false);
319      // The scan should also have released the blocks by now
320      checkForBlockEviction(cache, true, true);
321    } finally {
322      if (table != null) {
323        table.close();
324      }
325    }
326  }
327
328  @Test
329  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
330    Table table = null;
331    try {
332      latch = new CountDownLatch(1);
333      // Check if get() returns blocks on its close() itself
334      getLatch = new CountDownLatch(1);
335      final TableName tableName = TableName.valueOf(name.getMethodName());
336      // Create KV that will give you two blocks
337      // Create a table with block size as 1024
338      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
339        CustomInnerRegionObserver.class.getName());
340      // get the block cache and region
341      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
342      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
343      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
344      HStore store = region.getStores().iterator().next();
345      CacheConfig cacheConf = store.getCacheConfig();
346      cacheConf.setCacheDataOnWrite(true);
347      cacheConf.setEvictOnClose(true);
348      BlockCache cache = cacheConf.getBlockCache().get();
349
350      Put put = new Put(ROW);
351      put.addColumn(FAMILY, QUALIFIER, data);
352      table.put(put);
353      region.flush(true);
354      put = new Put(ROW1);
355      put.addColumn(FAMILY, QUALIFIER, data);
356      table.put(put);
357      region.flush(true);
358      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
359      put = new Put(ROW);
360      put.addColumn(FAMILY, QUALIFIER2, data2);
361      table.put(put);
362      region.flush(true);
363      // flush the data
364      System.out.println("Flushing cache");
365      // Should create one Hfile with 2 blocks
366      CustomInnerRegionObserver.waitForGets.set(true);
367      // Create three sets of gets
368      GetThread[] getThreads = initiateGet(table, false, false);
369      Thread.sleep(200);
370      CustomInnerRegionObserver.getCdl().get().countDown();
371      for (GetThread thread : getThreads) {
372        thread.join();
373      }
374      // Verify whether the gets have returned the blocks that it had
375      CustomInnerRegionObserver.waitForGets.set(true);
376      // giving some time for the block to be decremented
377      checkForBlockEviction(cache, true, false);
378      getLatch.countDown();
379      System.out.println("Gets should have returned the bloks");
380    } finally {
381      if (table != null) {
382        table.close();
383      }
384    }
385  }
386
387  @Test
388  // TODO : check how block index works here
389  public void testGetsWithMultiColumnsAndExplicitTracker()
390    throws IOException, InterruptedException {
391    Table table = null;
392    try {
393      latch = new CountDownLatch(1);
394      // Check if get() returns blocks on its close() itself
395      getLatch = new CountDownLatch(1);
396      final TableName tableName = TableName.valueOf(name.getMethodName());
397      // Create KV that will give you two blocks
398      // Create a table with block size as 1024
399      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
400        CustomInnerRegionObserver.class.getName());
401      // get the block cache and region
402      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
403      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
404      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
405      BlockCache cache = setCacheProperties(region);
406      Put put = new Put(ROW);
407      put.addColumn(FAMILY, QUALIFIER, data);
408      table.put(put);
409      region.flush(true);
410      put = new Put(ROW1);
411      put.addColumn(FAMILY, QUALIFIER, data);
412      table.put(put);
413      region.flush(true);
414      for (int i = 1; i < 10; i++) {
415        put = new Put(ROW);
416        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
417        table.put(put);
418        if (i % 2 == 0) {
419          region.flush(true);
420        }
421      }
422      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
423      put = new Put(ROW);
424      put.addColumn(FAMILY, QUALIFIER2, data2);
425      table.put(put);
426      region.flush(true);
427      // flush the data
428      System.out.println("Flushing cache");
429      // Should create one Hfile with 2 blocks
430      CustomInnerRegionObserver.waitForGets.set(true);
431      // Create three sets of gets
432      GetThread[] getThreads = initiateGet(table, true, false);
433      Thread.sleep(200);
434      Iterator<CachedBlock> iterator = cache.iterator();
435      boolean usedBlocksFound = false;
436      int refCount = 0;
437      int noOfBlocksWithRef = 0;
438      while (iterator.hasNext()) {
439        CachedBlock next = iterator.next();
440        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
441        if (cache instanceof BucketCache) {
442          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
443        } else if (cache instanceof CombinedBlockCache) {
444          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
445        } else {
446          continue;
447        }
448        if (refCount != 0) {
449          // Blocks will be with count 3
450          System.out.println("The refCount is " + refCount);
451          assertEquals(NO_OF_THREADS, refCount);
452          usedBlocksFound = true;
453          noOfBlocksWithRef++;
454        }
455      }
456      assertTrue(usedBlocksFound);
457      // the number of blocks referred
458      assertEquals(10, noOfBlocksWithRef);
459      CustomInnerRegionObserver.getCdl().get().countDown();
460      for (GetThread thread : getThreads) {
461        thread.join();
462      }
463      // Verify whether the gets have returned the blocks that it had
464      CustomInnerRegionObserver.waitForGets.set(true);
465      // giving some time for the block to be decremented
466      checkForBlockEviction(cache, true, false);
467      getLatch.countDown();
468      System.out.println("Gets should have returned the bloks");
469    } finally {
470      if (table != null) {
471        table.close();
472      }
473    }
474  }
475
476  @Test
477  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
478    Table table = null;
479    try {
480      latch = new CountDownLatch(1);
481      // Check if get() returns blocks on its close() itself
482      getLatch = new CountDownLatch(1);
483      final TableName tableName = TableName.valueOf(name.getMethodName());
484      // Create KV that will give you two blocks
485      // Create a table with block size as 1024
486      byte[][] fams = new byte[10][];
487      fams[0] = FAMILY;
488      for (int i = 1; i < 10; i++) {
489        fams[i] = (Bytes.toBytes("testFamily" + i));
490      }
491      table =
492        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
493      // get the block cache and region
494      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
495      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
496      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
497      BlockCache cache = setCacheProperties(region);
498
499      Put put = new Put(ROW);
500      put.addColumn(FAMILY, QUALIFIER, data);
501      table.put(put);
502      region.flush(true);
503      put = new Put(ROW1);
504      put.addColumn(FAMILY, QUALIFIER, data);
505      table.put(put);
506      region.flush(true);
507      for (int i = 1; i < 10; i++) {
508        put = new Put(ROW);
509        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
510        table.put(put);
511        if (i % 2 == 0) {
512          region.flush(true);
513        }
514      }
515      region.flush(true);
516      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
517      put = new Put(ROW);
518      put.addColumn(FAMILY, QUALIFIER2, data2);
519      table.put(put);
520      region.flush(true);
521      // flush the data
522      System.out.println("Flushing cache");
523      // Should create one Hfile with 2 blocks
524      CustomInnerRegionObserver.waitForGets.set(true);
525      // Create three sets of gets
526      GetThread[] getThreads = initiateGet(table, true, true);
527      Thread.sleep(200);
528      Iterator<CachedBlock> iterator = cache.iterator();
529      boolean usedBlocksFound = false;
530      int refCount = 0;
531      int noOfBlocksWithRef = 0;
532      while (iterator.hasNext()) {
533        CachedBlock next = iterator.next();
534        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
535        if (cache instanceof BucketCache) {
536          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
537        } else if (cache instanceof CombinedBlockCache) {
538          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
539        } else {
540          continue;
541        }
542        if (refCount != 0) {
543          // Blocks will be with count 3
544          System.out.println("The refCount is " + refCount);
545          assertEquals(NO_OF_THREADS, refCount);
546          usedBlocksFound = true;
547          noOfBlocksWithRef++;
548        }
549      }
550      assertTrue(usedBlocksFound);
551      // the number of blocks referred
552      assertEquals(3, noOfBlocksWithRef);
553      CustomInnerRegionObserver.getCdl().get().countDown();
554      for (GetThread thread : getThreads) {
555        thread.join();
556      }
557      // Verify whether the gets have returned the blocks that it had
558      CustomInnerRegionObserver.waitForGets.set(true);
559      // giving some time for the block to be decremented
560      checkForBlockEviction(cache, true, false);
561      getLatch.countDown();
562      System.out.println("Gets should have returned the bloks");
563    } finally {
564      if (table != null) {
565        table.close();
566      }
567    }
568  }
569
570  @Test
571  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
572    Table table = null;
573    try {
574      final TableName tableName = TableName.valueOf(name.getMethodName());
575      TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
576      // This test expects rpc refcount of cached data blocks to be 0 after split. After split,
577      // two daughter regions are opened and a compaction is scheduled to get rid of reference
578      // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1.
579      // It is flakey since compaction can kick in anytime. To solve this issue, table is created
580      // with compaction disabled.
581      table = TEST_UTIL.createTable(
582        TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
583        null, BloomType.ROW, 1024, null);
584      // get the block cache and region
585      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
586      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
587      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
588      HStore store = region.getStores().iterator().next();
589      CacheConfig cacheConf = store.getCacheConfig();
590      cacheConf.setEvictOnClose(true);
591      BlockCache cache = cacheConf.getBlockCache().get();
592
593      Put put = new Put(ROW);
594      put.addColumn(FAMILY, QUALIFIER, data);
595      table.put(put);
596      region.flush(true);
597      put = new Put(ROW1);
598      put.addColumn(FAMILY, QUALIFIER, data);
599      table.put(put);
600      region.flush(true);
601      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
602      put = new Put(ROW2);
603      put.addColumn(FAMILY, QUALIFIER2, data2);
604      table.put(put);
605      put = new Put(ROW3);
606      put.addColumn(FAMILY, QUALIFIER2, data2);
607      table.put(put);
608      region.flush(true);
609      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
610      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
611      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
612        regionCount);
613      TEST_UTIL.getAdmin().split(tableName, ROW1);
614      // Wait for splits
615      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
616      region.compact(true);
617      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
618      for (HRegion r : regions) {
619        LOG.info("" + r.getCompactionState());
620        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
621      }
622      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
623      Iterator<CachedBlock> iterator = cache.iterator();
624      // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
625      // should be closed inorder to return those blocks
626      iterateBlockCache(cache, iterator);
627    } finally {
628      if (table != null) {
629        table.close();
630      }
631    }
632  }
633
634  @Test
635  public void testMultiGets() throws IOException, InterruptedException {
636    Table table = null;
637    try {
638      latch = new CountDownLatch(2);
639      // Check if get() returns blocks on its close() itself
640      getLatch = new CountDownLatch(1);
641      final TableName tableName = TableName.valueOf(name.getMethodName());
642      // Create KV that will give you two blocks
643      // Create a table with block size as 1024
644      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
645        CustomInnerRegionObserver.class.getName());
646      // get the block cache and region
647      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
648      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
649      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
650      HStore store = region.getStores().iterator().next();
651      CacheConfig cacheConf = store.getCacheConfig();
652      cacheConf.setCacheDataOnWrite(true);
653      cacheConf.setEvictOnClose(true);
654      BlockCache cache = cacheConf.getBlockCache().get();
655
656      Put put = new Put(ROW);
657      put.addColumn(FAMILY, QUALIFIER, data);
658      table.put(put);
659      region.flush(true);
660      put = new Put(ROW1);
661      put.addColumn(FAMILY, QUALIFIER, data);
662      table.put(put);
663      region.flush(true);
664      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
665      put = new Put(ROW);
666      put.addColumn(FAMILY, QUALIFIER2, data2);
667      table.put(put);
668      region.flush(true);
669      // flush the data
670      System.out.println("Flushing cache");
671      // Should create one Hfile with 2 blocks
672      CustomInnerRegionObserver.waitForGets.set(true);
673      // Create three sets of gets
674      MultiGetThread[] getThreads = initiateMultiGet(table);
675      Thread.sleep(200);
676      int refCount;
677      Iterator<CachedBlock> iterator = cache.iterator();
678      boolean foundNonZeroBlock = false;
679      while (iterator.hasNext()) {
680        CachedBlock next = iterator.next();
681        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
682        if (cache instanceof BucketCache) {
683          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
684        } else if (cache instanceof CombinedBlockCache) {
685          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
686        } else {
687          continue;
688        }
689        if (refCount != 0) {
690          assertEquals(NO_OF_THREADS, refCount);
691          foundNonZeroBlock = true;
692        }
693      }
694      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
695      CustomInnerRegionObserver.getCdl().get().countDown();
696      CustomInnerRegionObserver.getCdl().get().countDown();
697      for (MultiGetThread thread : getThreads) {
698        thread.join();
699      }
700      // Verify whether the gets have returned the blocks that it had
701      CustomInnerRegionObserver.waitForGets.set(true);
702      // giving some time for the block to be decremented
703      iterateBlockCache(cache, iterator);
704      getLatch.countDown();
705      System.out.println("Gets should have returned the bloks");
706    } finally {
707      if (table != null) {
708        table.close();
709      }
710    }
711  }
712
  /**
   * Scans a table with ten column families while the scanner RPCs are blocked by the observer
   * latch, asserting that exactly 12 cached blocks are pinned with a ref count equal to
   * NO_OF_THREADS, then releases the latch and verifies the blocks get un-referenced/evicted.
   */
  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table =
        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      // Turn on cache-on-write/evict-on-close for every store so flushed blocks land in the cache.
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      // Spread the extra-family writes across several store files by flushing every other put.
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      // Create three sets of gets
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          // Only BucketCache/CombinedBlockCache track RPC ref counts; skip other cache types.
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      // Release the observer latch so the blocked scanner RPCs can complete.
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
800
801  private BlockCache setCacheProperties(HRegion region) {
802    Iterator<HStore> strItr = region.getStores().iterator();
803    BlockCache cache = null;
804    while (strItr.hasNext()) {
805      HStore store = strItr.next();
806      CacheConfig cacheConf = store.getCacheConfig();
807      cacheConf.setCacheDataOnWrite(true);
808      cacheConf.setEvictOnClose(true);
809      // Use the last one
810      cache = cacheConf.getBlockCache().get();
811    }
812    return cache;
813  }
814
  /**
   * Runs gets and scans in parallel against a region whose scanner is wrapped (via
   * CustomInnerRegionObserverWrapper). Because the wrapped scanner returns before the postNext
   * hook executes, the scan's block references are released early; the test verifies ref counts
   * with checkForBlockEviction and then lets both the gets and scans finish.
   */
  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
872
  /** Forward-scan variant of the scan-during-compaction test. */
  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }
877
  /** Reverse-scan variant of the scan-during-compaction test. */
  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }
882
  /**
   * Starts scans that pin cached blocks, then major-compacts the store underneath them and
   * verifies that the compacted-away blocks stay referenced (ref count == NO_OF_THREADS) until the
   * scans finish, after which every block's RPC ref count must drop to zero.
   * @param tableNameStr name to use for the test table
   * @param reversed     whether the scans run in reverse order
   */
  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      // Before any scan starts no block should be RPC-referenced.
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
997
  /**
   * Regression test around HBASE-13082: while scans hold block references, does a flush AND a
   * major compaction, then verifies that once the scans complete all blocks of the compacted-away
   * files are released and evicted from the cache.
   */
  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
    throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      // Before the scans start no block should be RPC-referenced.
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      // Three files now: two original flushes plus the one above.
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // is also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
1126
  /**
   * Forces the wrapped scanner to throw mid-scan (via CustomInnerRegionObserver.throwException)
   * and verifies that the failed scanners release their block references: after the scan threads
   * finish, no cached block may have a non-zero RPC ref count (HBASE-16604 recreates the scanner,
   * so ref counts always end at zero).
   */
  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      // Let the scanners proceed into the throwing path.
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // After the failed scans, every tracked block must be fully released.
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
1215
1216  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
1217    int refCount;
1218    while (iterator.hasNext()) {
1219      CachedBlock next = iterator.next();
1220      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1221      if (cache instanceof BucketCache) {
1222        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1223        LOG.info("BucketCache {} {}", cacheKey, refCount);
1224      } else if (cache instanceof CombinedBlockCache) {
1225        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1226        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
1227      } else {
1228        continue;
1229      }
1230      assertEquals(0, refCount);
1231    }
1232  }
1233
1234  private void insertData(Table table) throws IOException {
1235    Put put = new Put(ROW);
1236    put.addColumn(FAMILY, QUALIFIER, data);
1237    table.put(put);
1238    put = new Put(ROW1);
1239    put.addColumn(FAMILY, QUALIFIER, data);
1240    table.put(put);
1241    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1242    put = new Put(ROW);
1243    put.addColumn(FAMILY, QUALIFIER2, data2);
1244    table.put(put);
1245  }
1246
1247  private ScanThread[] initiateScan(Table table, boolean reverse)
1248    throws IOException, InterruptedException {
1249    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
1250    for (int i = 0; i < NO_OF_THREADS; i++) {
1251      scanThreads[i] = new ScanThread(table, reverse);
1252    }
1253    for (ScanThread thread : scanThreads) {
1254      thread.start();
1255    }
1256    return scanThreads;
1257  }
1258
1259  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
1260    throws IOException, InterruptedException {
1261    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
1262    for (int i = 0; i < NO_OF_THREADS; i++) {
1263      getThreads[i] = new GetThread(table, tracker, multipleCFs);
1264    }
1265    for (GetThread thread : getThreads) {
1266      thread.start();
1267    }
1268    return getThreads;
1269  }
1270
1271  private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException {
1272    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
1273    for (int i = 0; i < NO_OF_THREADS; i++) {
1274      multiGetThreads[i] = new MultiGetThread(table);
1275    }
1276    for (MultiGetThread thread : multiGetThreads) {
1277      thread.start();
1278    }
1279    return multiGetThreads;
1280  }
1281
  /**
   * Waits for the worker threads to reach their per-operation counters, then walks the block cache
   * and validates each block's RPC ref count against the number of outstanding gets/scans.
   * Finally counts down the observer latch so blocked workers can proceed.
   * @param cache          the block cache to inspect
   * @param getClosed      true if the get operations have already completed, so only scans may
   *                       still pin blocks
   * @param expectOnlyZero if true, every tracked block must have a zero ref count
   */
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
    throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      // Block until every get thread has reached the observer hook.
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      // Block until every scan thread has reached the observer hook.
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        // Only BucketCache/CombinedBlockCache expose RPC ref counts; skip other cache types.
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // Because the scan would have also touched up on these blocks but
          // it
          // would have touched
          // all 3
          if (getClosed) {
            // If get has closed only the scan's blocks would be available
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // Because the get would have also touched up on these blocks but it
        // would have touched
        // upon only 2 additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    // Unblock whatever is waiting on the observer latch.
    CustomInnerRegionObserver.getCdl().get().countDown();
  }
1343
1344  private static class MultiGetThread extends Thread {
1345    private final Table table;
1346    private final List<Get> gets = new ArrayList<>();
1347
1348    public MultiGetThread(Table table) {
1349      this.table = table;
1350    }
1351
1352    @Override
1353    public void run() {
1354      gets.add(new Get(ROW));
1355      gets.add(new Get(ROW1));
1356      try {
1357        CustomInnerRegionObserver.getCdl().set(latch);
1358        Result[] r = table.get(gets);
1359        assertTrue(Bytes.equals(r[0].getRow(), ROW));
1360        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
1361      } catch (IOException e) {
1362      }
1363    }
1364  }
1365
  /**
   * Worker that issues a single get for ROW and asserts on the returned values.
   * With {@code tracker} false it reads QUALIFIER/QUALIFIER2 from FAMILY; with {@code tracker}
   * true it reads selected "testQualifier" columns (from FAMILY, or from the matching
   * "testFamily" columns when {@code multipleCFs} is set), including one qualifier that does not
   * exist. IOExceptions are intentionally ignored.
   */
  private static class GetThread extends Thread {
    private final Table table;
    // Whether to request the tracker-style "testQualifier" columns instead of QUALIFIER/QUALIFIER2
    private final boolean tracker;
    // Whether the tracker columns live in separate "testFamily" column families
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Change this
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      // Publish the shared latch so the region observer can block this get until released.
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
            data2));
        }
      }
    }
  }
1429
1430  private static class ScanThread extends Thread {
1431    private final Table table;
1432    private final boolean reverse;
1433
1434    public ScanThread(Table table, boolean reverse) {
1435      this.table = table;
1436      this.reverse = reverse;
1437    }
1438
1439    @Override
1440    public void run() {
1441      try {
1442        initiateScan(table);
1443      } catch (IOException e) {
1444        // do nothing
1445      }
1446    }
1447
1448    private void initiateScan(Table table) throws IOException {
1449      Scan scan = new Scan();
1450      if (reverse) {
1451        scan.setReversed(true);
1452      }
1453      CustomInnerRegionObserver.getCdl().set(latch);
1454      ResultScanner resScanner = table.getScanner(scan);
1455      int i = (reverse ? ROWS.length - 1 : 0);
1456      boolean resultFound = false;
1457      for (Result result : resScanner) {
1458        resultFound = true;
1459        System.out.println(result);
1460        if (!reverse) {
1461          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1462          i++;
1463        } else {
1464          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1465          i--;
1466        }
1467      }
1468      assertTrue(resultFound);
1469    }
1470  }
1471
1472  private void waitForStoreFileCount(HStore store, int count, int timeout)
1473    throws InterruptedException {
1474    long start = EnvironmentEdgeManager.currentTime();
1475    while (
1476      start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count
1477    ) {
1478      Thread.sleep(100);
1479    }
1480    System.out.println("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() + ", cur="
1481      + store.getStorefilesCount());
1482    assertEquals(count, store.getStorefilesCount());
1483  }
1484
1485  private static class CustomScanner implements RegionScanner {
1486
1487    private RegionScanner delegate;
1488
1489    public CustomScanner(RegionScanner delegate) {
1490      this.delegate = delegate;
1491    }
1492
1493    @Override
1494    public boolean next(List<? super ExtendedCell> results) throws IOException {
1495      return delegate.next(results);
1496    }
1497
1498    @Override
1499    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
1500      throws IOException {
1501      return delegate.next(result, scannerContext);
1502    }
1503
1504    @Override
1505    public boolean nextRaw(List<? super ExtendedCell> result) throws IOException {
1506      return delegate.nextRaw(result);
1507    }
1508
1509    @Override
1510    public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context)
1511      throws IOException {
1512      boolean nextRaw = delegate.nextRaw(result, context);
1513      if (compactionLatch != null && compactionLatch.getCount() > 0) {
1514        try {
1515          compactionLatch.await();
1516        } catch (InterruptedException ie) {
1517        }
1518      }
1519
1520      if (CustomInnerRegionObserver.throwException.get()) {
1521        if (exceptionLatch.getCount() > 0) {
1522          try {
1523            exceptionLatch.await();
1524          } catch (InterruptedException e) {
1525          }
1526          throw new IOException("throw exception");
1527        }
1528      }
1529      return nextRaw;
1530    }
1531
1532    @Override
1533    public Set<Path> getFilesRead() {
1534      return delegate.getFilesRead();
1535    }
1536
1537    @Override
1538    public void close() throws IOException {
1539      delegate.close();
1540    }
1541
1542    @Override
1543    public RegionInfo getRegionInfo() {
1544      return delegate.getRegionInfo();
1545    }
1546
1547    @Override
1548    public boolean isFilterDone() throws IOException {
1549      return delegate.isFilterDone();
1550    }
1551
1552    @Override
1553    public boolean reseek(byte[] row) throws IOException {
1554      return false;
1555    }
1556
1557    @Override
1558    public long getMaxResultSize() {
1559      return delegate.getMaxResultSize();
1560    }
1561
1562    @Override
1563    public long getMvccReadPoint() {
1564      return delegate.getMvccReadPoint();
1565    }
1566
1567    @Override
1568    public int getBatch() {
1569      return delegate.getBatch();
1570    }
1571  }
1572
1573  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
1574    @Override
1575    public RegionScanner postScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e,
1576      Scan scan, RegionScanner s) throws IOException {
1577      return new CustomScanner(s);
1578    }
1579  }
1580
1581  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
1582    static final AtomicInteger countOfNext = new AtomicInteger(0);
1583    static final AtomicInteger countOfGets = new AtomicInteger(0);
1584    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
1585    static final AtomicBoolean throwException = new AtomicBoolean(false);
1586    private static final AtomicReference<CountDownLatch> cdl =
1587      new AtomicReference<>(new CountDownLatch(0));
1588
1589    @Override
1590    public Optional<RegionObserver> getRegionObserver() {
1591      return Optional.of(this);
1592    }
1593
1594    @Override
1595    public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> e,
1596      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
1597      slowdownCode(e, false);
1598      if (getLatch != null && getLatch.getCount() > 0) {
1599        try {
1600          getLatch.await();
1601        } catch (InterruptedException e1) {
1602        }
1603      }
1604      return hasMore;
1605    }
1606
1607    @Override
1608    public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
1609      List<Cell> results) throws IOException {
1610      slowdownCode(e, true);
1611    }
1612
1613    public static AtomicReference<CountDownLatch> getCdl() {
1614      return cdl;
1615    }
1616
1617    private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
1618      boolean isGet) {
1619      CountDownLatch latch = getCdl().get();
1620      try {
1621        System.out.println(latch.getCount() + " is the count " + isGet);
1622        if (latch.getCount() > 0) {
1623          if (isGet) {
1624            countOfGets.incrementAndGet();
1625          } else {
1626            countOfNext.incrementAndGet();
1627          }
1628          LOG.info("Waiting for the counterCountDownLatch");
1629          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
1630          if (latch.getCount() > 0) {
1631            throw new RuntimeException("Can't wait more");
1632          }
1633        }
1634      } catch (InterruptedException e1) {
1635        LOG.error(e1.toString(), e1);
1636      }
1637    }
1638  }
1639}