001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.atomic.AtomicBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparatorImpl;
033import org.apache.hadoop.hbase.ExtendedCell;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
038import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
039import org.apache.hadoop.hbase.coprocessor.ObserverContext;
040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
042import org.apache.hadoop.hbase.coprocessor.RegionObserver;
043import org.apache.hadoop.hbase.io.hfile.BlockCache;
044import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
045import org.apache.hadoop.hbase.io.hfile.CacheConfig;
046import org.apache.hadoop.hbase.io.hfile.CachedBlock;
047import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
048import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
049import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HStore;
052import org.apache.hadoop.hbase.regionserver.InternalScanner;
053import org.apache.hadoop.hbase.regionserver.ScanType;
054import org.apache.hadoop.hbase.regionserver.ScannerContext;
055import org.apache.hadoop.hbase.regionserver.Store;
056import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
057import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
058import org.apache.hadoop.hbase.testclassification.ClientTests;
059import org.apache.hadoop.hbase.testclassification.LargeTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.junit.jupiter.api.AfterAll;
062import org.junit.jupiter.api.BeforeAll;
063import org.junit.jupiter.api.BeforeEach;
064import org.junit.jupiter.api.Tag;
065import org.junit.jupiter.api.Test;
066import org.junit.jupiter.api.TestInfo;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
069
070@Tag(LargeTests.TAG)
071@Tag(ClientTests.TAG)
072public class TestAvoidCellReferencesIntoShippedBlocks {
073
074  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
075  static byte[][] ROWS = new byte[2][];
076  private static byte[] ROW = Bytes.toBytes("testRow");
077  private static byte[] ROW1 = Bytes.toBytes("testRow1");
078  private static byte[] ROW2 = Bytes.toBytes("testRow2");
079  private static byte[] ROW3 = Bytes.toBytes("testRow3");
080  private static byte[] ROW4 = Bytes.toBytes("testRow4");
081  private static byte[] ROW5 = Bytes.toBytes("testRow5");
082  private static byte[] FAMILY = Bytes.toBytes("testFamily");
083  private static byte[][] FAMILIES_1 = new byte[1][0];
084  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
085  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
086  private static byte[] data = new byte[1000];
087  protected static int SLAVES = 1;
088  private CountDownLatch latch = new CountDownLatch(1);
089  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
090  private static AtomicBoolean doScan = new AtomicBoolean(false);
091  private String methodName;
092
  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    // Remember the running test method's name; each test uses it as its table name.
    this.methodName = testInfo.getTestMethod().get().getName();
  }
097
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    // Register the multi-row mutation endpoint on every region.
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    // Enable an off-heap bucket cache so the block cache is a CombinedBlockCache
    // (evictBlock() below asserts this).
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    // High threshold so only the explicitly requested major compaction runs.
    conf.setInt("hbase.hstore.compactionThreshold", 7);
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
    // Generous scanner timeout: scans in these tests are deliberately held up by latches.
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
    compactReadLatch = new CountDownLatch(1);
  }
