001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import java.util.concurrent.atomic.AtomicReference;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
043import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
044import org.apache.hadoop.hbase.coprocessor.ObserverContext;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
047import org.apache.hadoop.hbase.coprocessor.RegionObserver;
048import org.apache.hadoop.hbase.io.hfile.BlockCache;
049import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
050import org.apache.hadoop.hbase.io.hfile.CacheConfig;
051import org.apache.hadoop.hbase.io.hfile.CachedBlock;
052import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
054import org.apache.hadoop.hbase.regionserver.HRegion;
055import org.apache.hadoop.hbase.regionserver.HStore;
056import org.apache.hadoop.hbase.regionserver.InternalScanner;
057import org.apache.hadoop.hbase.regionserver.RegionScanner;
058import org.apache.hadoop.hbase.regionserver.ScannerContext;
059import org.apache.hadoop.hbase.testclassification.ClientTests;
060import org.apache.hadoop.hbase.testclassification.LargeTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.junit.After;
063import org.junit.AfterClass;
064import org.junit.Before;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.junit.rules.TestName;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
075
076@Category({ LargeTests.class, ClientTests.class })
077@SuppressWarnings("deprecation")
078public class TestBlockEvictionFromClient {
079
  // JUnit class rule built from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  // Shared testing utility: the mini cluster is started in setUpBeforeClass() and shut down in
  // tearDownAfterClass().
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Filled with ROW and ROW1 in setUpBeforeClass().
  static byte[][] ROWS = new byte[2][];
  // Reference count the tests expect on a cached block while the reader threads hold it.
  private static int NO_OF_THREADS = 3;
  // Row keys used by the tests.
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  // Single-element family array; slot 0 is set to FAMILY in setUpBeforeClass().
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  // QUALIFIER concatenated with itself.
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  // 1000-byte cell value, and a 2000-byte value made of two copies of it.
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  // Passed to startMiniCluster() in setUpBeforeClass().
  protected static int SLAVES = 1;
  // Latches that individual tests create; tearDown() counts them down and clears them.
  // NOTE(review): presumably awaited by the observer / reader-thread code that lives outside
  // this section - confirm there.
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  // Supplies the running test method's name, which the tests use as the table name.
  @Rule
  public TestName name = new TestName();
106
107  /**
108   * @throws java.lang.Exception
109   */
110  @BeforeClass
111  public static void setUpBeforeClass() throws Exception {
112    ROWS[0] = ROW;
113    ROWS[1] = ROW1;
114    Configuration conf = TEST_UTIL.getConfiguration();
115    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
116        MultiRowMutationEndpoint.class.getName());
117    conf.setInt("hbase.regionserver.handler.count", 20);
118    conf.setInt("hbase.bucketcache.size", 400);
119    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
120    conf.setFloat("hfile.block.cache.size", 0.2f);
121    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
122    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
123    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
124    FAMILIES_1[0] = FAMILY;
125    TEST_UTIL.startMiniCluster(SLAVES);
126  }
127
128  /**
129   * @throws java.lang.Exception
130   */
131  @AfterClass
132  public static void tearDownAfterClass() throws Exception {
133    TEST_UTIL.shutdownMiniCluster();
134  }
135
136  /**
137   * @throws java.lang.Exception
138   */
139  @Before
140  public void setUp() throws Exception {
141    CustomInnerRegionObserver.waitForGets.set(false);
142    CustomInnerRegionObserver.countOfNext.set(0);
143    CustomInnerRegionObserver.countOfGets.set(0);
144  }
145
146  /**
147   * @throws java.lang.Exception
148   */
149  @After
150  public void tearDown() throws Exception {
151    if (latch != null) {
152      while (latch.getCount() > 0) {
153        latch.countDown();
154      }
155    }
156    if (getLatch != null) {
157      getLatch.countDown();
158    }
159    if (compactionLatch != null) {
160      compactionLatch.countDown();
161    }
162    if (exceptionLatch != null) {
163      exceptionLatch.countDown();
164    }
165    latch = null;
166    getLatch = null;
167    compactionLatch = null;
168    exceptionLatch = null;
169    CustomInnerRegionObserver.throwException.set(false);
170    // Clean up the tables for every test case
171    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
172    for (TableName tableName : listTableNames) {
173      if (!tableName.isSystemTable()) {
174        TEST_UTIL.getAdmin().disableTable(tableName);
175        TEST_UTIL.getAdmin().deleteTable(tableName);
176      }
177    }
178  }
179
180  @Test
181  public void testBlockEvictionWithParallelScans() throws Exception {
182    Table table = null;
183    try {
184      latch = new CountDownLatch(1);
185      final TableName tableName = TableName.valueOf(name.getMethodName());
186      // Create a table with block size as 1024
187      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
188          CustomInnerRegionObserver.class.getName());
189      // get the block cache and region
190      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
191      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
192      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
193          .getRegion(regionName);
194      HStore store = region.getStores().iterator().next();
195      CacheConfig cacheConf = store.getCacheConfig();
196      cacheConf.setCacheDataOnWrite(true);
197      cacheConf.setEvictOnClose(true);
198      BlockCache cache = cacheConf.getBlockCache().get();
199
200      // insert data. 2 Rows are added
201      Put put = new Put(ROW);
202      put.addColumn(FAMILY, QUALIFIER, data);
203      table.put(put);
204      put = new Put(ROW1);
205      put.addColumn(FAMILY, QUALIFIER, data);
206      table.put(put);
207      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
208      // data was in memstore so don't expect any changes
209      // flush the data
210      // Should create one Hfile with 2 blocks
211      region.flush(true);
212      // Load cache
213      // Create three sets of scan
214      ScanThread[] scanThreads = initiateScan(table, false);
215      Thread.sleep(100);
216      checkForBlockEviction(cache, false, false);
217      for (ScanThread thread : scanThreads) {
218        thread.join();
219      }
220      // CustomInnerRegionObserver.sleepTime.set(0);
221      Iterator<CachedBlock> iterator = cache.iterator();
222      iterateBlockCache(cache, iterator);
223      // read the data and expect same blocks, one new hit, no misses
224      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
225      iterator = cache.iterator();
226      iterateBlockCache(cache, iterator);
227      // Check how this miss is happening
228      // insert a second column, read the row, no new blocks, 3 new hits
229      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
230      byte[] data2 = Bytes.add(data, data);
231      put = new Put(ROW);
232      put.addColumn(FAMILY, QUALIFIER2, data2);
233      table.put(put);
234      Result r = table.get(new Get(ROW));
235      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
236      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
237      iterator = cache.iterator();
238      iterateBlockCache(cache, iterator);
239      // flush, one new block
240      System.out.println("Flushing cache");
241      region.flush(true);
242      iterator = cache.iterator();
243      iterateBlockCache(cache, iterator);
244      // compact, net minus two blocks, two hits, no misses
245      System.out.println("Compacting");
246      assertEquals(2, store.getStorefilesCount());
247      store.triggerMajorCompaction();
248      region.compact(true);
249      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
250      assertEquals(1, store.getStorefilesCount());
251      iterator = cache.iterator();
252      iterateBlockCache(cache, iterator);
253      // read the row, this should be a cache miss because we don't cache data
254      // blocks on compaction
255      r = table.get(new Get(ROW));
256      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
257      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
258      iterator = cache.iterator();
259      iterateBlockCache(cache, iterator);
260    } finally {
261      if (table != null) {
262        table.close();
263      }
264    }
265  }
266
267  @Test
268  public void testParallelGetsAndScans() throws IOException, InterruptedException {
269    Table table = null;
270    try {
271      latch = new CountDownLatch(2);
272      // Check if get() returns blocks on its close() itself
273      getLatch = new CountDownLatch(1);
274      final TableName tableName = TableName.valueOf(name.getMethodName());
275      // Create KV that will give you two blocks
276      // Create a table with block size as 1024
277      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
278          CustomInnerRegionObserver.class.getName());
279      // get the block cache and region
280      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
281      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
282      HRegion region =
283          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
284      HStore store = region.getStores().iterator().next();
285      CacheConfig cacheConf = store.getCacheConfig();
286      cacheConf.setCacheDataOnWrite(true);
287      cacheConf.setEvictOnClose(true);
288      BlockCache cache = cacheConf.getBlockCache().get();
289
290      insertData(table);
291      // flush the data
292      System.out.println("Flushing cache");
293      // Should create one Hfile with 2 blocks
294      region.flush(true);
295      // Create three sets of scan
296      CustomInnerRegionObserver.waitForGets.set(true);
297      ScanThread[] scanThreads = initiateScan(table, false);
298      // Create three sets of gets
299      GetThread[] getThreads = initiateGet(table, false, false);
300      checkForBlockEviction(cache, false, false);
301      CustomInnerRegionObserver.waitForGets.set(false);
302      checkForBlockEviction(cache, false, false);
303      for (GetThread thread : getThreads) {
304        thread.join();
305      }
306      // Verify whether the gets have returned the blocks that it had
307      CustomInnerRegionObserver.waitForGets.set(true);
308      // giving some time for the block to be decremented
309      checkForBlockEviction(cache, true, false);
310      getLatch.countDown();
311      for (ScanThread thread : scanThreads) {
312        thread.join();
313      }
314      System.out.println("Scans should have returned the bloks");
315      // Check with either true or false
316      CustomInnerRegionObserver.waitForGets.set(false);
317      // The scan should also have released the blocks by now
318      checkForBlockEviction(cache, true, true);
319    } finally {
320      if (table != null) {
321        table.close();
322      }
323    }
324  }
325
326  @Test
327  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
328    Table table = null;
329    try {
330      latch = new CountDownLatch(1);
331      // Check if get() returns blocks on its close() itself
332      getLatch = new CountDownLatch(1);
333      final TableName tableName = TableName.valueOf(name.getMethodName());
334      // Create KV that will give you two blocks
335      // Create a table with block size as 1024
336      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
337          CustomInnerRegionObserver.class.getName());
338      // get the block cache and region
339      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
340      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
341      HRegion region =
342          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
343      HStore store = region.getStores().iterator().next();
344      CacheConfig cacheConf = store.getCacheConfig();
345      cacheConf.setCacheDataOnWrite(true);
346      cacheConf.setEvictOnClose(true);
347      BlockCache cache = cacheConf.getBlockCache().get();
348
349      Put put = new Put(ROW);
350      put.addColumn(FAMILY, QUALIFIER, data);
351      table.put(put);
352      region.flush(true);
353      put = new Put(ROW1);
354      put.addColumn(FAMILY, QUALIFIER, data);
355      table.put(put);
356      region.flush(true);
357      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
358      put = new Put(ROW);
359      put.addColumn(FAMILY, QUALIFIER2, data2);
360      table.put(put);
361      region.flush(true);
362      // flush the data
363      System.out.println("Flushing cache");
364      // Should create one Hfile with 2 blocks
365      CustomInnerRegionObserver.waitForGets.set(true);
366      // Create three sets of gets
367      GetThread[] getThreads = initiateGet(table, false, false);
368      Thread.sleep(200);
369      CustomInnerRegionObserver.getCdl().get().countDown();
370      for (GetThread thread : getThreads) {
371        thread.join();
372      }
373      // Verify whether the gets have returned the blocks that it had
374      CustomInnerRegionObserver.waitForGets.set(true);
375      // giving some time for the block to be decremented
376      checkForBlockEviction(cache, true, false);
377      getLatch.countDown();
378      System.out.println("Gets should have returned the bloks");
379    } finally {
380      if (table != null) {
381        table.close();
382      }
383    }
384  }
385
386  @Test
387  // TODO : check how block index works here
388  public void testGetsWithMultiColumnsAndExplicitTracker()
389      throws IOException, InterruptedException {
390    Table table = null;
391    try {
392      latch = new CountDownLatch(1);
393      // Check if get() returns blocks on its close() itself
394      getLatch = new CountDownLatch(1);
395      final TableName tableName = TableName.valueOf(name.getMethodName());
396      // Create KV that will give you two blocks
397      // Create a table with block size as 1024
398      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
399          CustomInnerRegionObserver.class.getName());
400      // get the block cache and region
401      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
402      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
403      HRegion region =
404          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
405      BlockCache cache = setCacheProperties(region);
406      Put put = new Put(ROW);
407      put.addColumn(FAMILY, QUALIFIER, data);
408      table.put(put);
409      region.flush(true);
410      put = new Put(ROW1);
411      put.addColumn(FAMILY, QUALIFIER, data);
412      table.put(put);
413      region.flush(true);
414      for (int i = 1; i < 10; i++) {
415        put = new Put(ROW);
416        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
417        table.put(put);
418        if (i % 2 == 0) {
419          region.flush(true);
420        }
421      }
422      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
423      put = new Put(ROW);
424      put.addColumn(FAMILY, QUALIFIER2, data2);
425      table.put(put);
426      region.flush(true);
427      // flush the data
428      System.out.println("Flushing cache");
429      // Should create one Hfile with 2 blocks
430      CustomInnerRegionObserver.waitForGets.set(true);
431      // Create three sets of gets
432      GetThread[] getThreads = initiateGet(table, true, false);
433      Thread.sleep(200);
434      Iterator<CachedBlock> iterator = cache.iterator();
435      boolean usedBlocksFound = false;
436      int refCount = 0;
437      int noOfBlocksWithRef = 0;
438      while (iterator.hasNext()) {
439        CachedBlock next = iterator.next();
440        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
441        if (cache instanceof BucketCache) {
442          refCount = ((BucketCache) cache).getRefCount(cacheKey);
443        } else if (cache instanceof CombinedBlockCache) {
444          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
445        } else {
446          continue;
447        }
448        if (refCount != 0) {
449          // Blocks will be with count 3
450          System.out.println("The refCount is " + refCount);
451          assertEquals(NO_OF_THREADS, refCount);
452          usedBlocksFound = true;
453          noOfBlocksWithRef++;
454        }
455      }
456      assertTrue(usedBlocksFound);
457      // the number of blocks referred
458      assertEquals(10, noOfBlocksWithRef);
459      CustomInnerRegionObserver.getCdl().get().countDown();
460      for (GetThread thread : getThreads) {
461        thread.join();
462      }
463      // Verify whether the gets have returned the blocks that it had
464      CustomInnerRegionObserver.waitForGets.set(true);
465      // giving some time for the block to be decremented
466      checkForBlockEviction(cache, true, false);
467      getLatch.countDown();
468      System.out.println("Gets should have returned the bloks");
469    } finally {
470      if (table != null) {
471        table.close();
472      }
473    }
474  }
475
476  @Test
477  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
478    Table table = null;
479    try {
480      latch = new CountDownLatch(1);
481      // Check if get() returns blocks on its close() itself
482      getLatch = new CountDownLatch(1);
483      final TableName tableName = TableName.valueOf(name.getMethodName());
484      // Create KV that will give you two blocks
485      // Create a table with block size as 1024
486      byte[][] fams = new byte[10][];
487      fams[0] = FAMILY;
488      for (int i = 1; i < 10; i++) {
489        fams[i] = (Bytes.toBytes("testFamily" + i));
490      }
491      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
492          CustomInnerRegionObserver.class.getName());
493      // get the block cache and region
494      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
495      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
496      HRegion region =
497          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
498      BlockCache cache = setCacheProperties(region);
499
500      Put put = new Put(ROW);
501      put.addColumn(FAMILY, QUALIFIER, data);
502      table.put(put);
503      region.flush(true);
504      put = new Put(ROW1);
505      put.addColumn(FAMILY, QUALIFIER, data);
506      table.put(put);
507      region.flush(true);
508      for (int i = 1; i < 10; i++) {
509        put = new Put(ROW);
510        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
511        table.put(put);
512        if (i % 2 == 0) {
513          region.flush(true);
514        }
515      }
516      region.flush(true);
517      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
518      put = new Put(ROW);
519      put.addColumn(FAMILY, QUALIFIER2, data2);
520      table.put(put);
521      region.flush(true);
522      // flush the data
523      System.out.println("Flushing cache");
524      // Should create one Hfile with 2 blocks
525      CustomInnerRegionObserver.waitForGets.set(true);
526      // Create three sets of gets
527      GetThread[] getThreads = initiateGet(table, true, true);
528      Thread.sleep(200);
529      Iterator<CachedBlock> iterator = cache.iterator();
530      boolean usedBlocksFound = false;
531      int refCount = 0;
532      int noOfBlocksWithRef = 0;
533      while (iterator.hasNext()) {
534        CachedBlock next = iterator.next();
535        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
536        if (cache instanceof BucketCache) {
537          refCount = ((BucketCache) cache).getRefCount(cacheKey);
538        } else if (cache instanceof CombinedBlockCache) {
539          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
540        } else {
541          continue;
542        }
543        if (refCount != 0) {
544          // Blocks will be with count 3
545          System.out.println("The refCount is " + refCount);
546          assertEquals(NO_OF_THREADS, refCount);
547          usedBlocksFound = true;
548          noOfBlocksWithRef++;
549        }
550      }
551      assertTrue(usedBlocksFound);
552      // the number of blocks referred
553      assertEquals(3, noOfBlocksWithRef);
554      CustomInnerRegionObserver.getCdl().get().countDown();
555      for (GetThread thread : getThreads) {
556        thread.join();
557      }
558      // Verify whether the gets have returned the blocks that it had
559      CustomInnerRegionObserver.waitForGets.set(true);
560      // giving some time for the block to be decremented
561      checkForBlockEviction(cache, true, false);
562      getLatch.countDown();
563      System.out.println("Gets should have returned the bloks");
564    } finally {
565      if (table != null) {
566        table.close();
567      }
568    }
569  }
570
571  @Test
572  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
573    Table table = null;
574    try {
575      final TableName tableName = TableName.valueOf(name.getMethodName());
576      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
577      // get the block cache and region
578      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
579      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
580      HRegion region =
581          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
582      HStore store = region.getStores().iterator().next();
583      CacheConfig cacheConf = store.getCacheConfig();
584      cacheConf.setEvictOnClose(true);
585      BlockCache cache = cacheConf.getBlockCache().get();
586
587      Put put = new Put(ROW);
588      put.addColumn(FAMILY, QUALIFIER, data);
589      table.put(put);
590      region.flush(true);
591      put = new Put(ROW1);
592      put.addColumn(FAMILY, QUALIFIER, data);
593      table.put(put);
594      region.flush(true);
595      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
596      put = new Put(ROW2);
597      put.addColumn(FAMILY, QUALIFIER2, data2);
598      table.put(put);
599      put = new Put(ROW3);
600      put.addColumn(FAMILY, QUALIFIER2, data2);
601      table.put(put);
602      region.flush(true);
603      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
604      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
605      LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
606      TEST_UTIL.getAdmin().split(tableName, ROW1);
607      // Wait for splits
608      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
609      region.compact(true);
610      Iterator<CachedBlock> iterator = cache.iterator();
611      // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
612      // should be closed inorder to return those blocks
613      iterateBlockCache(cache, iterator);
614    } finally {
615      if (table != null) {
616        table.close();
617      }
618    }
619  }
620
621  @Test
622  public void testMultiGets() throws IOException, InterruptedException {
623    Table table = null;
624    try {
625      latch = new CountDownLatch(2);
626      // Check if get() returns blocks on its close() itself
627      getLatch = new CountDownLatch(1);
628      final TableName tableName = TableName.valueOf(name.getMethodName());
629      // Create KV that will give you two blocks
630      // Create a table with block size as 1024
631      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
632          CustomInnerRegionObserver.class.getName());
633      // get the block cache and region
634      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
635      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
636      HRegion region =
637          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
638      HStore store = region.getStores().iterator().next();
639      CacheConfig cacheConf = store.getCacheConfig();
640      cacheConf.setCacheDataOnWrite(true);
641      cacheConf.setEvictOnClose(true);
642      BlockCache cache = cacheConf.getBlockCache().get();
643
644      Put put = new Put(ROW);
645      put.addColumn(FAMILY, QUALIFIER, data);
646      table.put(put);
647      region.flush(true);
648      put = new Put(ROW1);
649      put.addColumn(FAMILY, QUALIFIER, data);
650      table.put(put);
651      region.flush(true);
652      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
653      put = new Put(ROW);
654      put.addColumn(FAMILY, QUALIFIER2, data2);
655      table.put(put);
656      region.flush(true);
657      // flush the data
658      System.out.println("Flushing cache");
659      // Should create one Hfile with 2 blocks
660      CustomInnerRegionObserver.waitForGets.set(true);
661      // Create three sets of gets
662      MultiGetThread[] getThreads = initiateMultiGet(table);
663      Thread.sleep(200);
664      int refCount;
665      Iterator<CachedBlock> iterator = cache.iterator();
666      boolean foundNonZeroBlock = false;
667      while (iterator.hasNext()) {
668        CachedBlock next = iterator.next();
669        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
670        if (cache instanceof BucketCache) {
671          refCount = ((BucketCache) cache).getRefCount(cacheKey);
672        } else if (cache instanceof CombinedBlockCache) {
673          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
674        } else {
675          continue;
676        }
677        if (refCount != 0) {
678          assertEquals(NO_OF_THREADS, refCount);
679          foundNonZeroBlock = true;
680        }
681      }
682      assertTrue("Should have found nonzero ref count block",foundNonZeroBlock);
683      CustomInnerRegionObserver.getCdl().get().countDown();
684      CustomInnerRegionObserver.getCdl().get().countDown();
685      for (MultiGetThread thread : getThreads) {
686        thread.join();
687      }
688      // Verify whether the gets have returned the blocks that it had
689      CustomInnerRegionObserver.waitForGets.set(true);
690      // giving some time for the block to be decremented
691      iterateBlockCache(cache, iterator);
692      getLatch.countDown();
693      System.out.println("Gets should have returned the bloks");
694    } finally {
695      if (table != null) {
696        table.close();
697      }
698    }
699  }
700  @Test
701  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
702    Table table = null;
703    try {
704      latch = new CountDownLatch(1);
705      // Check if get() returns blocks on its close() itself
706      final TableName tableName = TableName.valueOf(name.getMethodName());
707      // Create KV that will give you two blocks
708      // Create a table with block size as 1024
709      byte[][] fams = new byte[10][];
710      fams[0] = FAMILY;
711      for (int i = 1; i < 10; i++) {
712        fams[i] = (Bytes.toBytes("testFamily" + i));
713      }
714      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
715          CustomInnerRegionObserver.class.getName());
716      // get the block cache and region
717      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
718      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
719      HRegion region =
720          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
721      BlockCache cache = setCacheProperties(region);
722
723      Put put = new Put(ROW);
724      put.addColumn(FAMILY, QUALIFIER, data);
725      table.put(put);
726      region.flush(true);
727      put = new Put(ROW1);
728      put.addColumn(FAMILY, QUALIFIER, data);
729      table.put(put);
730      region.flush(true);
731      for (int i = 1; i < 10; i++) {
732        put = new Put(ROW);
733        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
734        table.put(put);
735        if (i % 2 == 0) {
736          region.flush(true);
737        }
738      }
739      region.flush(true);
740      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
741      put = new Put(ROW);
742      put.addColumn(FAMILY, QUALIFIER2, data2);
743      table.put(put);
744      region.flush(true);
745      // flush the data
746      System.out.println("Flushing cache");
747      // Should create one Hfile with 2 blocks
748      // Create three sets of gets
749      ScanThread[] scanThreads = initiateScan(table, true);
750      Thread.sleep(200);
751      Iterator<CachedBlock> iterator = cache.iterator();
752      boolean usedBlocksFound = false;
753      int refCount = 0;
754      int noOfBlocksWithRef = 0;
755      while (iterator.hasNext()) {
756        CachedBlock next = iterator.next();
757        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
758        if (cache instanceof BucketCache) {
759          refCount = ((BucketCache) cache).getRefCount(cacheKey);
760        } else if (cache instanceof CombinedBlockCache) {
761          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
762        } else {
763          continue;
764        }
765        if (refCount != 0) {
766          // Blocks will be with count 3
767          System.out.println("The refCount is " + refCount);
768          assertEquals(NO_OF_THREADS, refCount);
769          usedBlocksFound = true;
770          noOfBlocksWithRef++;
771        }
772      }
773      assertTrue(usedBlocksFound);
774      // the number of blocks referred
775      assertEquals(12, noOfBlocksWithRef);
776      CustomInnerRegionObserver.getCdl().get().countDown();
777      for (ScanThread thread : scanThreads) {
778        thread.join();
779      }
780      // giving some time for the block to be decremented
781      checkForBlockEviction(cache, true, false);
782    } finally {
783      if (table != null) {
784        table.close();
785      }
786    }
787  }
788
789  private BlockCache setCacheProperties(HRegion region) {
790    Iterator<HStore> strItr = region.getStores().iterator();
791    BlockCache cache = null;
792    while (strItr.hasNext()) {
793      HStore store = strItr.next();
794      CacheConfig cacheConf = store.getCacheConfig();
795      cacheConf.setCacheDataOnWrite(true);
796      cacheConf.setEvictOnClose(true);
797      // Use the last one
798      cache = cacheConf.getBlockCache().get();
799    }
800    return cache;
801  }
802
803  @Test
804  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
805      InterruptedException {
806    Table table = null;
807    try {
808      latch = new CountDownLatch(2);
809      // Check if get() returns blocks on its close() itself
810      getLatch = new CountDownLatch(1);
811      final TableName tableName = TableName.valueOf(name.getMethodName());
812      // Create KV that will give you two blocks
813      // Create a table with block size as 1024
814      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
815          CustomInnerRegionObserverWrapper.class.getName());
816      // get the block cache and region
817      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
818      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
819      HRegion region =
820          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
821      HStore store = region.getStores().iterator().next();
822      CacheConfig cacheConf = store.getCacheConfig();
823      cacheConf.setCacheDataOnWrite(true);
824      cacheConf.setEvictOnClose(true);
825      BlockCache cache = cacheConf.getBlockCache().get();
826
827      // insert data. 2 Rows are added
828      insertData(table);
829      // flush the data
830      System.out.println("Flushing cache");
831      // Should create one Hfile with 2 blocks
832      region.flush(true);
833      // CustomInnerRegionObserver.sleepTime.set(5000);
834      // Create three sets of scan
835      CustomInnerRegionObserver.waitForGets.set(true);
836      ScanThread[] scanThreads = initiateScan(table, false);
837      // Create three sets of gets
838      GetThread[] getThreads = initiateGet(table, false, false);
839      // The block would have been decremented for the scan case as it was
840      // wrapped
841      // before even the postNext hook gets executed.
842      // giving some time for the block to be decremented
843      Thread.sleep(100);
844      CustomInnerRegionObserver.waitForGets.set(false);
845      checkForBlockEviction(cache, false, false);
846      // countdown the latch
847      CustomInnerRegionObserver.getCdl().get().countDown();
848      for (GetThread thread : getThreads) {
849        thread.join();
850      }
851      getLatch.countDown();
852      for (ScanThread thread : scanThreads) {
853        thread.join();
854      }
855    } finally {
856      if (table != null) {
857        table.close();
858      }
859    }
860  }
861
862  @Test
863  public void testScanWithCompaction() throws IOException, InterruptedException {
864    testScanWithCompactionInternals(name.getMethodName(), false);
865  }
866
867  @Test
868  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
869    testScanWithCompactionInternals(name.getMethodName(), true);
870  }
871
872  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
873      throws IOException, InterruptedException {
874    Table table = null;
875    try {
876      latch = new CountDownLatch(1);
877      compactionLatch = new CountDownLatch(1);
878      TableName tableName = TableName.valueOf(tableNameStr);
879      // Create a table with block size as 1024
880      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
881          CustomInnerRegionObserverWrapper.class.getName());
882      // get the block cache and region
883      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
884      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
885      HRegion region =
886          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
887      HStore store = region.getStores().iterator().next();
888      CacheConfig cacheConf = store.getCacheConfig();
889      cacheConf.setCacheDataOnWrite(true);
890      cacheConf.setEvictOnClose(true);
891      BlockCache cache = cacheConf.getBlockCache().get();
892
893      // insert data. 2 Rows are added
894      Put put = new Put(ROW);
895      put.addColumn(FAMILY, QUALIFIER, data);
896      table.put(put);
897      put = new Put(ROW1);
898      put.addColumn(FAMILY, QUALIFIER, data);
899      table.put(put);
900      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
901      // Should create one Hfile with 2 blocks
902      region.flush(true);
903      // read the data and expect same blocks, one new hit, no misses
904      int refCount = 0;
905      // Check how this miss is happening
906      // insert a second column, read the row, no new blocks, 3 new hits
907      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
908      byte[] data2 = Bytes.add(data, data);
909      put = new Put(ROW);
910      put.addColumn(FAMILY, QUALIFIER2, data2);
911      table.put(put);
912      // flush, one new block
913      System.out.println("Flushing cache");
914      region.flush(true);
915      Iterator<CachedBlock> iterator = cache.iterator();
916      iterateBlockCache(cache, iterator);
917      // Create three sets of scan
918      ScanThread[] scanThreads = initiateScan(table, reversed);
919      Thread.sleep(100);
920      iterator = cache.iterator();
921      boolean usedBlocksFound = false;
922      while (iterator.hasNext()) {
923        CachedBlock next = iterator.next();
924        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
925        if (cache instanceof BucketCache) {
926          refCount = ((BucketCache) cache).getRefCount(cacheKey);
927        } else if (cache instanceof CombinedBlockCache) {
928          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
929        } else {
930          continue;
931        }
932        if (refCount != 0) {
933          // Blocks will be with count 3
934          assertEquals(NO_OF_THREADS, refCount);
935          usedBlocksFound = true;
936        }
937      }
938      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
939      usedBlocksFound = false;
940      System.out.println("Compacting");
941      assertEquals(2, store.getStorefilesCount());
942      store.triggerMajorCompaction();
943      region.compact(true);
944      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
945      assertEquals(1, store.getStorefilesCount());
946      // Even after compaction is done we will have some blocks that cannot
947      // be evicted this is because the scan is still referencing them
948      iterator = cache.iterator();
949      while (iterator.hasNext()) {
950        CachedBlock next = iterator.next();
951        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
952        if (cache instanceof BucketCache) {
953          refCount = ((BucketCache) cache).getRefCount(cacheKey);
954        } else if (cache instanceof CombinedBlockCache) {
955          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
956        } else {
957          continue;
958        }
959        if (refCount != 0) {
960          // Blocks will be with count 3 as they are not yet cleared
961          assertEquals(NO_OF_THREADS, refCount);
962          usedBlocksFound = true;
963        }
964      }
965      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
966      // Should not throw exception
967      compactionLatch.countDown();
968      latch.countDown();
969      for (ScanThread thread : scanThreads) {
970        thread.join();
971      }
972      // by this time all blocks should have been evicted
973      iterator = cache.iterator();
974      iterateBlockCache(cache, iterator);
975      Result r = table.get(new Get(ROW));
976      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
977      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
978      // The gets would be working on new blocks
979      iterator = cache.iterator();
980      iterateBlockCache(cache, iterator);
981    } finally {
982      if (table != null) {
983        table.close();
984      }
985    }
986  }
987
988  @Test
989  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
990      throws IOException, InterruptedException {
991    // do flush and scan in parallel
992    Table table = null;
993    try {
994      latch = new CountDownLatch(1);
995      compactionLatch = new CountDownLatch(1);
996      final TableName tableName = TableName.valueOf(name.getMethodName());
997      // Create a table with block size as 1024
998      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
999          CustomInnerRegionObserverWrapper.class.getName());
1000      // get the block cache and region
1001      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
1002      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
1003      HRegion region =
1004          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1005      HStore store = region.getStores().iterator().next();
1006      CacheConfig cacheConf = store.getCacheConfig();
1007      cacheConf.setCacheDataOnWrite(true);
1008      cacheConf.setEvictOnClose(true);
1009      BlockCache cache = cacheConf.getBlockCache().get();
1010
1011      // insert data. 2 Rows are added
1012      Put put = new Put(ROW);
1013      put.addColumn(FAMILY, QUALIFIER, data);
1014      table.put(put);
1015      put = new Put(ROW1);
1016      put.addColumn(FAMILY, QUALIFIER, data);
1017      table.put(put);
1018      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
1019      // Should create one Hfile with 2 blocks
1020      region.flush(true);
1021      // read the data and expect same blocks, one new hit, no misses
1022      int refCount = 0;
1023      // Check how this miss is happening
1024      // insert a second column, read the row, no new blocks, 3 new hits
1025      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1026      byte[] data2 = Bytes.add(data, data);
1027      put = new Put(ROW);
1028      put.addColumn(FAMILY, QUALIFIER2, data2);
1029      table.put(put);
1030      // flush, one new block
1031      System.out.println("Flushing cache");
1032      region.flush(true);
1033      Iterator<CachedBlock> iterator = cache.iterator();
1034      iterateBlockCache(cache, iterator);
1035      // Create three sets of scan
1036      ScanThread[] scanThreads = initiateScan(table, false);
1037      Thread.sleep(100);
1038      iterator = cache.iterator();
1039      boolean usedBlocksFound = false;
1040      while (iterator.hasNext()) {
1041        CachedBlock next = iterator.next();
1042        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1043        if (cache instanceof BucketCache) {
1044          refCount = ((BucketCache) cache).getRefCount(cacheKey);
1045        } else if (cache instanceof CombinedBlockCache) {
1046          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
1047        } else {
1048          continue;
1049        }
1050        if (refCount != 0) {
1051          // Blocks will be with count 3
1052          assertEquals(NO_OF_THREADS, refCount);
1053          usedBlocksFound = true;
1054        }
1055      }
1056      // Make a put and do a flush
1057      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1058      data2 = Bytes.add(data, data);
1059      put = new Put(ROW1);
1060      put.addColumn(FAMILY, QUALIFIER2, data2);
1061      table.put(put);
1062      // flush, one new block
1063      System.out.println("Flushing cache");
1064      region.flush(true);
1065      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
1066      usedBlocksFound = false;
1067      System.out.println("Compacting");
1068      assertEquals(3, store.getStorefilesCount());
1069      store.triggerMajorCompaction();
1070      region.compact(true);
1071      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
1072      assertEquals(1, store.getStorefilesCount());
1073      // Even after compaction is done we will have some blocks that cannot
1074      // be evicted this is because the scan is still referencing them
1075      iterator = cache.iterator();
1076      while (iterator.hasNext()) {
1077        CachedBlock next = iterator.next();
1078        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1079        if (cache instanceof BucketCache) {
1080          refCount = ((BucketCache) cache).getRefCount(cacheKey);
1081        } else if (cache instanceof CombinedBlockCache) {
1082          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
1083        } else {
1084          continue;
1085        }
1086        if (refCount != 0) {
1087          // Blocks will be with count 3 as they are not yet cleared
1088          assertEquals(NO_OF_THREADS, refCount);
1089          usedBlocksFound = true;
1090        }
1091      }
1092      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
1093      // Should not throw exception
1094      compactionLatch.countDown();
1095      latch.countDown();
1096      for (ScanThread thread : scanThreads) {
1097        thread.join();
1098      }
1099      // by this time all blocks should have been evicted
1100      iterator = cache.iterator();
1101      // Since a flush and compaction happened after a scan started
1102      // we need to ensure that all the original blocks of the compacted file
1103      // is also removed.
1104      iterateBlockCache(cache, iterator);
1105      Result r = table.get(new Get(ROW));
1106      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1107      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1108      // The gets would be working on new blocks
1109      iterator = cache.iterator();
1110      iterateBlockCache(cache, iterator);
1111    } finally {
1112      if (table != null) {
1113        table.close();
1114      }
1115    }
1116  }
1117
1118
1119  @Test
1120  public void testScanWithException() throws IOException, InterruptedException {
1121    Table table = null;
1122    try {
1123      latch = new CountDownLatch(1);
1124      exceptionLatch = new CountDownLatch(1);
1125      final TableName tableName = TableName.valueOf(name.getMethodName());
1126      // Create KV that will give you two blocks
1127      // Create a table with block size as 1024
1128      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
1129          CustomInnerRegionObserverWrapper.class.getName());
1130      // get the block cache and region
1131      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
1132      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
1133      HRegion region =
1134          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1135      HStore store = region.getStores().iterator().next();
1136      CacheConfig cacheConf = store.getCacheConfig();
1137      cacheConf.setCacheDataOnWrite(true);
1138      cacheConf.setEvictOnClose(true);
1139      BlockCache cache = cacheConf.getBlockCache().get();
1140      // insert data. 2 Rows are added
1141      insertData(table);
1142      // flush the data
1143      System.out.println("Flushing cache");
1144      // Should create one Hfile with 2 blocks
1145      region.flush(true);
1146      // CustomInnerRegionObserver.sleepTime.set(5000);
1147      CustomInnerRegionObserver.throwException.set(true);
1148      ScanThread[] scanThreads = initiateScan(table, false);
1149      // The block would have been decremented for the scan case as it was
1150      // wrapped
1151      // before even the postNext hook gets executed.
1152      // giving some time for the block to be decremented
1153      Thread.sleep(100);
1154      Iterator<CachedBlock> iterator = cache.iterator();
1155      boolean usedBlocksFound = false;
1156      int refCount = 0;
1157      while (iterator.hasNext()) {
1158        CachedBlock next = iterator.next();
1159        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1160        if (cache instanceof BucketCache) {
1161          refCount = ((BucketCache) cache).getRefCount(cacheKey);
1162        } else if (cache instanceof CombinedBlockCache) {
1163          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
1164        } else {
1165          continue;
1166        }
1167        if (refCount != 0) {
1168          // Blocks will be with count 3
1169          assertEquals(NO_OF_THREADS, refCount);
1170          usedBlocksFound = true;
1171        }
1172      }
1173      assertTrue(usedBlocksFound);
1174      exceptionLatch.countDown();
1175      // countdown the latch
1176      CustomInnerRegionObserver.getCdl().get().countDown();
1177      for (ScanThread thread : scanThreads) {
1178        thread.join();
1179      }
1180      iterator = cache.iterator();
1181      usedBlocksFound = false;
1182      refCount = 0;
1183      while (iterator.hasNext()) {
1184        CachedBlock next = iterator.next();
1185        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1186        if (cache instanceof BucketCache) {
1187          refCount = ((BucketCache) cache).getRefCount(cacheKey);
1188        } else if (cache instanceof CombinedBlockCache) {
1189          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
1190        } else {
1191          continue;
1192        }
1193        if (refCount != 0) {
1194          // Blocks will be with count 3
1195          assertEquals(NO_OF_THREADS, refCount);
1196          usedBlocksFound = true;
1197        }
1198      }
1199      assertFalse(usedBlocksFound);
1200      // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
1201      assertEquals(0, refCount);
1202    } finally {
1203      if (table != null) {
1204        table.close();
1205      }
1206    }
1207  }
1208
1209  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
1210    int refCount;
1211    while (iterator.hasNext()) {
1212      CachedBlock next = iterator.next();
1213      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1214      if (cache instanceof BucketCache) {
1215        refCount = ((BucketCache) cache).getRefCount(cacheKey);
1216      } else if (cache instanceof CombinedBlockCache) {
1217        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
1218      } else {
1219        continue;
1220      }
1221      assertEquals(0, refCount);
1222    }
1223  }
1224
1225  private void insertData(Table table) throws IOException {
1226    Put put = new Put(ROW);
1227    put.addColumn(FAMILY, QUALIFIER, data);
1228    table.put(put);
1229    put = new Put(ROW1);
1230    put.addColumn(FAMILY, QUALIFIER, data);
1231    table.put(put);
1232    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1233    put = new Put(ROW);
1234    put.addColumn(FAMILY, QUALIFIER2, data2);
1235    table.put(put);
1236  }
1237
1238  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
1239      InterruptedException {
1240    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
1241    for (int i = 0; i < NO_OF_THREADS; i++) {
1242      scanThreads[i] = new ScanThread(table, reverse);
1243    }
1244    for (ScanThread thread : scanThreads) {
1245      thread.start();
1246    }
1247    return scanThreads;
1248  }
1249
1250  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
1251      throws IOException, InterruptedException {
1252    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
1253    for (int i = 0; i < NO_OF_THREADS; i++) {
1254      getThreads[i] = new GetThread(table, tracker, multipleCFs);
1255    }
1256    for (GetThread thread : getThreads) {
1257      thread.start();
1258    }
1259    return getThreads;
1260  }
1261
1262  private MultiGetThread[] initiateMultiGet(Table table)
1263      throws IOException, InterruptedException {
1264    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
1265    for (int i = 0; i < NO_OF_THREADS; i++) {
1266      multiGetThreads[i] = new MultiGetThread(table);
1267    }
1268    for (MultiGetThread thread : multiGetThreads) {
1269      thread.start();
1270    }
1271    return multiGetThreads;
1272  }
1273
1274  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
1275      throws InterruptedException {
1276    int counter = NO_OF_THREADS;
1277    if (CustomInnerRegionObserver.waitForGets.get()) {
1278      // Because only one row is selected, it has only 2 blocks
1279      counter = counter - 1;
1280      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
1281        Thread.sleep(100);
1282      }
1283    } else {
1284      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
1285        Thread.sleep(100);
1286      }
1287    }
1288    Iterator<CachedBlock> iterator = cache.iterator();
1289    int refCount = 0;
1290    while (iterator.hasNext()) {
1291      CachedBlock next = iterator.next();
1292      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1293      if (cache instanceof BucketCache) {
1294        refCount = ((BucketCache) cache).getRefCount(cacheKey);
1295      } else if (cache instanceof CombinedBlockCache) {
1296        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
1297      } else {
1298        continue;
1299      }
1300      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
1301      if (CustomInnerRegionObserver.waitForGets.get()) {
1302        if (expectOnlyZero) {
1303          assertTrue(refCount == 0);
1304        }
1305        if (refCount != 0) {
1306          // Because the scan would have also touched up on these blocks but
1307          // it
1308          // would have touched
1309          // all 3
1310          if (getClosed) {
1311            // If get has closed only the scan's blocks would be available
1312            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
1313          } else {
1314              assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
1315          }
1316        }
1317      } else {
1318        // Because the get would have also touched up on these blocks but it
1319        // would have touched
1320        // upon only 2 additionally
1321        if (expectOnlyZero) {
1322          assertTrue(refCount == 0);
1323        }
1324        if (refCount != 0) {
1325          if (getLatch == null) {
1326            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
1327          } else {
1328            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
1329          }
1330        }
1331      }
1332    }
1333    CustomInnerRegionObserver.getCdl().get().countDown();
1334  }
1335
1336  private static class MultiGetThread extends Thread {
1337    private final Table table;
1338    private final List<Get> gets = new ArrayList<>();
1339    public MultiGetThread(Table table) {
1340      this.table = table;
1341    }
1342    @Override
1343    public void run() {
1344      gets.add(new Get(ROW));
1345      gets.add(new Get(ROW1));
1346      try {
1347        CustomInnerRegionObserver.getCdl().set(latch);
1348        Result[] r = table.get(gets);
1349        assertTrue(Bytes.equals(r[0].getRow(), ROW));
1350        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
1351      } catch (IOException e) {
1352      }
1353    }
1354  }
1355
1356  private static class GetThread extends Thread {
1357    private final Table table;
1358    private final boolean tracker;
1359    private final boolean multipleCFs;
1360
1361    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
1362      this.table = table;
1363      this.tracker = tracker;
1364      this.multipleCFs = multipleCFs;
1365    }
1366
1367    @Override
1368    public void run() {
1369      try {
1370        initiateGet(table);
1371      } catch (IOException e) {
1372        // do nothing
1373      }
1374    }
1375
1376    private void initiateGet(Table table) throws IOException {
1377      Get get = new Get(ROW);
1378      if (tracker) {
1379        // Change this
1380        if (!multipleCFs) {
1381          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
1382          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
1383          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
1384          // Unknown key
1385          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
1386        } else {
1387          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
1388          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
1389          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
1390          // Unknown key
1391          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
1392        }
1393      }
1394      CustomInnerRegionObserver.getCdl().set(latch);
1395      Result r = table.get(get);
1396      System.out.println(r);
1397      if (!tracker) {
1398        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1399        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1400      } else {
1401        if (!multipleCFs) {
1402          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
1403          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
1404          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
1405        } else {
1406          assertTrue(Bytes.equals(
1407              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
1408              data2));
1409          assertTrue(Bytes.equals(
1410              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
1411              data2));
1412          assertTrue(Bytes.equals(
1413              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
1414              data2));
1415        }
1416      }
1417    }
1418  }
1419
1420  private static class ScanThread extends Thread {
1421    private final Table table;
1422    private final boolean reverse;
1423
1424    public ScanThread(Table table, boolean reverse) {
1425      this.table = table;
1426      this.reverse = reverse;
1427    }
1428
1429    @Override
1430    public void run() {
1431      try {
1432        initiateScan(table);
1433      } catch (IOException e) {
1434        // do nothing
1435      }
1436    }
1437
1438    private void initiateScan(Table table) throws IOException {
1439      Scan scan = new Scan();
1440      if (reverse) {
1441        scan.setReversed(true);
1442      }
1443      CustomInnerRegionObserver.getCdl().set(latch);
1444      ResultScanner resScanner = table.getScanner(scan);
1445      int i = (reverse ? ROWS.length - 1 : 0);
1446      boolean resultFound = false;
1447      for (Result result : resScanner) {
1448        resultFound = true;
1449        System.out.println(result);
1450        if (!reverse) {
1451          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1452          i++;
1453        } else {
1454          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1455          i--;
1456        }
1457      }
1458      assertTrue(resultFound);
1459    }
1460  }
1461
1462  private void waitForStoreFileCount(HStore store, int count, int timeout)
1463      throws InterruptedException {
1464    long start = System.currentTimeMillis();
1465    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
1466      Thread.sleep(100);
1467    }
1468    System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
1469        store.getStorefilesCount());
1470    assertEquals(count, store.getStorefilesCount());
1471  }
1472
1473  private static class CustomScanner implements RegionScanner {
1474
1475    private RegionScanner delegate;
1476
1477    public CustomScanner(RegionScanner delegate) {
1478      this.delegate = delegate;
1479    }
1480
1481    @Override
1482    public boolean next(List<Cell> results) throws IOException {
1483      return delegate.next(results);
1484    }
1485
1486    @Override
1487    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
1488      return delegate.next(result, scannerContext);
1489    }
1490
1491    @Override
1492    public boolean nextRaw(List<Cell> result) throws IOException {
1493      return delegate.nextRaw(result);
1494    }
1495
1496    @Override
1497    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
1498      boolean nextRaw = delegate.nextRaw(result, context);
1499      if (compactionLatch != null && compactionLatch.getCount() > 0) {
1500        try {
1501          compactionLatch.await();
1502        } catch (InterruptedException ie) {
1503        }
1504      }
1505
1506      if (CustomInnerRegionObserver.throwException.get()) {
1507        if (exceptionLatch.getCount() > 0) {
1508          try {
1509            exceptionLatch.await();
1510          } catch (InterruptedException e) {
1511          }
1512          throw new IOException("throw exception");
1513        }
1514      }
1515      return nextRaw;
1516    }
1517
1518    @Override
1519    public void close() throws IOException {
1520      delegate.close();
1521    }
1522
1523    @Override
1524    public RegionInfo getRegionInfo() {
1525      return delegate.getRegionInfo();
1526    }
1527
1528    @Override
1529    public boolean isFilterDone() throws IOException {
1530      return delegate.isFilterDone();
1531    }
1532
1533    @Override
1534    public boolean reseek(byte[] row) throws IOException {
1535      return false;
1536    }
1537
1538    @Override
1539    public long getMaxResultSize() {
1540      return delegate.getMaxResultSize();
1541    }
1542
1543    @Override
1544    public long getMvccReadPoint() {
1545      return delegate.getMvccReadPoint();
1546    }
1547
1548    @Override
1549    public int getBatch() {
1550      return delegate.getBatch();
1551    }
1552  }
1553
1554  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
1555    @Override
1556    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
1557        Scan scan, RegionScanner s) throws IOException {
1558      return new CustomScanner(s);
1559    }
1560  }
1561
1562  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
1563    static final AtomicLong sleepTime = new AtomicLong(0);
1564    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
1565    static final AtomicInteger countOfNext = new AtomicInteger(0);
1566    static final AtomicInteger countOfGets = new AtomicInteger(0);
1567    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
1568    static final AtomicBoolean throwException = new AtomicBoolean(false);
1569    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
1570        new CountDownLatch(0));
1571
1572    @Override
1573    public Optional<RegionObserver> getRegionObserver() {
1574      return Optional.of(this);
1575    }
1576
1577    @Override
1578    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
1579        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
1580      slowdownCode(e, false);
1581      if (getLatch != null && getLatch.getCount() > 0) {
1582        try {
1583          getLatch.await();
1584        } catch (InterruptedException e1) {
1585        }
1586      }
1587      return hasMore;
1588    }
1589
1590    @Override
1591    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
1592        List<Cell> results) throws IOException {
1593      slowdownCode(e, true);
1594    }
1595
1596    public static AtomicReference<CountDownLatch> getCdl() {
1597      return cdl;
1598    }
1599
1600    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
1601        boolean isGet) {
1602      CountDownLatch latch = getCdl().get();
1603      try {
1604        System.out.println(latch.getCount() + " is the count " + isGet);
1605        if (latch.getCount() > 0) {
1606          if (isGet) {
1607            countOfGets.incrementAndGet();
1608          } else {
1609            countOfNext.incrementAndGet();
1610          }
1611          LOG.info("Waiting for the counterCountDownLatch");
1612          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
1613          if (latch.getCount() > 0) {
1614            throw new RuntimeException("Can't wait more");
1615          }
1616        }
1617      } catch (InterruptedException e1) {
1618        LOG.error(e1.toString(), e1);
1619      }
1620    }
1621  }
1622}