001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicReference;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
044import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
045import org.apache.hadoop.hbase.coprocessor.ObserverContext;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
048import org.apache.hadoop.hbase.coprocessor.RegionObserver;
049import org.apache.hadoop.hbase.io.hfile.BlockCache;
050import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
051import org.apache.hadoop.hbase.io.hfile.CacheConfig;
052import org.apache.hadoop.hbase.io.hfile.CachedBlock;
053import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
054import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
055import org.apache.hadoop.hbase.regionserver.BloomType;
056import org.apache.hadoop.hbase.regionserver.HRegion;
057import org.apache.hadoop.hbase.regionserver.HStore;
058import org.apache.hadoop.hbase.regionserver.InternalScanner;
059import org.apache.hadoop.hbase.regionserver.RegionScanner;
060import org.apache.hadoop.hbase.regionserver.ScannerContext;
061import org.apache.hadoop.hbase.testclassification.ClientTests;
062import org.apache.hadoop.hbase.testclassification.LargeTests;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
065import org.junit.jupiter.api.AfterAll;
066import org.junit.jupiter.api.AfterEach;
067import org.junit.jupiter.api.BeforeAll;
068import org.junit.jupiter.api.BeforeEach;
069import org.junit.jupiter.api.Tag;
070import org.junit.jupiter.api.Test;
071import org.junit.jupiter.api.TestInfo;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
076
077@Tag(LargeTests.TAG)
078@Tag(ClientTests.TAG)
079public class TestBlockEvictionFromClient {
080
  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  // Shared mini-cluster harness; started in setUpBeforeClass and shut down in tearDownAfterClass.
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Filled in setUpBeforeClass with ROW and ROW1.
  static byte[][] ROWS = new byte[2][];
  // The tests assert that a referenced block's RPC ref count equals this value; presumably the
  // number of reader threads started by the initiate* helpers -- confirm against those helpers.
  private static int NO_OF_THREADS = 3;
  // Row keys used by the tests.
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  // Single-family array handed to createTable; slot 0 is set to FAMILY in setUpBeforeClass.
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  // Second qualifier, built from QUALIFIER passed twice to Bytes.add.
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  // 1000-byte (all zero) cell value.
  private static byte[] data = new byte[1000];
  // Larger cell value, built from data passed twice to Bytes.add.
  private static byte[] data2 = Bytes.add(data, data);
  // Passed to startMiniCluster in setUpBeforeClass.
  protected static int SLAVES = 1;
  // Latches created by individual tests; tearDown counts them down and resets them to null after
  // every test.
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;
  // Name of the currently running test method (set in setUp); the tests use it as the table name.
  private String methodName;
101
102  @BeforeEach
103  public void setUp(TestInfo testInfo) throws Exception {
104    this.methodName = testInfo.getTestMethod().get().getName();
105    CustomInnerRegionObserver.waitForGets.set(false);
106    CustomInnerRegionObserver.countOfNext.set(0);
107    CustomInnerRegionObserver.countOfGets.set(0);
108  }
109
110  @BeforeAll
111  public static void setUpBeforeClass() throws Exception {
112    ROWS[0] = ROW;
113    ROWS[1] = ROW1;
114    Configuration conf = TEST_UTIL.getConfiguration();
115    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
116      MultiRowMutationEndpoint.class.getName());
117    conf.setInt("hbase.regionserver.handler.count", 20);
118    conf.setInt("hbase.bucketcache.size", 400);
119    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
120    conf.setFloat("hfile.block.cache.size", 0.2f);
121    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
122    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
123    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
124    FAMILIES_1[0] = FAMILY;
125    TEST_UTIL.startMiniCluster(SLAVES);
126  }
127
  /**
   * Shuts down the mini cluster held by {@code TEST_UTIL} after all tests in this class have run.
   * @throws Exception if shutting down the mini cluster fails
   */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
