/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

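/**
 * Tests that data blocks referenced by in-flight RPCs (gets, multi-gets and scans) carry a
 * non-zero reference count in the block cache, and can only be evicted once those operations
 * release them, across flushes, compactions, splits and scanner exceptions. The
 * CustomInnerRegionObserver coprocessor holds server-side operations open at known points so
 * the reference counts can be inspected.
 */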
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
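  // Latches used by CustomInnerRegionObserver to hold server-side gets, scans and compactions
  // open while the test inspects block reference counts; tearDown() releases any that remain.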
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    // Use a combined cache: a small on-heap LRU cache plus a 400 MB off-heap bucket cache
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data: two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // The data is still in the memstore, so don't expect any cache changes yet.
      // Flush the data; this should create one HFile with 2 blocks
      region.flush(true);
      // Load the cache by starting three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect the same blocks: one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // insert a second column and read the row: no new blocks, 3 new hits
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row; this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // Flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Create three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have released the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The three flushes above spread the cells of ROW across store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO: check how the block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The flushes above spread the columns of ROW across several store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three get threads with an explicit column tracker
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS gets
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The flushes above spread the columns of ROW across several store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three get threads across multiple column families
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS gets
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
      // This test expects the rpc refcount of cached data blocks to be 0 after a split. After a
      // split, two daughter regions are opened and a compaction is scheduled to get rid of
      // references to the parent region's hfiles. Compaction increases the refcount of cached
      // data blocks by 1. The test is flaky since that compaction can kick in at any time; to
      // avoid this, the table is created with compaction disabled.
      table = TEST_UTIL.createTable(
        TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
        null, BloomType.ROW, 1024, null);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
        regionCount);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for the split to finish
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
      for (HRegion r : regions) {
        LOG.info("Compaction state: {}", r.getCompactionState());
        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
      }
      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split created a HalfStoreFileReader, the first-key and last-key scanners
      // should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The three flushes above spread the data across store files
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three multi-get threads
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found a block with a non-zero ref count", foundNonZeroBlock);
      // The latch was created with a count of 2, so count down twice
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if scan() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The flushes above spread the columns of ROW across several store files
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

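  /**
   * Enables cache-on-write and evict-on-close on every store of the region, and returns the
   * block cache (taken from the last store; every store resolves to the same shared cache).
   */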
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data: two rows are added
      insertData(table);
      // Flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block ref counts would already have been decremented for the scan case, as the
      // scanner is wrapped before even the postNext hook gets executed.
      // giving some time for the blocks to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

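  /**
   * Shared body for the forward and reverse scan tests: starts scanner threads, verifies that
   * the scanned blocks stay referenced across a major compaction, then releases the scanners
   * and checks that every block drops back to a zero ref count.
   */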
  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data: two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // Insert a second column
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after the compaction is done some blocks cannot be evicted,
      // because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still held by all NO_OF_THREADS scanners, as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      // Should not throw an exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data: two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      int refCount = 0;
      // Insert a second column
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after the compaction is done some blocks cannot be evicted,
      // because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still held by all NO_OF_THREADS scanners, as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non-zero ref count should be found", usedBlocksFound);
      // Should not throw an exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and a compaction happened after the scan started,
      // we need to ensure that all the original blocks of the compacted files
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // Insert data: two rows are added
      insertData(table);
      // Flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block ref counts would already have been decremented for the scan case, as the
      // scanner is wrapped before even the postNext hook gets executed.
      // giving some time for the blocks to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each referenced block is held by all NO_OF_THREADS scanners
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // We should always see a 0 ref count, since after HBASE-16604 the scanner is always
      // recreated on exceptions
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

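  /**
   * Asserts that no block left in the cache is still referenced by an RPC, i.e. that every
   * block's ref count has dropped back to zero.
   */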
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        LOG.info("BucketCache {} {}", cacheKey, refCount);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

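  /** Inserts two rows, with an extra (double-sized) column on the first row. */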
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

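  /** Starts {@code NO_OF_THREADS} scan threads against the given table. */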
  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

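  /** Starts {@code NO_OF_THREADS} get threads against the given table. */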
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

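  /** Starts {@code NO_OF_THREADS} multi-get threads against the given table. */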
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

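  /**
   * Waits until the coprocessor reports that all gets (or scans) have reached the blocking
   * hook, then walks the block cache checking each block's RPC ref count: all zero when
   * {@code expectOnlyZero} is set, otherwise a count consistent with the number of gets/scans
   * still holding the block ({@code getClosed} indicates the get threads have already finished).
   */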
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Only one row is selected by the gets, so they touch only two of the
      // three cached blocks.
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      LOG.info("The refCount for block {} is {}", cacheKey, refCount);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          // The scans also pin these blocks, but they touch all three of
          // them, whereas the gets touch only two.
          if (getClosed) {
            // If the gets have closed, only the scans' references remain.
            assertEquals(CustomInnerRegionObserver.countOfGets.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfGets.get() + NO_OF_THREADS, refCount);
          }
        }
      } else {
        // The gets also pin these blocks, but only two of them additionally.
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(CustomInnerRegionObserver.countOfNext.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfNext.get() + NO_OF_THREADS, refCount);
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

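  /** Issues a batch get for ROW and ROW1 and verifies both rows come back. */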
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        LOG.error("MultiGet failed", e);
      }
    }
  }

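  /**
   * Issues a single get for ROW. With tracker set, explicit columns are
   * requested (against one family or several) so the explicit-column
   * tracking code path is exercised.
   */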
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        LOG.error("Get failed", e);
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Request explicit columns so the column tracker is exercised.
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      LOG.info("Get result: {}", r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }

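  /**
   * Scans the whole table, optionally reversed, and verifies the rows come
   * back in the expected order.
   */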
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        LOG.error("Scan failed", e);
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = reverse ? ROWS.length - 1 : 0;
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        LOG.info("Scan result: {}", result);
        // Rows must come back in order: ascending forward, descending reversed.
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i += reverse ? -1 : 1;
      }
      assertTrue(resultFound);
    }
  }

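  /**
   * Polls the store every 100ms until it holds the given number of store
   * files or the timeout (in ms) elapses, then asserts on the count.
   */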
  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    LOG.info("start={}, now={}, cur={}", start, System.currentTimeMillis(),
        store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

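  /**
   * A RegionScanner wrapper that blocks in nextRaw on the test latches so
   * that compactions and exceptions can be injected mid-scan.
   */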
  private static class CustomScanner implements RegionScanner {

    private final RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          // Restore the interrupt flag instead of swallowing it silently.
          Thread.currentThread().interrupt();
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Not delegated: this wrapper always reports an unsuccessful reseek.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

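  /**
   * Same observer as below, but additionally wraps every opened scanner in a
   * CustomScanner.
   */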
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

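  /**
   * Region observer that counts get/next invocations and parks readers on a
   * shared CountDownLatch so the tests can hold blocks pinned while they
   * inspect the cache.
   */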
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          Thread.currentThread().interrupt();
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        LOG.info("Latch count is {}, isGet={}", latch.getCount(), isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting on the CountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Timed out waiting on the latch");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}