/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

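/**
 * Tests that blocks pinned in the block cache by in-flight RPCs (gets and
 * scans) are reference-counted correctly, and become evictable again once the
 * operations complete, across flushes, compactions and splits.
 */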
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
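  // Latches used to coordinate the test body with CustomInnerRegionObserver:
  // they hold gets/scans/compactions open on the server side so that block
  // reference counts can be inspected while the RPCs are still in flight.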
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();
  /**
   * Sets up a mini cluster with an off-heap bucket cache and the
   * MultiRowMutationEndpoint coprocessor loaded.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * Shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Resets the coprocessor's counters before each test.
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  /**
   * Releases any latches still held and drops all non-system tables.
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
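      // Cache data blocks as they are written and evict them when a store
      // file is closed, so the cache contents in this test are deterministic.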
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in the memstore, so don't expect any cache changes yet
      // flush the data
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Load cache
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Create three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // Three store files, each holding part of ROW, have been created above
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each in-use block is pinned once per thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each in-use block is pinned once per thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for the split to complete
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split created HalfStoreFileReaders, the first-key and
      // last-key scanners should be closed in order to release those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three multi-get threads
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found a block with a non-zero ref count", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref count to be decremented;
      // refresh the iterator, since the loop above exhausted it
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each in-use block is pinned once per thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block ref count to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

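  /**
   * Enables cache-on-write and evict-on-close on every store of the region and
   * returns the block cache (the same cache instance is shared by all stores).
   */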
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data; two rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      // Because the scanner is wrapped, the block ref count is decremented for
      // the scan case before the postNext hook even executes; give some time
      // for that to happen
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each in-use block is pinned once per thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done, some blocks cannot be evicted because
      // the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still pinned once per thread, as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each in-use block is pinned once per thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done, some blocks cannot be evicted because
      // the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still pinned once per thread, as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started,
      // we need to ensure that all the original blocks of the compacted file
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data; two rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Because the scanner is wrapped, the block ref count is decremented for
      // the scan case before the postNext hook even executes; give some time
      // for that to happen
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each in-use block is pinned once per thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // You should always see a ref count of 0 since, after HBASE-16604, we
      // always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
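  /**
   * Asserts that no block left in the cache is still referenced by an
   * in-flight RPC, i.e. every scanner and get has released its blocks.
   */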
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }
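
  // A small sketch of how the repeated instanceof checks above (and in the
  // tests) could be factored out; kept here for illustration only and not
  // wired into the assertions. Returns -1 for caches that do not track RPC
  // reference counts.
  private static int rpcRefCount(BlockCache cache, BlockCacheKey cacheKey) {
    if (cache instanceof BucketCache) {
      return ((BucketCache) cache).getRpcRefCount(cacheKey);
    } else if (cache instanceof CombinedBlockCache) {
      return ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
    }
    return -1;
  }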
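  /**
   * Adds two rows (ROW and ROW1) under QUALIFIER, plus a wider second column
   * (QUALIFIER2) on ROW, without flushing.
   */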
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

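  /**
   * Starts NO_OF_THREADS scan threads, optionally scanning in reverse.
   */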
  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

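  /**
   * Starts NO_OF_THREADS get threads; with {@code tracker} set, the gets ask
   * for explicit columns, optionally spread across multiple column families.
   */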
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

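  /**
   * Starts NO_OF_THREADS multi-get threads, each fetching ROW and ROW1.
   */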
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

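  /**
   * Waits for all gets (or scans) to reach the coprocessor, then walks the
   * cache verifying the expected RPC ref count on every block, and finally
   * counts down the latch so the blocked operations can complete.
   */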
1273  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
1274      throws InterruptedException {
1275    int counter = NO_OF_THREADS;
1276    if (CustomInnerRegionObserver.waitForGets.get()) {
1277      // Because only one row is selected, it has only 2 blocks
1278      counter = counter - 1;
1279      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
1280        Thread.sleep(100);
1281      }
1282    } else {
1283      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
1284        Thread.sleep(100);
1285      }
1286    }
1287    Iterator<CachedBlock> iterator = cache.iterator();
1288    int refCount = 0;
1289    while (iterator.hasNext()) {
1290      CachedBlock next = iterator.next();
1291      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1292      if (cache instanceof BucketCache) {
1293        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1294      } else if (cache instanceof CombinedBlockCache) {
1295        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1296      } else {
1297        continue;
1298      }
      LOG.info("Block {} has refCount {}", cacheKey, refCount);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          // The scans also touch these blocks, but they touch all three of them.
          if (getClosed) {
            // Once the gets have closed, only the scans' references remain.
            assertEquals(CustomInnerRegionObserver.countOfGets.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfGets.get() + NO_OF_THREADS, refCount);
          }
        }
      } else {
        // The gets also touch these blocks, but only two of them additionally.
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(CustomInnerRegionObserver.countOfNext.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfNext.get() + NO_OF_THREADS, refCount);
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

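  // Reader thread that issues one batched get for ROW and ROW1 and verifies the returned rows.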
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // Ignored; a failed get surfaces through the main thread's refcount assertions.
      }
    }
  }

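  // Reader thread that issues a single Get; in tracker mode it requests selected
  // qualifiers from one or several column families.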
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // Ignored; failures surface through the main thread's refcount assertions.
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Fetch only a few qualifiers, plus one that does not exist.
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      LOG.info("Get result: {}", r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }

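  // Reader thread that scans the table, forward or reversed, and verifies the row order.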
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // Ignored; some tests deliberately fail the scan via CustomScanner.
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        LOG.info("Scan result: {}", result);
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i = reverse ? i - 1 : i + 1;
      }
      assertTrue(resultFound);
    }
  }

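  // Polls until the store reaches the expected store file count or the timeout elapses.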
  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    LOG.info("start={}, now={}, storefiles={}", start, System.currentTimeMillis(),
        store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

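  // RegionScanner wrapper that can stall nextRaw() on the compaction/exception latches,
  // keeping RPC references to the scanned blocks pinned while the test inspects the cache.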
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          // Restore the interrupt flag instead of swallowing the interrupt.
          Thread.currentThread().interrupt();
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interrupt.
            Thread.currentThread().interrupt();
          }
          // Deliberately fail the scan once the latch has been released.
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
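      // Not delegated; reseek is not exercised by these tests.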
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

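  // Observer that wraps every scanner opened on the region in a CustomScanner.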
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

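  // Coprocessor that counts get/next invocations and can park them on a shared
  // CountDownLatch, letting the test control when RPC block references are released.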
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          // Restore the interrupt flag instead of swallowing the interrupt.
          Thread.currentThread().interrupt();
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        LOG.info("Latch count is {}, isGet={}", latch.getCount(), isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the CountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // Bounded wait so a hung test eventually fails.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Timed out waiting for the latch to be counted down");
          }
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}