135
136  /**
137   * @throws java.lang.Exception
138   */
139  @AfterEach
140  public void tearDown() throws Exception {
141    if (latch != null) {
142      while (latch.getCount() > 0) {
143        latch.countDown();
144      }
145    }
146    if (getLatch != null) {
147      getLatch.countDown();
148    }
149    if (compactionLatch != null) {
150      compactionLatch.countDown();
151    }
152    if (exceptionLatch != null) {
153      exceptionLatch.countDown();
154    }
155    latch = null;
156    getLatch = null;
157    compactionLatch = null;
158    exceptionLatch = null;
159    CustomInnerRegionObserver.throwException.set(false);
160    // Clean up the tables for every test case
161    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
162    for (TableName tableName : listTableNames) {
163      if (!tableName.isSystemTable()) {
164        TEST_UTIL.getAdmin().disableTable(tableName);
165        TEST_UTIL.getAdmin().deleteTable(tableName);
166      }
167    }
168  }
169
170  @Test
171  public void testBlockEvictionWithParallelScans() throws Exception {
172    Table table = null;
173    try {
174      latch = new CountDownLatch(1);
175      final TableName tableName = TableName.valueOf(methodName);
176      // Create a table with block size as 1024
177      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
178        CustomInnerRegionObserver.class.getName());
179      // get the block cache and region
180      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
181      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
182      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
183      HStore store = region.getStores().iterator().next();
184      CacheConfig cacheConf = store.getCacheConfig();
185      cacheConf.setCacheDataOnWrite(true);
186      cacheConf.setEvictOnClose(true);
187      BlockCache cache = cacheConf.getBlockCache().get();
188
189      // insert data. 2 Rows are added
190      Put put = new Put(ROW);
191      put.addColumn(FAMILY, QUALIFIER, data);
192      table.put(put);
193      put = new Put(ROW1);
194      put.addColumn(FAMILY, QUALIFIER, data);
195      table.put(put);
196      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
197      // data was in memstore so don't expect any changes
198      // flush the data
199      // Should create one Hfile with 2 blocks
200      region.flush(true);
201      // Load cache
202      // Create three sets of scan
203      ScanThread[] scanThreads = initiateScan(table, false);
204      Thread.sleep(100);
205      checkForBlockEviction(cache, false, false);
206      for (ScanThread thread : scanThreads) {
207        thread.join();
208      }
209      // CustomInnerRegionObserver.sleepTime.set(0);
210      Iterator<CachedBlock> iterator = cache.iterator();
211      iterateBlockCache(cache, iterator);
212      // read the data and expect same blocks, one new hit, no misses
213      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
214      iterator = cache.iterator();
215      iterateBlockCache(cache, iterator);
216      // Check how this miss is happening
217      // insert a second column, read the row, no new blocks, 3 new hits
218      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
219      byte[] data2 = Bytes.add(data, data);
220      put = new Put(ROW);
221      put.addColumn(FAMILY, QUALIFIER2, data2);
222      table.put(put);
223      Result r = table.get(new Get(ROW));
224      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
225      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
226      iterator = cache.iterator();
227      iterateBlockCache(cache, iterator);
228      // flush, one new block
229      LOG.info("Flushing cache");
230      region.flush(true);
231      iterator = cache.iterator();
232      iterateBlockCache(cache, iterator);
233      // compact, net minus two blocks, two hits, no misses
234      LOG.info("Compacting");
235      assertEquals(2, store.getStorefilesCount());
236      store.triggerMajorCompaction();
237      region.compact(true);
238      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
239      assertEquals(1, store.getStorefilesCount());
240      iterator = cache.iterator();
241      iterateBlockCache(cache, iterator);
242      // read the row, this should be a cache miss because we don't cache data
243      // blocks on compaction
244      r = table.get(new Get(ROW));
245      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
246      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
247      iterator = cache.iterator();
248      iterateBlockCache(cache, iterator);
249    } finally {
250      if (table != null) {
251        table.close();
252      }
253    }
254  }
255
256  @Test
257  public void testParallelGetsAndScans() throws IOException, InterruptedException {
258    Table table = null;
259    try {
260      latch = new CountDownLatch(2);
261      // Check if get() returns blocks on its close() itself
262      getLatch = new CountDownLatch(1);
263      final TableName tableName = TableName.valueOf(methodName);
264      // Create KV that will give you two blocks
265      // Create a table with block size as 1024
266      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
267        CustomInnerRegionObserver.class.getName());
268      // get the block cache and region
269      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
270      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
271      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
272      HStore store = region.getStores().iterator().next();
273      CacheConfig cacheConf = store.getCacheConfig();
274      cacheConf.setCacheDataOnWrite(true);
275      cacheConf.setEvictOnClose(true);
276      BlockCache cache = cacheConf.getBlockCache().get();
277
278      insertData(table);
279      // flush the data
280      LOG.info("Flushing cache");
281      // Should create one Hfile with 2 blocks
282      region.flush(true);
283      // Create three sets of scan
284      CustomInnerRegionObserver.waitForGets.set(true);
285      ScanThread[] scanThreads = initiateScan(table, false);
286      // Create three sets of gets
287      GetThread[] getThreads = initiateGet(table, false, false);
288      checkForBlockEviction(cache, false, false);
289      CustomInnerRegionObserver.waitForGets.set(false);
290      checkForBlockEviction(cache, false, false);
291      for (GetThread thread : getThreads) {
292        thread.join();
293      }
294      // Verify whether the gets have returned the blocks that it had
295      CustomInnerRegionObserver.waitForGets.set(true);
296      // giving some time for the block to be decremented
297      checkForBlockEviction(cache, true, false);
298      getLatch.countDown();
299      for (ScanThread thread : scanThreads) {
300        thread.join();
301      }
302      LOG.info("Scans should have returned the bloks");
303      // Check with either true or false
304      CustomInnerRegionObserver.waitForGets.set(false);
305      // The scan should also have released the blocks by now
306      checkForBlockEviction(cache, true, true);
307    } finally {
308      if (table != null) {
309        table.close();
310      }
311    }
312  }
313
314  @Test
315  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
316    Table table = null;
317    try {
318      latch = new CountDownLatch(1);
319      // Check if get() returns blocks on its close() itself
320      getLatch = new CountDownLatch(1);
321      final TableName tableName = TableName.valueOf(methodName);
322      // Create KV that will give you two blocks
323      // Create a table with block size as 1024
324      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
325        CustomInnerRegionObserver.class.getName());
326      // get the block cache and region
327      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
328      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
329      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
330      HStore store = region.getStores().iterator().next();
331      CacheConfig cacheConf = store.getCacheConfig();
332      cacheConf.setCacheDataOnWrite(true);
333      cacheConf.setEvictOnClose(true);
334      BlockCache cache = cacheConf.getBlockCache().get();
335
336      Put put = new Put(ROW);
337      put.addColumn(FAMILY, QUALIFIER, data);
338      table.put(put);
339      region.flush(true);
340      put = new Put(ROW1);
341      put.addColumn(FAMILY, QUALIFIER, data);
342      table.put(put);
343      region.flush(true);
344      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
345      put = new Put(ROW);
346      put.addColumn(FAMILY, QUALIFIER2, data2);
347      table.put(put);
348      region.flush(true);
349      // flush the data
350      LOG.info("Flushing cache");
351      // Should create one Hfile with 2 blocks
352      CustomInnerRegionObserver.waitForGets.set(true);
353      // Create three sets of gets
354      GetThread[] getThreads = initiateGet(table, false, false);
355      Thread.sleep(200);
356      CustomInnerRegionObserver.getCdl().get().countDown();
357      for (GetThread thread : getThreads) {
358        thread.join();
359      }
360      // Verify whether the gets have returned the blocks that it had
361      CustomInnerRegionObserver.waitForGets.set(true);
362      // giving some time for the block to be decremented
363      checkForBlockEviction(cache, true, false);
364      getLatch.countDown();
365      LOG.info("Gets should have returned the bloks");
366    } finally {
367      if (table != null) {
368        table.close();
369      }
370    }
371  }
372
373  @Test
374  // TODO : check how block index works here
375  public void testGetsWithMultiColumnsAndExplicitTracker()
376    throws IOException, InterruptedException {
377    Table table = null;
378    try {
379      latch = new CountDownLatch(1);
380      // Check if get() returns blocks on its close() itself
381      getLatch = new CountDownLatch(1);
382      final TableName tableName = TableName.valueOf(methodName);
383      // Create KV that will give you two blocks
384      // Create a table with block size as 1024
385      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
386        CustomInnerRegionObserver.class.getName());
387      // get the block cache and region
388      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
389      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
390      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
391      BlockCache cache = setCacheProperties(region);
392      Put put = new Put(ROW);
393      put.addColumn(FAMILY, QUALIFIER, data);
394      table.put(put);
395      region.flush(true);
396      put = new Put(ROW1);
397      put.addColumn(FAMILY, QUALIFIER, data);
398      table.put(put);
399      region.flush(true);
400      for (int i = 1; i < 10; i++) {
401        put = new Put(ROW);
402        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
403        table.put(put);
404        if (i % 2 == 0) {
405          region.flush(true);
406        }
407      }
408      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
409      put = new Put(ROW);
410      put.addColumn(FAMILY, QUALIFIER2, data2);
411      table.put(put);
412      region.flush(true);
413      // flush the data
414      LOG.info("Flushing cache");
415      // Should create one Hfile with 2 blocks
416      CustomInnerRegionObserver.waitForGets.set(true);
417      // Create three sets of gets
418      GetThread[] getThreads = initiateGet(table, true, false);
419      Thread.sleep(200);
420      Iterator<CachedBlock> iterator = cache.iterator();
421      boolean usedBlocksFound = false;
422      int refCount = 0;
423      int noOfBlocksWithRef = 0;
424      while (iterator.hasNext()) {
425        CachedBlock next = iterator.next();
426        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
427        if (cache instanceof BucketCache) {
428          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
429        } else if (cache instanceof CombinedBlockCache) {
430          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
431        } else {
432          continue;
433        }
434        if (refCount != 0) {
435          // Blocks will be with count 3
436          LOG.info("The refCount is " + refCount);
437          assertEquals(NO_OF_THREADS, refCount);
438          usedBlocksFound = true;
439          noOfBlocksWithRef++;
440        }
441      }
442      assertTrue(usedBlocksFound);
443      // the number of blocks referred
444      assertEquals(10, noOfBlocksWithRef);
445      CustomInnerRegionObserver.getCdl().get().countDown();
446      for (GetThread thread : getThreads) {
447        thread.join();
448      }
449      // Verify whether the gets have returned the blocks that it had
450      CustomInnerRegionObserver.waitForGets.set(true);
451      // giving some time for the block to be decremented
452      checkForBlockEviction(cache, true, false);
453      getLatch.countDown();
454      LOG.info("Gets should have returned the bloks");
455    } finally {
456      if (table != null) {
457        table.close();
458      }
459    }
460  }
461
462  @Test
463  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
464    Table table = null;
465    try {
466      latch = new CountDownLatch(1);
467      // Check if get() returns blocks on its close() itself
468      getLatch = new CountDownLatch(1);
469      final TableName tableName = TableName.valueOf(methodName);
470      // Create KV that will give you two blocks
471      // Create a table with block size as 1024
472      byte[][] fams = new byte[10][];
473      fams[0] = FAMILY;
474      for (int i = 1; i < 10; i++) {
475        fams[i] = (Bytes.toBytes("testFamily" + i));
476      }
477      table =
478        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
479      // get the block cache and region
480      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
481      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
482      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
483      BlockCache cache = setCacheProperties(region);
484
485      Put put = new Put(ROW);
486      put.addColumn(FAMILY, QUALIFIER, data);
487      table.put(put);
488      region.flush(true);
489      put = new Put(ROW1);
490      put.addColumn(FAMILY, QUALIFIER, data);
491      table.put(put);
492      region.flush(true);
493      for (int i = 1; i < 10; i++) {
494        put = new Put(ROW);
495        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
496        table.put(put);
497        if (i % 2 == 0) {
498          region.flush(true);
499        }
500      }
501      region.flush(true);
502      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
503      put = new Put(ROW);
504      put.addColumn(FAMILY, QUALIFIER2, data2);
505      table.put(put);
506      region.flush(true);
507      // flush the data
508      LOG.info("Flushing cache");
509      // Should create one Hfile with 2 blocks
510      CustomInnerRegionObserver.waitForGets.set(true);
511      // Create three sets of gets
512      GetThread[] getThreads = initiateGet(table, true, true);
513      Thread.sleep(200);
514      Iterator<CachedBlock> iterator = cache.iterator();
515      boolean usedBlocksFound = false;
516      int refCount = 0;
517      int noOfBlocksWithRef = 0;
518      while (iterator.hasNext()) {
519        CachedBlock next = iterator.next();
520        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
521        if (cache instanceof BucketCache) {
522          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
523        } else if (cache instanceof CombinedBlockCache) {
524          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
525        } else {
526          continue;
527        }
528        if (refCount != 0) {
529          // Blocks will be with count 3
530          LOG.info("The refCount is " + refCount);
531          assertEquals(NO_OF_THREADS, refCount);
532          usedBlocksFound = true;
533          noOfBlocksWithRef++;
534        }
535      }
536      assertTrue(usedBlocksFound);
537      // the number of blocks referred
538      assertEquals(3, noOfBlocksWithRef);
539      CustomInnerRegionObserver.getCdl().get().countDown();
540      for (GetThread thread : getThreads) {
541        thread.join();
542      }
543      // Verify whether the gets have returned the blocks that it had
544      CustomInnerRegionObserver.waitForGets.set(true);
545      // giving some time for the block to be decremented
546      checkForBlockEviction(cache, true, false);
547      getLatch.countDown();
548      LOG.info("Gets should have returned the bloks");
549    } finally {
550      if (table != null) {
551        table.close();
552      }
553    }
554  }
555
556  @Test
557  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
558    Table table = null;
559    try {
560      final TableName tableName = TableName.valueOf(methodName);
561      TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
562      // This test expects rpc refcount of cached data blocks to be 0 after split. After split,
563      // two daughter regions are opened and a compaction is scheduled to get rid of reference
564      // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1.
565      // It is flakey since compaction can kick in anytime. To solve this issue, table is created
566      // with compaction disabled.
567      table = TEST_UTIL.createTable(
568        TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
569        null, BloomType.ROW, 1024, null);
570      // get the block cache and region
571      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
572      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
573      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
574      HStore store = region.getStores().iterator().next();
575      CacheConfig cacheConf = store.getCacheConfig();
576      cacheConf.setEvictOnClose(true);
577      BlockCache cache = cacheConf.getBlockCache().get();
578
579      Put put = new Put(ROW);
580      put.addColumn(FAMILY, QUALIFIER, data);
581      table.put(put);
582      region.flush(true);
583      put = new Put(ROW1);
584      put.addColumn(FAMILY, QUALIFIER, data);
585      table.put(put);
586      region.flush(true);
587      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
588      put = new Put(ROW2);
589      put.addColumn(FAMILY, QUALIFIER2, data2);
590      table.put(put);
591      put = new Put(ROW3);
592      put.addColumn(FAMILY, QUALIFIER2, data2);
593      table.put(put);
594      region.flush(true);
595      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
596      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
597      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
598        regionCount);
599      TEST_UTIL.getAdmin().split(tableName, ROW1);
600      // Wait for splits
601      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
602      region.compact(true);
603      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
604      for (HRegion r : regions) {
605        LOG.info("" + r.getCompactionState());
606        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
607      }
608      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
609      Iterator<CachedBlock> iterator = cache.iterator();
610      // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
611      // should be closed inorder to return those blocks
612      iterateBlockCache(cache, iterator);
613    } finally {
614      if (table != null) {
615        table.close();
616      }
617    }
618  }
619
620  @Test
621  public void testMultiGets() throws IOException, InterruptedException {
622    Table table = null;
623    try {
624      latch = new CountDownLatch(2);
625      // Check if get() returns blocks on its close() itself
626      getLatch = new CountDownLatch(1);
627      final TableName tableName = TableName.valueOf(methodName);
628      // Create KV that will give you two blocks
629      // Create a table with block size as 1024
630      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
631        CustomInnerRegionObserver.class.getName());
632      // get the block cache and region
633      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
634      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
635      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
636      HStore store = region.getStores().iterator().next();
637      CacheConfig cacheConf = store.getCacheConfig();
638      cacheConf.setCacheDataOnWrite(true);
639      cacheConf.setEvictOnClose(true);
640      BlockCache cache = cacheConf.getBlockCache().get();
641
642      Put put = new Put(ROW);
643      put.addColumn(FAMILY, QUALIFIER, data);
644      table.put(put);
645      region.flush(true);
646      put = new Put(ROW1);
647      put.addColumn(FAMILY, QUALIFIER, data);
648      table.put(put);
649      region.flush(true);
650      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
651      put = new Put(ROW);
652      put.addColumn(FAMILY, QUALIFIER2, data2);
653      table.put(put);
654      region.flush(true);
655      // flush the data
656      LOG.info("Flushing cache");
657      // Should create one Hfile with 2 blocks
658      CustomInnerRegionObserver.waitForGets.set(true);
659      // Create three sets of gets
660      MultiGetThread[] getThreads = initiateMultiGet(table);
661      Thread.sleep(200);
662      int refCount;
663      Iterator<CachedBlock> iterator = cache.iterator();
664      boolean foundNonZeroBlock = false;
665      while (iterator.hasNext()) {
666        CachedBlock next = iterator.next();
667        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
668        if (cache instanceof BucketCache) {
669          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
670        } else if (cache instanceof CombinedBlockCache) {
671          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
672        } else {
673          continue;
674        }
675        if (refCount != 0) {
676          assertEquals(NO_OF_THREADS, refCount);
677          foundNonZeroBlock = true;
678        }
679      }
680      assertTrue(foundNonZeroBlock, "Should have found nonzero ref count block");
681      CustomInnerRegionObserver.getCdl().get().countDown();
682      CustomInnerRegionObserver.getCdl().get().countDown();
683      for (MultiGetThread thread : getThreads) {
684        thread.join();
685      }
686      // Verify whether the gets have returned the blocks that it had
687      CustomInnerRegionObserver.waitForGets.set(true);
688      // giving some time for the block to be decremented
689      iterateBlockCache(cache, iterator);
690      getLatch.countDown();
691      LOG.info("Gets should have returned the bloks");
692    } finally {
693      if (table != null) {
694        table.close();
695      }
696    }
697  }
698
699  @Test
700  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
701    Table table = null;
702    try {
703      latch = new CountDownLatch(1);
704      // Check if get() returns blocks on its close() itself
705      final TableName tableName = TableName.valueOf(methodName);
706      // Create KV that will give you two blocks
707      // Create a table with block size as 1024
708      byte[][] fams = new byte[10][];
709      fams[0] = FAMILY;
710      for (int i = 1; i < 10; i++) {
711        fams[i] = (Bytes.toBytes("testFamily" + i));
712      }
713      table =
714        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
715      // get the block cache and region
716      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
717      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
718      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
719      BlockCache cache = setCacheProperties(region);
720
721      Put put = new Put(ROW);
722      put.addColumn(FAMILY, QUALIFIER, data);
723      table.put(put);
724      region.flush(true);
725      put = new Put(ROW1);
726      put.addColumn(FAMILY, QUALIFIER, data);
727      table.put(put);
728      region.flush(true);
729      for (int i = 1; i < 10; i++) {
730        put = new Put(ROW);
731        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
732        table.put(put);
733        if (i % 2 == 0) {
734          region.flush(true);
735        }
736      }
737      region.flush(true);
738      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
739      put = new Put(ROW);
740      put.addColumn(FAMILY, QUALIFIER2, data2);
741      table.put(put);
742      region.flush(true);
743      // flush the data
744      LOG.info("Flushing cache");
745      // Should create one Hfile with 2 blocks
746      // Create three sets of gets
747      ScanThread[] scanThreads = initiateScan(table, true);
748      Thread.sleep(200);
749      Iterator<CachedBlock> iterator = cache.iterator();
750      boolean usedBlocksFound = false;
751      int refCount = 0;
752      int noOfBlocksWithRef = 0;
753      while (iterator.hasNext()) {
754        CachedBlock next = iterator.next();
755        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
756        if (cache instanceof BucketCache) {
757          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
758        } else if (cache instanceof CombinedBlockCache) {
759          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
760        } else {
761          continue;
762        }
763        if (refCount != 0) {
764          // Blocks will be with count 3
765          LOG.info("The refCount is " + refCount);
766          assertEquals(NO_OF_THREADS, refCount);
767          usedBlocksFound = true;
768          noOfBlocksWithRef++;
769        }
770      }
771      assertTrue(usedBlocksFound);
772      // the number of blocks referred
773      assertEquals(12, noOfBlocksWithRef);
774      CustomInnerRegionObserver.getCdl().get().countDown();
775      for (ScanThread thread : scanThreads) {
776        thread.join();
777      }
778      // giving some time for the block to be decremented
779      checkForBlockEviction(cache, true, false);
780    } finally {
781      if (table != null) {
782        table.close();
783      }
784    }
785  }
786
787  private BlockCache setCacheProperties(HRegion region) {
788    Iterator<HStore> strItr = region.getStores().iterator();
789    BlockCache cache = null;
790    while (strItr.hasNext()) {
791      HStore store = strItr.next();
792      CacheConfig cacheConf = store.getCacheConfig();
793      cacheConf.setCacheDataOnWrite(true);
794      cacheConf.setEvictOnClose(true);
795      // Use the last one
796      cache = cacheConf.getBlockCache().get();
797    }
798    return cache;
799  }
800
801  @Test
802  public void testParallelGetsAndScanWithWrappedRegionScanner()
803    throws IOException, InterruptedException {
804    Table table = null;
805    try {
806      latch = new CountDownLatch(2);
807      // Check if get() returns blocks on its close() itself
808      getLatch = new CountDownLatch(1);
809      final TableName tableName = TableName.valueOf(methodName);
810      // Create KV that will give you two blocks
811      // Create a table with block size as 1024
812      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
813        CustomInnerRegionObserverWrapper.class.getName());
814      // get the block cache and region
815      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
816      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
817      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
818      HStore store = region.getStores().iterator().next();
819      CacheConfig cacheConf = store.getCacheConfig();
820      cacheConf.setCacheDataOnWrite(true);
821      cacheConf.setEvictOnClose(true);
822      BlockCache cache = cacheConf.getBlockCache().get();
823
824      // insert data. 2 Rows are added
825      insertData(table);
826      // flush the data
827      LOG.info("Flushing cache");
828      // Should create one Hfile with 2 blocks
829      region.flush(true);
830      // CustomInnerRegionObserver.sleepTime.set(5000);
831      // Create three sets of scan
832      CustomInnerRegionObserver.waitForGets.set(true);
833      ScanThread[] scanThreads = initiateScan(table, false);
834      // Create three sets of gets
835      GetThread[] getThreads = initiateGet(table, false, false);
836      // The block would have been decremented for the scan case as it was
837      // wrapped
838      // before even the postNext hook gets executed.
839      // giving some time for the block to be decremented
840      Thread.sleep(100);
841      CustomInnerRegionObserver.waitForGets.set(false);
842      checkForBlockEviction(cache, false, false);
843      // countdown the latch
844      CustomInnerRegionObserver.getCdl().get().countDown();
845      for (GetThread thread : getThreads) {
846        thread.join();
847      }
848      getLatch.countDown();
849      for (ScanThread thread : scanThreads) {
850        thread.join();
851      }
852    } finally {
853      if (table != null) {
854        table.close();
855      }
856    }
857  }
858
  /**
   * Runs the shared scan-with-compaction scenario with a forward scan
   * ({@code reversed == false}), using {@code methodName} as the table name.
   */
  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(methodName, false);
  }