117
118  /**
119   * @throws java.lang.Exception
120   */
121  @AfterAll
122  public static void tearDownAfterClass() throws Exception {
123    TEST_UTIL.shutdownMiniCluster();
124  }
125
126  @Test
127  public void testHBase16372InCompactionWritePath() throws Exception {
128    final TableName tableName = TableName.valueOf(methodName);
129    // Create a table with block size as 1024
130    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
131      CompactorRegionObserver.class.getName());
132    try {
133      // get the block cache and region
134      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
135      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
136      HRegion region =
137        (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
138      HStore store = region.getStores().iterator().next();
139      CacheConfig cacheConf = store.getCacheConfig();
140      cacheConf.setCacheDataOnWrite(true);
141      cacheConf.setEvictOnClose(true);
142      final BlockCache cache = cacheConf.getBlockCache().get();
143      // insert data. 5 Rows are added
144      Put put = new Put(ROW);
145      put.addColumn(FAMILY, QUALIFIER, data);
146      table.put(put);
147      put = new Put(ROW);
148      put.addColumn(FAMILY, QUALIFIER1, data);
149      table.put(put);
150      put = new Put(ROW1);
151      put.addColumn(FAMILY, QUALIFIER, data);
152      table.put(put);
153      // data was in memstore so don't expect any changes
154      region.flush(true);
155      put = new Put(ROW1);
156      put.addColumn(FAMILY, QUALIFIER1, data);
157      table.put(put);
158      put = new Put(ROW2);
159      put.addColumn(FAMILY, QUALIFIER, data);
160      table.put(put);
161      put = new Put(ROW2);
162      put.addColumn(FAMILY, QUALIFIER1, data);
163      table.put(put);
164      // data was in memstore so don't expect any changes
165      region.flush(true);
166      put = new Put(ROW3);
167      put.addColumn(FAMILY, QUALIFIER, data);
168      table.put(put);
169      put = new Put(ROW3);
170      put.addColumn(FAMILY, QUALIFIER1, data);
171      table.put(put);
172      put = new Put(ROW4);
173      put.addColumn(FAMILY, QUALIFIER, data);
174      table.put(put);
175      // data was in memstore so don't expect any changes
176      region.flush(true);
177      put = new Put(ROW4);
178      put.addColumn(FAMILY, QUALIFIER1, data);
179      table.put(put);
180      put = new Put(ROW5);
181      put.addColumn(FAMILY, QUALIFIER, data);
182      table.put(put);
183      put = new Put(ROW5);
184      put.addColumn(FAMILY, QUALIFIER1, data);
185      table.put(put);
186      // data was in memstore so don't expect any changes
187      region.flush(true);
188      // Load cache
189      Scan s = new Scan();
190      s.setMaxResultSize(1000);
191      int count;
192      try (ResultScanner scanner = table.getScanner(s)) {
193        count = Iterables.size(scanner);
194      }
195      assertEquals(6, count, "Count all the rows ");
196      // all the cache is loaded
197      // trigger a major compaction
198      ScannerThread scannerThread = new ScannerThread(table, cache);
199      scannerThread.start();
200      region.compact(true);
201      s = new Scan();
202      s.setMaxResultSize(1000);
203      try (ResultScanner scanner = table.getScanner(s)) {
204        count = Iterables.size(scanner);
205      }
206      assertEquals(6, count, "Count all the rows ");
207    } finally {
208      table.close();
209    }
210  }
211
212  private static class ScannerThread extends Thread {
213    private final Table table;
214    private final BlockCache cache;
215
216    public ScannerThread(Table table, BlockCache cache) {
217      this.table = table;
218      this.cache = cache;
219    }
220
221    @Override
222    public void run() {
223      Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
224      try {
225        while (!doScan.get()) {
226          try {
227            // Sleep till you start scan
228            Thread.sleep(1);
229          } catch (InterruptedException e) {
230          }
231        }
232        List<BlockCacheKey> cacheList = new ArrayList<>();
233        Iterator<CachedBlock> iterator = cache.iterator();
234        // evict all the blocks
235        while (iterator.hasNext()) {
236          CachedBlock next = iterator.next();
237          BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
238          cacheList.add(cacheKey);
239          // evict what ever is available
240          cache.evictBlock(cacheKey);
241        }
242        try (ResultScanner scanner = table.getScanner(s)) {
243          while (scanner.next() != null) {
244          }
245        }
246        compactReadLatch.countDown();
247      } catch (IOException e) {
248      }
249    }
250  }
251
  /**
   * Region coprocessor that wraps the compaction scanner in a
   * {@link CompactorInternalScanner}, which pauses the compaction when it reaches ROW2.
   */
  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      return new CompactorInternalScanner(scanner);
    }
  }
