/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

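/**
 * Tests that blocks pinned by in-progress gets and scans are reference counted in the
 * block cache, and become evictable again once the readers release them (on close,
 * exception, flush, compaction or split).
 */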
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
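  // Latches used to coordinate the tests with the CustomInnerRegionObserver coprocessor:
  // they hold gets, scans and compactions open on the server side so that block
  // reference counts can be inspected while the readers are still active.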
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", true); // enable for the tests below
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // The data is still in the memstore, so flush it.
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Load the cache by creating three sets of scans
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data again; all cached blocks should still have zero reference count
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // insert a second column and read the row back
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, creating one new block
      LOG.info("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact the two store files into one
      LOG.info("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row; this should be a cache miss because data blocks written
      // during compaction are not cached
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      insertData(table);
      // flush the data
      LOG.info("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Create three sets of scans
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      LOG.info("Scans should have returned the blocks");
      // The scans should also have released their blocks by now
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The cells of ROW are now spread across three store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      LOG.info("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO : check how the block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The cells of ROW are now spread across several store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          LOG.info("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      LOG.info("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The cells of ROW are now spread across multiple families and store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          LOG.info("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      LOG.info("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for splits
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split created HalfStoreFileReaders, the first key and last key scanners
      // should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The cells of ROW are now spread across three store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of multi gets
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      iterateBlockCache(cache, cache.iterator());
      getLatch.countDown();
      LOG.info("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // The cells of ROW are now spread across multiple families and store files
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          LOG.info("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

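  /**
   * Enables cache-on-write and evict-on-close on every store of the given region and
   * returns the block cache (taken from the last store visited).
   */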
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      LOG.info("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Create three sets of scans
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The ref count would have been decremented for the scan case, as the scanner
      // was wrapped before even the postScannerNext hook gets executed.
      // giving some time for the block ref count to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

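  /**
   * Scans (forward or reversed) while a major compaction runs; blocks pinned by the
   * scans must stay referenced until the scans finish, and be evictable afterwards.
   */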
  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // insert a second column for ROW
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, creating one new block
      LOG.info("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      LOG.info("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted, because the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // insert a second column for ROW
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, creating one new block
      LOG.info("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, creating one new block
      LOG.info("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      LOG.info("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted, because the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      LOG.info("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The ref count would have been decremented for the scan case, as the scanner
      // was wrapped before even the postScannerNext hook gets executed.
      // giving some time for the block ref count to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

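  /**
   * Walks the given cache iterator and asserts that every block has a reference
   * count of zero, i.e. no reader is still holding on to it.
   */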
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

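  /**
   * Inserts ROW and ROW1 under QUALIFIER, plus a second column QUALIFIER2 for ROW;
   * with the 1024 byte block size this yields multiple data blocks on flush.
   */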
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

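  /**
   * Starts NO_OF_THREADS scan threads (forward or reversed) against the table.
   */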
  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

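  /**
   * Starts NO_OF_THREADS get threads; with tracker set they request explicit columns,
   * optionally across multiple column families.
   */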
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

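  /**
   * Starts NO_OF_THREADS threads that each issue a multi get for ROW and ROW1.
   */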
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

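  /**
   * Waits until the expected number of gets or nexts have been observed by the
   * coprocessor, then walks the block cache and checks the per-block reference
   * counts against the readers that are still expected to be open.
   */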
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // The get reads a single row, which spans only 2 of the blocks.
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println("The refCount is " + refCount + " for block " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          // The concurrent scans would also have touched these blocks, but a
          // scan touches all 3 blocks rather than just the 2 a get touches.
          if (getClosed) {
            // If the gets have closed, only the scans' references remain.
            assertEquals(CustomInnerRegionObserver.countOfGets.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfGets.get() + NO_OF_THREADS, refCount);
          }
        }
      } else {
        // The concurrent gets would also have touched these blocks, but only
        // 2 of them additionally.
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(CustomInnerRegionObserver.countOfNext.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfNext.get() + NO_OF_THREADS, refCount);
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

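  /**
   * Thread that issues one batched get for ROW and ROW1 and verifies the returned rows.
   */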
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }

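  /**
   * Thread that issues a single get for ROW, optionally requesting explicit qualifiers (tracker)
   * from one or several column families (multipleCFs), and verifies the returned values.
   */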
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Request explicit qualifiers so the get exercises the column tracker.
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Nonexistent qualifier
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Nonexistent qualifier
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }

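  /**
   * Thread that runs one full (optionally reversed) scan of the table and verifies the row order.
   */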
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        System.out.println(result);
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i = reverse ? i - 1 : i + 1;
      }
      assertTrue(resultFound);
    }
  }

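  /**
   * Polls the store every 100ms until it holds the expected number of store files or the timeout
   * (in milliseconds) elapses, then asserts on the final count.
   */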
  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
        store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

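  /**
   * RegionScanner wrapper that delegates everything except nextRaw, which can block on the
   * compaction and exception latches so the test can hold scanner references open at a precise
   * point, and reseek, which is not exercised here.
   */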
  private static class CustomScanner implements RegionScanner {

    private final RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          // do nothing
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            // do nothing
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Not delegated; reseek is not exercised by these tests.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

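  /**
   * Observer that wraps every newly opened scanner in a {@link CustomScanner}.
   */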
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

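  /**
   * Coprocessor that counts gets and scanner-next calls and can block them on a shared
   * CountDownLatch, giving the test fine-grained control over when cached blocks are released.
   */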
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          // do nothing
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

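    /**
     * Records the call in the appropriate counter and, while the shared latch is held, blocks the
     * reader (for at most two minutes) so the test can inspect block reference counts.
     */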
    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        System.out.println("Latch count=" + latch.getCount() + ", isGet=" + isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the CountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // Bounded wait so a stuck test eventually fails.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}