863
  /**
   * Runs the shared scan-with-compaction scenario with a reversed scan
   * ({@code reversed == true}), using {@code methodName} as the table name.
   */
  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(methodName, true);
  }
868
869  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
870    throws IOException, InterruptedException {
871    Table table = null;
872    try {
873      latch = new CountDownLatch(1);
874      compactionLatch = new CountDownLatch(1);
875      TableName tableName = TableName.valueOf(tableNameStr);
876      // Create a table with block size as 1024
877      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
878        CustomInnerRegionObserverWrapper.class.getName());
879      // get the block cache and region
880      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
881      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
882      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
883      HStore store = region.getStores().iterator().next();
884      CacheConfig cacheConf = store.getCacheConfig();
885      cacheConf.setCacheDataOnWrite(true);
886      cacheConf.setEvictOnClose(true);
887      BlockCache cache = cacheConf.getBlockCache().get();
888
889      // insert data. 2 Rows are added
890      Put put = new Put(ROW);
891      put.addColumn(FAMILY, QUALIFIER, data);
892      table.put(put);
893      put = new Put(ROW1);
894      put.addColumn(FAMILY, QUALIFIER, data);
895      table.put(put);
896      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
897      // Should create one Hfile with 2 blocks
898      region.flush(true);
899      // read the data and expect same blocks, one new hit, no misses
900      int refCount = 0;
901      // Check how this miss is happening
902      // insert a second column, read the row, no new blocks, 3 new hits
903      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
904      byte[] data2 = Bytes.add(data, data);
905      put = new Put(ROW);
906      put.addColumn(FAMILY, QUALIFIER2, data2);
907      table.put(put);
908      // flush, one new block
909      LOG.info("Flushing cache");
910      region.flush(true);
911      Iterator<CachedBlock> iterator = cache.iterator();
912      iterateBlockCache(cache, iterator);
913      // Create three sets of scan
914      ScanThread[] scanThreads = initiateScan(table, reversed);
915      Thread.sleep(100);
916      iterator = cache.iterator();
917      boolean usedBlocksFound = false;
918      while (iterator.hasNext()) {
919        CachedBlock next = iterator.next();
920        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
921        if (cache instanceof BucketCache) {
922          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
923        } else if (cache instanceof CombinedBlockCache) {
924          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
925        } else {
926          continue;
927        }
928        if (refCount != 0) {
929          // Blocks will be with count 3
930          assertEquals(NO_OF_THREADS, refCount);
931          usedBlocksFound = true;
932        }
933      }
934      assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found ");
935      usedBlocksFound = false;
936      LOG.info("Compacting");
937      assertEquals(2, store.getStorefilesCount());
938      store.triggerMajorCompaction();
939      region.compact(true);
940      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
941      assertEquals(1, store.getStorefilesCount());
942      // Even after compaction is done we will have some blocks that cannot
943      // be evicted this is because the scan is still referencing them
944      iterator = cache.iterator();
945      while (iterator.hasNext()) {
946        CachedBlock next = iterator.next();
947        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
948        if (cache instanceof BucketCache) {
949          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
950        } else if (cache instanceof CombinedBlockCache) {
951          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
952        } else {
953          continue;
954        }
955        if (refCount != 0) {
956          // Blocks will be with count 3 as they are not yet cleared
957          assertEquals(NO_OF_THREADS, refCount);
958          usedBlocksFound = true;
959        }
960      }
961      assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found ");
962      // Should not throw exception
963      compactionLatch.countDown();
964      latch.countDown();
965      for (ScanThread thread : scanThreads) {
966        thread.join();
967      }
968      // by this time all blocks should have been evicted
969      iterator = cache.iterator();
970      iterateBlockCache(cache, iterator);
971      Result r = table.get(new Get(ROW));
972      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
973      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
974      // The gets would be working on new blocks
975      iterator = cache.iterator();
976      iterateBlockCache(cache, iterator);
977    } finally {
978      if (table != null) {
979        table.close();
980      }
981    }
982  }
983
984  @Test
985  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
986    throws IOException, InterruptedException {
987    // do flush and scan in parallel
988    Table table = null;
989    try {
990      latch = new CountDownLatch(1);
991      compactionLatch = new CountDownLatch(1);
992      final TableName tableName = TableName.valueOf(methodName);
993      // Create a table with block size as 1024
994      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
995        CustomInnerRegionObserverWrapper.class.getName());
996      // get the block cache and region
997      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
998      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
999      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1000      HStore store = region.getStores().iterator().next();
1001      CacheConfig cacheConf = store.getCacheConfig();
1002      cacheConf.setCacheDataOnWrite(true);
1003      cacheConf.setEvictOnClose(true);
1004      BlockCache cache = cacheConf.getBlockCache().get();
1005
1006      // insert data. 2 Rows are added
1007      Put put = new Put(ROW);
1008      put.addColumn(FAMILY, QUALIFIER, data);
1009      table.put(put);
1010      put = new Put(ROW1);
1011      put.addColumn(FAMILY, QUALIFIER, data);
1012      table.put(put);
1013      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
1014      // Should create one Hfile with 2 blocks
1015      region.flush(true);
1016      // read the data and expect same blocks, one new hit, no misses
1017      int refCount = 0;
1018      // Check how this miss is happening
1019      // insert a second column, read the row, no new blocks, 3 new hits
1020      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1021      byte[] data2 = Bytes.add(data, data);
1022      put = new Put(ROW);
1023      put.addColumn(FAMILY, QUALIFIER2, data2);
1024      table.put(put);
1025      // flush, one new block
1026      LOG.info("Flushing cache");
1027      region.flush(true);
1028      Iterator<CachedBlock> iterator = cache.iterator();
1029      iterateBlockCache(cache, iterator);
1030      // Create three sets of scan
1031      ScanThread[] scanThreads = initiateScan(table, false);
1032      Thread.sleep(100);
1033      iterator = cache.iterator();
1034      boolean usedBlocksFound = false;
1035      while (iterator.hasNext()) {
1036        CachedBlock next = iterator.next();
1037        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1038        if (cache instanceof BucketCache) {
1039          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1040        } else if (cache instanceof CombinedBlockCache) {
1041          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1042        } else {
1043          continue;
1044        }
1045        if (refCount != 0) {
1046          // Blocks will be with count 3
1047          assertEquals(NO_OF_THREADS, refCount);
1048          usedBlocksFound = true;
1049        }
1050      }
1051      // Make a put and do a flush
1052      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1053      data2 = Bytes.add(data, data);
1054      put = new Put(ROW1);
1055      put.addColumn(FAMILY, QUALIFIER2, data2);
1056      table.put(put);
1057      // flush, one new block
1058      LOG.info("Flushing cache");
1059      region.flush(true);
1060      assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found ");
1061      usedBlocksFound = false;
1062      LOG.info("Compacting");
1063      assertEquals(3, store.getStorefilesCount());
1064      store.triggerMajorCompaction();
1065      region.compact(true);
1066      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
1067      assertEquals(1, store.getStorefilesCount());
1068      // Even after compaction is done we will have some blocks that cannot
1069      // be evicted this is because the scan is still referencing them
1070      iterator = cache.iterator();
1071      while (iterator.hasNext()) {
1072        CachedBlock next = iterator.next();
1073        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1074        if (cache instanceof BucketCache) {
1075          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1076        } else if (cache instanceof CombinedBlockCache) {
1077          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1078        } else {
1079          continue;
1080        }
1081        if (refCount != 0) {
1082          // Blocks will be with count 3 as they are not yet cleared
1083          assertEquals(NO_OF_THREADS, refCount);
1084          usedBlocksFound = true;
1085        }
1086      }
1087      assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found ");
1088      // Should not throw exception
1089      compactionLatch.countDown();
1090      latch.countDown();
1091      for (ScanThread thread : scanThreads) {
1092        thread.join();
1093      }
1094      // by this time all blocks should have been evicted
1095      iterator = cache.iterator();
1096      // Since a flush and compaction happened after a scan started
1097      // we need to ensure that all the original blocks of the compacted file
1098      // is also removed.
1099      iterateBlockCache(cache, iterator);
1100      Result r = table.get(new Get(ROW));
1101      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1102      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1103      // The gets would be working on new blocks
1104      iterator = cache.iterator();
1105      iterateBlockCache(cache, iterator);
1106    } finally {
1107      if (table != null) {
1108        table.close();
1109      }
1110    }
1111  }
1112
1113  @Test
1114  public void testScanWithException() throws IOException, InterruptedException {
1115    Table table = null;
1116    try {
1117      latch = new CountDownLatch(1);
1118      exceptionLatch = new CountDownLatch(1);
1119      final TableName tableName = TableName.valueOf(methodName);
1120      // Create KV that will give you two blocks
1121      // Create a table with block size as 1024
1122      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
1123        CustomInnerRegionObserverWrapper.class.getName());
1124      // get the block cache and region
1125      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
1126      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
1127      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1128      HStore store = region.getStores().iterator().next();
1129      CacheConfig cacheConf = store.getCacheConfig();
1130      cacheConf.setCacheDataOnWrite(true);
1131      cacheConf.setEvictOnClose(true);
1132      BlockCache cache = cacheConf.getBlockCache().get();
1133      // insert data. 2 Rows are added
1134      insertData(table);
1135      // flush the data
1136      LOG.info("Flushing cache");
1137      // Should create one Hfile with 2 blocks
1138      region.flush(true);
1139      // CustomInnerRegionObserver.sleepTime.set(5000);
1140      CustomInnerRegionObserver.throwException.set(true);
1141      ScanThread[] scanThreads = initiateScan(table, false);
1142      // The block would have been decremented for the scan case as it was
1143      // wrapped
1144      // before even the postNext hook gets executed.
1145      // giving some time for the block to be decremented
1146      Thread.sleep(100);
1147      Iterator<CachedBlock> iterator = cache.iterator();
1148      boolean usedBlocksFound = false;
1149      int refCount = 0;
1150      while (iterator.hasNext()) {
1151        CachedBlock next = iterator.next();
1152        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1153        if (cache instanceof BucketCache) {
1154          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1155        } else if (cache instanceof CombinedBlockCache) {
1156          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1157        } else {
1158          continue;
1159        }
1160        if (refCount != 0) {
1161          // Blocks will be with count 3
1162          assertEquals(NO_OF_THREADS, refCount);
1163          usedBlocksFound = true;
1164        }
1165      }
1166      assertTrue(usedBlocksFound);
1167      exceptionLatch.countDown();
1168      // countdown the latch
1169      CustomInnerRegionObserver.getCdl().get().countDown();
1170      for (ScanThread thread : scanThreads) {
1171        thread.join();
1172      }
1173      iterator = cache.iterator();
1174      usedBlocksFound = false;
1175      refCount = 0;
1176      while (iterator.hasNext()) {
1177        CachedBlock next = iterator.next();
1178        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1179        if (cache instanceof BucketCache) {
1180          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1181        } else if (cache instanceof CombinedBlockCache) {
1182          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1183        } else {
1184          continue;
1185        }
1186        if (refCount != 0) {
1187          // Blocks will be with count 3
1188          assertEquals(NO_OF_THREADS, refCount);
1189          usedBlocksFound = true;
1190        }
1191      }
1192      assertFalse(usedBlocksFound);
1193      // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
1194      assertEquals(0, refCount);
1195    } finally {
1196      if (table != null) {
1197        table.close();
1198      }
1199    }
1200  }
1201
1202  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
1203    int refCount;
1204    while (iterator.hasNext()) {
1205      CachedBlock next = iterator.next();
1206      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1207      if (cache instanceof BucketCache) {
1208        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1209        LOG.info("BucketCache {} {}", cacheKey, refCount);
1210      } else if (cache instanceof CombinedBlockCache) {
1211        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1212        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
1213      } else {
1214        continue;
1215      }
1216      assertEquals(0, refCount);
1217    }
1218  }
1219
1220  private void insertData(Table table) throws IOException {
1221    Put put = new Put(ROW);
1222    put.addColumn(FAMILY, QUALIFIER, data);
1223    table.put(put);
1224    put = new Put(ROW1);
1225    put.addColumn(FAMILY, QUALIFIER, data);
1226    table.put(put);
1227    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1228    put = new Put(ROW);
1229    put.addColumn(FAMILY, QUALIFIER2, data2);
1230    table.put(put);
1231  }
1232
1233  private ScanThread[] initiateScan(Table table, boolean reverse)
1234    throws IOException, InterruptedException {
1235    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
1236    for (int i = 0; i < NO_OF_THREADS; i++) {
1237      scanThreads[i] = new ScanThread(table, reverse);
1238    }
1239    for (ScanThread thread : scanThreads) {
1240      thread.start();
1241    }
1242    return scanThreads;
1243  }
1244
1245  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
1246    throws IOException, InterruptedException {
1247    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
1248    for (int i = 0; i < NO_OF_THREADS; i++) {
1249      getThreads[i] = new GetThread(table, tracker, multipleCFs);
1250    }
1251    for (GetThread thread : getThreads) {
1252      thread.start();
1253    }
1254    return getThreads;
1255  }
1256
1257  private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException {
1258    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
1259    for (int i = 0; i < NO_OF_THREADS; i++) {
1260      multiGetThreads[i] = new MultiGetThread(table);
1261    }
1262    for (MultiGetThread thread : multiGetThreads) {
1263      thread.start();
1264    }
1265    return multiGetThreads;
1266  }
1267
1268  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
1269    throws InterruptedException {
1270    int counter = NO_OF_THREADS;
1271    if (CustomInnerRegionObserver.waitForGets.get()) {
1272      // Because only one row is selected, it has only 2 blocks
1273      counter = counter - 1;
1274      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
1275        Thread.sleep(100);
1276      }
1277    } else {
1278      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
1279        Thread.sleep(100);
1280      }
1281    }
1282    Iterator<CachedBlock> iterator = cache.iterator();
1283    int refCount = 0;
1284    while (iterator.hasNext()) {
1285      CachedBlock next = iterator.next();
1286      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1287      if (cache instanceof BucketCache) {
1288        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1289      } else if (cache instanceof CombinedBlockCache) {
1290        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1291      } else {
1292        continue;
1293      }
1294      LOG.info(" the refcount is " + refCount + " block is " + cacheKey);
1295      if (CustomInnerRegionObserver.waitForGets.get()) {
1296        if (expectOnlyZero) {
1297          assertTrue(refCount == 0);
1298        }
1299        if (refCount != 0) {
1300          // Because the scan would have also touched up on these blocks but
1301          // it
1302          // would have touched
1303          // all 3
1304          if (getClosed) {
1305            // If get has closed only the scan's blocks would be available
1306            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
1307          } else {
1308            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
1309          }
1310        }
1311      } else {
1312        // Because the get would have also touched up on these blocks but it
1313        // would have touched
1314        // upon only 2 additionally
1315        if (expectOnlyZero) {
1316          assertTrue(refCount == 0);
1317        }
1318        if (refCount != 0) {
1319          if (getLatch == null) {
1320            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
1321          } else {
1322            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
1323          }
1324        }
1325      }
1326    }
1327    CustomInnerRegionObserver.getCdl().get().countDown();
1328  }
1329
1330  private static class MultiGetThread extends Thread {
1331    private final Table table;
1332    private final List<Get> gets = new ArrayList<>();
1333
1334    public MultiGetThread(Table table) {
1335      this.table = table;
1336    }
1337
1338    @Override
1339    public void run() {
1340      gets.add(new Get(ROW));
1341      gets.add(new Get(ROW1));
1342      try {
1343        CustomInnerRegionObserver.getCdl().set(latch);
1344        Result[] r = table.get(gets);
1345        assertTrue(Bytes.equals(r[0].getRow(), ROW));
1346        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
1347      } catch (IOException e) {
1348      }
1349    }
1350  }
1351
1352  private static class GetThread extends Thread {
1353    private final Table table;
1354    private final boolean tracker;
1355    private final boolean multipleCFs;
1356
1357    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
1358      this.table = table;
1359      this.tracker = tracker;
1360      this.multipleCFs = multipleCFs;
1361    }
1362
1363    @Override
1364    public void run() {
1365      try {
1366        initiateGet(table);
1367      } catch (IOException e) {
1368        // do nothing
1369      }
1370    }
1371
1372    private void initiateGet(Table table) throws IOException {
1373      Get get = new Get(ROW);
1374      if (tracker) {
1375        // Change this
1376        if (!multipleCFs) {
1377          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
1378          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
1379          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
1380          // Unknown key
1381          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
1382        } else {
1383          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
1384          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
1385          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
1386          // Unknown key
1387          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
1388        }
1389      }
1390      CustomInnerRegionObserver.getCdl().set(latch);
1391      Result r = table.get(get);
1392      LOG.info("" + r);
1393      if (!tracker) {
1394        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1395        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1396      } else {
1397        if (!multipleCFs) {
1398          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
1399          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
1400          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
1401        } else {
1402          assertTrue(Bytes.equals(
1403            r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
1404            data2));
1405          assertTrue(Bytes.equals(
1406            r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
1407            data2));
1408          assertTrue(Bytes.equals(
1409            r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
1410            data2));
1411        }
1412      }
1413    }
1414  }
1415
1416  private static class ScanThread extends Thread {
1417    private final Table table;
1418    private final boolean reverse;
1419
1420    public ScanThread(Table table, boolean reverse) {
1421      this.table = table;
1422      this.reverse = reverse;
1423    }
1424
1425    @Override
1426    public void run() {
1427      try {
1428        initiateScan(table);
1429      } catch (IOException e) {
1430        // do nothing
1431      }
1432    }
1433
1434    private void initiateScan(Table table) throws IOException {
1435      Scan scan = new Scan();
1436      if (reverse) {
1437        scan.setReversed(true);
1438      }
1439      CustomInnerRegionObserver.getCdl().set(latch);
1440      ResultScanner resScanner = table.getScanner(scan);
1441      int i = (reverse ? ROWS.length - 1 : 0);
1442      boolean resultFound = false;
1443      for (Result result : resScanner) {
1444        resultFound = true;
1445        LOG.info("" + result);
1446        if (!reverse) {
1447          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1448          i++;
1449        } else {
1450          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1451          i--;
1452        }
1453      }
1454      assertTrue(resultFound);
1455    }
1456  }
1457
1458  private void waitForStoreFileCount(HStore store, int count, int timeout)
1459    throws InterruptedException {
1460    long start = EnvironmentEdgeManager.currentTime();
1461    while (
1462      start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count
1463    ) {
1464      Thread.sleep(100);
1465    }
1466    LOG.info("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() + ", cur="
1467      + store.getStorefilesCount());
1468    assertEquals(count, store.getStorefilesCount());
1469  }
1470
1471  private static class CustomScanner implements RegionScanner {
1472
1473    private RegionScanner delegate;
1474
1475    public CustomScanner(RegionScanner delegate) {
1476      this.delegate = delegate;
1477    }
1478
1479    @Override
1480    public boolean next(List<? super ExtendedCell> results) throws IOException {
1481      return delegate.next(results);
1482    }
1483
1484    @Override
1485    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
1486      throws IOException {
1487      return delegate.next(result, scannerContext);
1488    }
1489
1490    @Override
1491    public boolean nextRaw(List<? super ExtendedCell> result) throws IOException {
1492      return delegate.nextRaw(result);
1493    }
1494
1495    @Override
1496    public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context)
1497      throws IOException {
1498      boolean nextRaw = delegate.nextRaw(result, context);
1499      if (compactionLatch != null && compactionLatch.getCount() > 0) {
1500        try {
1501          compactionLatch.await();
1502        } catch (InterruptedException ie) {
1503        }
1504      }
1505
1506      if (CustomInnerRegionObserver.throwException.get()) {
1507        if (exceptionLatch.getCount() > 0) {
1508          try {
1509            exceptionLatch.await();
1510          } catch (InterruptedException e) {
1511          }
1512          throw new IOException("throw exception");
1513        }
1514      }
1515      return nextRaw;
1516    }
1517
1518    @Override
1519    public Set<Path> getFilesRead() {
1520      return delegate.getFilesRead();
1521    }
1522
1523    @Override
1524    public void close() throws IOException {
1525      delegate.close();
1526    }
1527
1528    @Override
1529    public RegionInfo getRegionInfo() {
1530      return delegate.getRegionInfo();
1531    }
1532
1533    @Override
1534    public boolean isFilterDone() throws IOException {
1535      return delegate.isFilterDone();
1536    }
1537
1538    @Override
1539    public boolean reseek(byte[] row) throws IOException {
1540      return false;
1541    }
1542
1543    @Override
1544    public long getMaxResultSize() {
1545      return delegate.getMaxResultSize();
1546    }
1547
1548    @Override
1549    public long getMvccReadPoint() {
1550      return delegate.getMvccReadPoint();
1551    }
1552
1553    @Override
1554    public int getBatch() {
1555      return delegate.getBatch();
1556    }
1557  }
1558
1559  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
1560    @Override
1561    public RegionScanner postScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e,
1562      Scan scan, RegionScanner s) throws IOException {
1563      return new CustomScanner(s);
1564    }
1565  }
1566
1567  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
1568    static final AtomicInteger countOfNext = new AtomicInteger(0);
1569    static final AtomicInteger countOfGets = new AtomicInteger(0);
1570    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
1571    static final AtomicBoolean throwException = new AtomicBoolean(false);
1572    private static final AtomicReference<CountDownLatch> cdl =
1573      new AtomicReference<>(new CountDownLatch(0));
1574
1575    @Override
1576    public Optional<RegionObserver> getRegionObserver() {
1577      return Optional.of(this);
1578    }
1579
1580    @Override
1581    public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> e,
1582      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
1583      slowdownCode(e, false);
1584      if (getLatch != null && getLatch.getCount() > 0) {
1585        try {
1586          getLatch.await();
1587        } catch (InterruptedException e1) {
1588        }
1589      }
1590      return hasMore;
1591    }
1592
1593    @Override
1594    public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
1595      List<Cell> results) throws IOException {
1596      slowdownCode(e, true);
1597    }
1598
1599    public static AtomicReference<CountDownLatch> getCdl() {
1600      return cdl;
1601    }
1602
1603    private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
1604      boolean isGet) {
1605      CountDownLatch latch = getCdl().get();
1606      try {
1607        LOG.info(latch.getCount() + " is the count " + isGet);
1608        if (latch.getCount() > 0) {
1609          if (isGet) {
1610            countOfGets.incrementAndGet();
1611          } else {
1612            countOfNext.incrementAndGet();
1613          }
1614          LOG.info("Waiting for the counterCountDownLatch");
1615          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
1616          if (latch.getCount() > 0) {
1617            throw new RuntimeException("Can't wait more");
1618          }
1619        }
1620      } catch (InterruptedException e1) {
1621        LOG.error(e1.toString(), e1);
1622      }
1623    }
1624  }
1625}