/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

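/**
 * Tests that blocks pinned by in-flight client operations (gets, multi-gets and scans) keep a
 * non-zero RPC reference count in the block cache, and that those blocks become evictable again
 * (ref count back to zero) once the operations complete, including across flushes, compactions
 * and region splits.
 */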
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

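  // The mini cluster runs with a combined block cache: the on-heap LRU cache
  // (hfile.block.cache.size = 0.2) plus a 400 MB off-heap bucket cache. Client retries are
  // disabled so failures surface immediately, and the scanner timeout is kept short.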
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data. Should create one HFile with 2 blocks
      region.flush(true);
      // Load cache. Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Start three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The data is now spread across three store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The data is now spread across multiple store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // In-use blocks should be referenced by all three get threads
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The data is now spread across multiple store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // In-use blocks should be referenced by all three get threads
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      HTableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
      // This test expects the rpc refcount of cached data blocks to be 0 after a split. After a
      // split, two daughter regions are opened and a compaction is scheduled to get rid of
      // references to the parent region's hfiles. That compaction would increase the refcount of
      // cached data blocks by 1, making the test flaky since compaction can kick in anytime. To
      // avoid this, the table is created with compaction disabled.
      desc.setCompactionEnabled(false);
      table = TEST_UTIL.createTable(desc, FAMILIES_1, null, BloomType.ROW, 1024, null);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
        regionCount);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for splits
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
      for (HRegion r : regions) {
        LOG.info("{}", r.getCompactionState());
        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
      }
      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split had created the HalfStoreFileReader, the first-key and last-key
      // scanners should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The data is now spread across three store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three multi-get threads
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented; use a fresh iterator
      // since the one above is already exhausted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The data is now spread across multiple store files
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // In-use blocks should be referenced by all three scan threads
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

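  /**
   * Turns on cache-on-write and evict-on-close for every store of the region and returns the
   * region's block cache (taken from the last store; all stores share the same cache).
   */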
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Start three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      // In the scan case the block ref count would already have been decremented, as the
      // scanner was wrapped before even the postNext hook gets executed.
      // giving some time for the block ref count to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

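  /**
   * Scans the table (forward or reversed) with a major compaction running in between, verifying
   * that blocks referenced by the in-flight scanners stay pinned through the compaction and
   * that every cached block drops back to a ref count of zero once the scanners finish.
   */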
  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // In-use blocks should be referenced by all three scan threads
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot be evicted,
      // because the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still referenced by all three scans as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // In-use blocks should be referenced by all three scan threads
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot be evicted,
      // because the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still referenced by all three scans as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // In the scan case the block ref count would already have been decremented, as the
      // scanner was wrapped before even the postNext hook gets executed.
      // giving some time for the block ref count to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // In-use blocks should be referenced by all three scan threads
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // You should always see a ref count of 0; since HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

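  /**
   * Walks the blocks remaining in the given cache iterator and asserts that none of them is
   * still referenced by an RPC, i.e. that every block has become evictable again.
   */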
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        LOG.info("BucketCache {} {}", cacheKey, refCount);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

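  /**
   * Inserts two rows: ROW gets two columns (the second one twice as wide) and ROW1 gets one.
   */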
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

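  /**
   * Starts NO_OF_THREADS scanner threads, optionally reversed, and returns them so the caller
   * can join on them once the latches are released.
   */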
  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

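  /**
   * Starts NO_OF_THREADS get threads. With {@code tracker} set, the gets name explicit columns
   * (exercising the explicit column tracker), optionally spread over multiple column families.
   */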
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

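  /** Starts NO_OF_THREADS threads that each issue one multi-get for ROW and ROW1. */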
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

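  /**
   * Waits for all get or scan threads to make progress, then walks the block cache and checks
   * the RPC ref count of every block: with {@code expectOnlyZero} every block must already be
   * back to zero, otherwise any in-use block's count must match the number of readers that can
   * still be holding it (gets, scans or both, depending on {@code getClosed} and the latches).
   */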
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Only one row is selected by the gets, so they touch only 2 blocks.
      // Wait for every get thread to reach the coprocessor hook.
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      // Wait for every scan thread to reach the coprocessor hook.
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      LOG.info("Block {} has refCount {}", cacheKey, refCount);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          // The scan would also have touched these blocks, but it would have
          // touched all 3 of them, not just the ones the gets read.
          if (getClosed) {
            // Once the gets have closed, only the scan's references remain.
            assertEquals(CustomInnerRegionObserver.countOfGets.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfGets.get() + NO_OF_THREADS, refCount);
          }
        }
      } else {
        // The get would also have touched these blocks, but it would have
        // touched only 2 of them additionally.
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(CustomInnerRegionObserver.countOfNext.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfNext.get() + NO_OF_THREADS, refCount);
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

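  /** Issues one batch get for ROW and ROW1 while the shared latch parks the server side. */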
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // Ignore; a failed get simply skips the row assertions above.
      }
    }
  }

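  /** Issues a get for ROW, optionally requesting explicit columns in one or more families. */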
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Ask for a handful of explicit columns, including one that does not exist.
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      LOG.info("Get returned {}", r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }

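  /** Runs a full (optionally reversed) scan and verifies the rows arrive in the expected order. */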
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        LOG.info("Scan returned {}", result);
        // Rows must arrive in the order implied by the scan direction.
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i = reverse ? i - 1 : i + 1;
      }
      assertTrue(resultFound);
    }
  }

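  /**
   * Polls until the store holds the expected number of store files or the timeout elapses, then
   * asserts the final count.
   */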
  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    LOG.info("start={}, now={}, cur={}", start, System.currentTimeMillis(),
        store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

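  /**
   * A RegionScanner wrapper that delegates to the real scanner but can park inside
   * {@code nextRaw(List, ScannerContext)} on the compaction or exception latches, letting the
   * test control when a scan makes progress or fails.
   */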
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      // If a compaction is staged, park here until the test releases the latch.
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          // Restore the interrupt status and continue.
          Thread.currentThread().interrupt();
        }
      }

      // Optionally fail the scan once the exception latch has been released.
      if (CustomInnerRegionObserver.throwException.get() && exceptionLatch.getCount() > 0) {
        try {
          exceptionLatch.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        throw new IOException("throw exception");
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Intentionally not delegated in this test wrapper.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

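  /**
   * A CustomInnerRegionObserver that additionally wraps every scanner opened on the region in a
   * {@link CustomScanner}.
   */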
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

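  /**
   * Coprocessor that counts get and next invocations and can block them on a shared
   * CountDownLatch, so the test can examine block-cache reference counts while requests are
   * still in flight.
   */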
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          // Restore the interrupt status and continue.
          Thread.currentThread().interrupt();
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        LOG.info("Latch count is {}, isGet={}", latch.getCount(), isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the CountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // Bound the wait so the tests can finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Timed out waiting for the latch");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}