001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.atomic.AtomicBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparatorImpl;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
038import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
039import org.apache.hadoop.hbase.coprocessor.ObserverContext;
040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
042import org.apache.hadoop.hbase.coprocessor.RegionObserver;
043import org.apache.hadoop.hbase.io.hfile.BlockCache;
044import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
045import org.apache.hadoop.hbase.io.hfile.CacheConfig;
046import org.apache.hadoop.hbase.io.hfile.CachedBlock;
047import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
048import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
049import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HStore;
052import org.apache.hadoop.hbase.regionserver.InternalScanner;
053import org.apache.hadoop.hbase.regionserver.ScanType;
054import org.apache.hadoop.hbase.regionserver.ScannerContext;
055import org.apache.hadoop.hbase.regionserver.Store;
056import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
057import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
058import org.apache.hadoop.hbase.testclassification.ClientTests;
059import org.apache.hadoop.hbase.testclassification.LargeTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.junit.AfterClass;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.junit.rules.TestName;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
070
071@Category({ LargeTests.class, ClientTests.class })
072public class TestAvoidCellReferencesIntoShippedBlocks {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class);
077
078  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
079  static byte[][] ROWS = new byte[2][];
080  private static byte[] ROW = Bytes.toBytes("testRow");
081  private static byte[] ROW1 = Bytes.toBytes("testRow1");
082  private static byte[] ROW2 = Bytes.toBytes("testRow2");
083  private static byte[] ROW3 = Bytes.toBytes("testRow3");
084  private static byte[] ROW4 = Bytes.toBytes("testRow4");
085  private static byte[] ROW5 = Bytes.toBytes("testRow5");
086  private static byte[] FAMILY = Bytes.toBytes("testFamily");
087  private static byte[][] FAMILIES_1 = new byte[1][0];
088  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
089  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
090  private static byte[] data = new byte[1000];
091  protected static int SLAVES = 1;
092  private CountDownLatch latch = new CountDownLatch(1);
093  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
094  private static AtomicBoolean doScan = new AtomicBoolean(false);
095
096  @Rule
097  public TestName name = new TestName();
098
099  /**
100   * @throws java.lang.Exception
101   */
102  @BeforeClass
103  public static void setUpBeforeClass() throws Exception {
104    ROWS[0] = ROW;
105    ROWS[1] = ROW1;
106    Configuration conf = TEST_UTIL.getConfiguration();
107    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
108      MultiRowMutationEndpoint.class.getName());
109    conf.setInt("hbase.regionserver.handler.count", 20);
110    conf.setInt("hbase.bucketcache.size", 400);
111    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
112    conf.setInt("hbase.hstore.compactionThreshold", 7);
113    conf.setFloat("hfile.block.cache.size", 0.2f);
114    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
115    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
116    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
117    FAMILIES_1[0] = FAMILY;
118    TEST_UTIL.startMiniCluster(SLAVES);
119    compactReadLatch = new CountDownLatch(1);
120  }
121
122  /**
123   * @throws java.lang.Exception
124   */
125  @AfterClass
126  public static void tearDownAfterClass() throws Exception {
127    TEST_UTIL.shutdownMiniCluster();
128  }
129
130  @Test
131  public void testHBase16372InCompactionWritePath() throws Exception {
132    final TableName tableName = TableName.valueOf(name.getMethodName());
133    // Create a table with block size as 1024
134    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
135      CompactorRegionObserver.class.getName());
136    try {
137      // get the block cache and region
138      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
139      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
140      HRegion region =
141          (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
142      HStore store = region.getStores().iterator().next();
143      CacheConfig cacheConf = store.getCacheConfig();
144      cacheConf.setCacheDataOnWrite(true);
145      cacheConf.setEvictOnClose(true);
146      final BlockCache cache = cacheConf.getBlockCache().get();
147      // insert data. 5 Rows are added
148      Put put = new Put(ROW);
149      put.addColumn(FAMILY, QUALIFIER, data);
150      table.put(put);
151      put = new Put(ROW);
152      put.addColumn(FAMILY, QUALIFIER1, data);
153      table.put(put);
154      put = new Put(ROW1);
155      put.addColumn(FAMILY, QUALIFIER, data);
156      table.put(put);
157      // data was in memstore so don't expect any changes
158      region.flush(true);
159      put = new Put(ROW1);
160      put.addColumn(FAMILY, QUALIFIER1, data);
161      table.put(put);
162      put = new Put(ROW2);
163      put.addColumn(FAMILY, QUALIFIER, data);
164      table.put(put);
165      put = new Put(ROW2);
166      put.addColumn(FAMILY, QUALIFIER1, data);
167      table.put(put);
168      // data was in memstore so don't expect any changes
169      region.flush(true);
170      put = new Put(ROW3);
171      put.addColumn(FAMILY, QUALIFIER, data);
172      table.put(put);
173      put = new Put(ROW3);
174      put.addColumn(FAMILY, QUALIFIER1, data);
175      table.put(put);
176      put = new Put(ROW4);
177      put.addColumn(FAMILY, QUALIFIER, data);
178      table.put(put);
179      // data was in memstore so don't expect any changes
180      region.flush(true);
181      put = new Put(ROW4);
182      put.addColumn(FAMILY, QUALIFIER1, data);
183      table.put(put);
184      put = new Put(ROW5);
185      put.addColumn(FAMILY, QUALIFIER, data);
186      table.put(put);
187      put = new Put(ROW5);
188      put.addColumn(FAMILY, QUALIFIER1, data);
189      table.put(put);
190      // data was in memstore so don't expect any changes
191      region.flush(true);
192      // Load cache
193      Scan s = new Scan();
194      s.setMaxResultSize(1000);
195      int count;
196      try (ResultScanner scanner = table.getScanner(s)) {
197        count = Iterables.size(scanner);
198      }
199      assertEquals("Count all the rows ", 6, count);
200      // all the cache is loaded
201      // trigger a major compaction
202      ScannerThread scannerThread = new ScannerThread(table, cache);
203      scannerThread.start();
204      region.compact(true);
205      s = new Scan();
206      s.setMaxResultSize(1000);
207      try (ResultScanner scanner = table.getScanner(s)) {
208        count = Iterables.size(scanner);
209      }
210      assertEquals("Count all the rows ", 6, count);
211    } finally {
212      table.close();
213    }
214  }
215
216  private static class ScannerThread extends Thread {
217    private final Table table;
218    private final BlockCache cache;
219
220    public ScannerThread(Table table, BlockCache cache) {
221      this.table = table;
222      this.cache = cache;
223    }
224
225    @Override
226    public void run() {
227      Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
228      try {
229        while(!doScan.get()) {
230          try {
231            // Sleep till you start scan
232            Thread.sleep(1);
233          } catch (InterruptedException e) {
234          }
235        }
236        List<BlockCacheKey> cacheList = new ArrayList<>();
237        Iterator<CachedBlock> iterator = cache.iterator();
238        // evict all the blocks
239        while (iterator.hasNext()) {
240          CachedBlock next = iterator.next();
241          BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
242          cacheList.add(cacheKey);
243          // evict what ever is available
244          cache.evictBlock(cacheKey);
245        }
246        try (ResultScanner scanner = table.getScanner(s)) {
247          while (scanner.next() != null) {
248          }
249        }
250        compactReadLatch.countDown();
251      } catch (IOException e) {
252      }
253    }
254  }
255
256  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {
257
258    @Override
259    public Optional<RegionObserver> getRegionObserver() {
260      return Optional.of(this);
261    }
262
263    @Override
264    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
265        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
266        CompactionRequest request) throws IOException {
267      return new CompactorInternalScanner(scanner);
268    }
269  }
270
271  private static final class CompactorInternalScanner extends DelegatingInternalScanner {
272
273    public CompactorInternalScanner(InternalScanner scanner) {
274      super(scanner);
275    }
276
277    @Override
278    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
279      boolean next = scanner.next(result, scannerContext);
280      for (Cell cell : result) {
281        if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
282          try {
283            // hold the compaction
284            // set doscan to true
285            doScan.compareAndSet(false, true);
286            compactReadLatch.await();
287          } catch (InterruptedException e) {
288          }
289        }
290      }
291      return next;
292    }
293  }
294
295  @Test
296  public void testHBASE16372InReadPath() throws Exception {
297    final TableName tableName = TableName.valueOf(name.getMethodName());
298    // Create a table with block size as 1024
299    try (Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null)) {
300      // get the block cache and region
301      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
302      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
303      HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName)
304          .getRegion(regionName);
305      HStore store = region.getStores().iterator().next();
306      CacheConfig cacheConf = store.getCacheConfig();
307      cacheConf.setCacheDataOnWrite(true);
308      cacheConf.setEvictOnClose(true);
309      final BlockCache cache = cacheConf.getBlockCache().get();
310      // insert data. 5 Rows are added
311      Put put = new Put(ROW);
312      put.addColumn(FAMILY, QUALIFIER, data);
313      table.put(put);
314      put = new Put(ROW);
315      put.addColumn(FAMILY, QUALIFIER1, data);
316      table.put(put);
317      put = new Put(ROW1);
318      put.addColumn(FAMILY, QUALIFIER, data);
319      table.put(put);
320      put = new Put(ROW1);
321      put.addColumn(FAMILY, QUALIFIER1, data);
322      table.put(put);
323      put = new Put(ROW2);
324      put.addColumn(FAMILY, QUALIFIER, data);
325      table.put(put);
326      put = new Put(ROW2);
327      put.addColumn(FAMILY, QUALIFIER1, data);
328      table.put(put);
329      put = new Put(ROW3);
330      put.addColumn(FAMILY, QUALIFIER, data);
331      table.put(put);
332      put = new Put(ROW3);
333      put.addColumn(FAMILY, QUALIFIER1, data);
334      table.put(put);
335      put = new Put(ROW4);
336      put.addColumn(FAMILY, QUALIFIER, data);
337      table.put(put);
338      put = new Put(ROW4);
339      put.addColumn(FAMILY, QUALIFIER1, data);
340      table.put(put);
341      put = new Put(ROW5);
342      put.addColumn(FAMILY, QUALIFIER, data);
343      table.put(put);
344      put = new Put(ROW5);
345      put.addColumn(FAMILY, QUALIFIER1, data);
346      table.put(put);
347      // data was in memstore so don't expect any changes
348      region.flush(true);
349      // Load cache
350      Scan s = new Scan();
351      s.setMaxResultSize(1000);
352      int count;
353      try (ResultScanner scanner = table.getScanner(s)) {
354        count = Iterables.size(scanner);
355      }
356      assertEquals("Count all the rows ", 6, count);
357
358      // Scan from cache
359      s = new Scan();
360      // Start a scan from row3
361      s.setCaching(1);
362      s.withStartRow(ROW1);
363      // set partial as true so that the scan can send partial columns also
364      s.setAllowPartialResults(true);
365      s.setMaxResultSize(1000);
366      try (ScanPerNextResultScanner scanner =
367        new ScanPerNextResultScanner(TEST_UTIL.getAsyncConnection().getTable(tableName), s)) {
368        Thread evictorThread = new Thread() {
369          @Override
370          public void run() {
371            List<BlockCacheKey> cacheList = new ArrayList<>();
372            Iterator<CachedBlock> iterator = cache.iterator();
373            // evict all the blocks
374            while (iterator.hasNext()) {
375              CachedBlock next = iterator.next();
376              BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
377              cacheList.add(cacheKey);
378              /**
379               * There is only one Block referenced by rpc,here we evict blocks which have no rpc
380               * referenced.
381               */
382              evictBlock(cache, cacheKey);
383            }
384            try {
385              Thread.sleep(1);
386            } catch (InterruptedException e1) {
387            }
388            iterator = cache.iterator();
389            int refBlockCount = 0;
390            while (iterator.hasNext()) {
391              iterator.next();
392              refBlockCount++;
393            }
394            assertEquals("One block should be there ", 1, refBlockCount);
395            // Rescan to prepopulate the data
396            // cache this row.
397            Scan s1 = new Scan();
398            // This scan will start from ROW1 and it will populate the cache with a
399            // row that is lower than ROW3.
400            s1.withStartRow(ROW3);
401            s1.withStopRow(ROW5);
402            s1.setCaching(1);
403
404            try (ResultScanner scanner = table.getScanner(s1)) {
405              int count = Iterables.size(scanner);
406              assertEquals("Count the rows", 2, count);
407              int newBlockRefCount = 0;
408              List<BlockCacheKey> newCacheList = new ArrayList<>();
409              while (true) {
410                newBlockRefCount = 0;
411                newCacheList.clear();
412                iterator = cache.iterator();
413                while (iterator.hasNext()) {
414                  CachedBlock next = iterator.next();
415                  BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
416                  newCacheList.add(cacheKey);
417                }
418                for (BlockCacheKey key : cacheList) {
419                  if (newCacheList.contains(key)) {
420                    newBlockRefCount++;
421                  }
422                }
423                if (newBlockRefCount == 6) {
424                  break;
425                }
426              }
427              latch.countDown();
428            } catch (IOException e) {
429            }
430          }
431        };
432        count = 0;
433        while (scanner.next() != null) {
434          count++;
435          if (count == 2) {
436            evictorThread.start();
437            latch.await();
438          }
439        }
440      }
441      assertEquals("Count should give all rows ", 10, count);
442    }
443  }
444
445  /**
446   * For {@link BucketCache},we only evict Block if there is no rpc referenced.
447   */
448  private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
449    assertTrue(blockCache instanceof CombinedBlockCache);
450    BlockCache[] blockCaches = blockCache.getBlockCaches();
451    for (BlockCache currentBlockCache : blockCaches) {
452      if (currentBlockCache instanceof BucketCache) {
453        ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
454      } else {
455        currentBlockCache.evictBlock(blockCacheKey);
456      }
457    }
458
459  }
460}