266
267  private static final class CompactorInternalScanner extends DelegatingInternalScanner {
268
269    public CompactorInternalScanner(InternalScanner scanner) {
270      super(scanner);
271    }
272
273    @Override
274    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
275      throws IOException {
276      boolean next = scanner.next(result, scannerContext);
277      for (Iterator<? super ExtendedCell> iter = result.iterator(); iter.hasNext();) {
278        Cell cell = (Cell) iter.next();
279        if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
280          try {
281            // hold the compaction
282            // set doscan to true
283            doScan.compareAndSet(false, true);
284            compactReadLatch.await();
285          } catch (InterruptedException e) {
286          }
287        }
288      }
289      return next;
290    }
291  }
292
  /**
   * HBASE-16372 in the read path: cells handed out by an ongoing scan must remain valid even
   * when the blocks backing them are evicted and the cache is re-populated mid-scan by a
   * concurrent thread.
   */
  @Test
  public void testHBASE16372InReadPath() throws Exception {
    final TableName tableName = TableName.valueOf(methodName);
    // Create a table with block size as 1024
    try (Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null)) {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache().get();
      // insert data: 6 rows (ROW..ROW5) with two columns each
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush everything from the memstore into a store file
      region.flush(true);
      // Load cache
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals(6, count, "Count all the rows ");

      // Scan from cache
      s = new Scan();
      // Start the scan from ROW1, one row per rpc
      s.setCaching(1);
      s.withStartRow(ROW1);
      // set partial as true so that the scan can send partial columns also
      s.setAllowPartialResults(true);
      s.setMaxResultSize(1000);
      try (ScanPerNextResultScanner scanner =
        new ScanPerNextResultScanner(TEST_UTIL.getAsyncConnection().getTable(tableName), s)) {
        Thread evictorThread = new Thread() {
          @Override
          public void run() {
            List<BlockCacheKey> cacheList = new ArrayList<>();
            Iterator<CachedBlock> iterator = cache.iterator();
            // evict all the blocks, remembering their keys for the spin-wait below
            while (iterator.hasNext()) {
              CachedBlock next = iterator.next();
              BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
              cacheList.add(cacheKey);
              /**
               * There is only one Block referenced by rpc,here we evict blocks which have no rpc
               * referenced.
               */
              evictBlock(cache, cacheKey);
            }
            try {
              Thread.sleep(1);
            } catch (InterruptedException e1) {
            }
            iterator = cache.iterator();
            int refBlockCount = 0;
            while (iterator.hasNext()) {
              iterator.next();
              refBlockCount++;
            }
            // only the block pinned by the in-flight rpc should have survived the eviction
            assertEquals(1, refBlockCount, "One block should be there ");
            // Rescan to prepopulate the cache: this scan covers ROW3 (inclusive) up to ROW5
            // (exclusive), reloading the blocks for ROW3 and ROW4.
            Scan s1 = new Scan();
            s1.withStartRow(ROW3);
            s1.withStopRow(ROW5);
            s1.setCaching(1);

            try (ResultScanner scanner = table.getScanner(s1)) {
              int count = Iterables.size(scanner);
              assertEquals(2, count, "Count the rows");
              int newBlockRefCount = 0;
              List<BlockCacheKey> newCacheList = new ArrayList<>();
              // spin until 6 of the originally cached block keys are back in the cache
              while (true) {
                newBlockRefCount = 0;
                newCacheList.clear();
                iterator = cache.iterator();
                while (iterator.hasNext()) {
                  CachedBlock next = iterator.next();
                  BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
                  newCacheList.add(cacheKey);
                }
                for (BlockCacheKey key : cacheList) {
                  if (newCacheList.contains(key)) {
                    newBlockRefCount++;
                  }
                }
                if (newBlockRefCount == 6) {
                  break;
                }
              }
              // unblock the outer scan now that the cache has been churned
              latch.countDown();
            } catch (IOException e) {
            }
          }
        };
        count = 0;
        // drain the outer scan; after the 2nd result start the evictor and wait for it
        while (scanner.next() != null) {
          count++;
          if (count == 2) {
            evictorThread.start();
            latch.await();
          }
        }
      }
      assertEquals(10, count, "Count should give all rows ");
    }
  }
442
443  /**
444   * For {@link BucketCache},we only evict Block if there is no rpc referenced.
445   */
446  private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
447    assertTrue(blockCache instanceof CombinedBlockCache);
448    BlockCache[] blockCaches = blockCache.getBlockCaches();
449    for (BlockCache currentBlockCache : blockCaches) {
450      if (currentBlockCache instanceof BucketCache) {
451        ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
452      } else {
453        currentBlockCache.evictBlock(blockCacheKey);
454      }
455    }
456
457  }
458}