/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

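/**
 * Tests related to HBASE-16372: scans should not keep references to cells backed by blocks that
 * have already been shipped to the client, because those blocks may be evicted from the block
 * cache at any time. Each test evicts cached blocks while a scan (or compaction) is in flight
 * and then verifies that the expected rows are still returned.
 */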
@Category({ LargeTests.class, ClientTests.class })
public class TestAvoidCellReferencesIntoShippedBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class);

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] ROW4 = Bytes.toBytes("testRow4");
  private static byte[] ROW5 = Bytes.toBytes("testRow5");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
  private static byte[] data = new byte[1000];
  protected static int SLAVES = 1;
  private CountDownLatch latch = new CountDownLatch(1);
  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
  private static AtomicBoolean doScan = new AtomicBoolean(false);

  @Rule
  public TestName name = new TestName();

  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt("hbase.hstore.compactionThreshold", 7);
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
    compactReadLatch = new CountDownLatch(1);
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

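  /**
   * Compaction write path scenario: load six rows across several store files, then start a major
   * compaction that {@link CompactorRegionObserver} pauses once its scanner reaches ROW2. While
   * the compaction is paused, {@link ScannerThread} evicts the cached blocks and scans
   * ROW4..ROW5; afterwards a full scan must still see all six rows.
   */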
  @Test
  public void testHBase16372InCompactionWritePath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024
    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
      CompactorRegionObserver.class.getName());
    try {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache().get();
      // insert data: 6 rows (ROW through ROW5), two cells each
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush the data in the memstore to a new store file
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the data in the memstore to a new store file
      region.flush(true);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush the data in the memstore to a new store file
      region.flush(true);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the data in the memstore to a new store file
      region.flush(true);
      // Load the cache with a full scan
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows ", 6, count);
      // the cache is now loaded; trigger a major compaction while a concurrent
      // scanner thread evicts blocks and scans
      ScannerThread scannerThread = new ScannerThread(table, cache);
      scannerThread.start();
      region.compact(true);
      s = new Scan();
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows ", 6, count);
    } finally {
      table.close();
    }
  }

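  /**
   * Helper thread for the compaction write path test: once the compaction observer flips
   * {@code doScan}, it evicts every cached block, scans ROW4..ROW5, and then counts down
   * {@code compactReadLatch} so the paused compaction can resume.
   */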
  private static class ScannerThread extends Thread {
    private final Table table;
    private final BlockCache cache;

    public ScannerThread(Table table, BlockCache cache) {
      this.table = table;
      this.cache = cache;
    }

    @Override
    public void run() {
      Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
      try {
        while (!doScan.get()) {
          try {
            // wait until the compaction observer signals that the scan should start
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // ignore and keep waiting
          }
        }
        List<BlockCacheKey> cacheList = new ArrayList<>();
        Iterator<CachedBlock> iterator = cache.iterator();
        // evict all the blocks
        while (iterator.hasNext()) {
          CachedBlock next = iterator.next();
          BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
          cacheList.add(cacheKey);
          // evict whatever is available
          cache.evictBlock(cacheKey);
        }
        // scan ROW4..ROW5 and drain the results
        try (ResultScanner scanner = table.getScanner(s)) {
          while (scanner.next() != null) {
          }
        }
        // let the paused compaction resume
        compactReadLatch.countDown();
      } catch (IOException e) {
        // ignored for this test
      }
    }
  }

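  /**
   * Region observer that wraps the compaction scanner with {@link CompactorInternalScanner} so
   * the test can pause the compaction mid-flight.
   */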
  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request) throws IOException {
      return new CompactorInternalScanner(scanner);
    }
  }

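  /**
   * Compaction scanner wrapper that, on reaching ROW2, signals the scanner thread to start and
   * blocks until {@code compactReadLatch} is counted down, keeping the compaction paused while
   * blocks are evicted and scanned.
   */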
  private static final class CompactorInternalScanner extends DelegatingInternalScanner {

    public CompactorInternalScanner(InternalScanner scanner) {
      super(scanner);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      boolean next = scanner.next(result, scannerContext);
      for (Cell cell : result) {
        if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
          try {
            // signal the scanner thread to start, then hold the compaction
            // until the scan has finished
            doScan.compareAndSet(false, true);
            compactReadLatch.await();
          } catch (InterruptedException e) {
            // ignore and continue the compaction
          }
        }
      }
      return next;
    }
  }

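  /**
   * Read path scenario: load six rows into a single store file, then run a partial-result scan
   * starting at ROW1. After two results have been returned, an evictor thread evicts the cached
   * blocks, verifies that exactly one block is still referenced, and rescans ROW3..ROW5 to
   * repopulate the cache before the original scan continues; the scan must still return all ten
   * partial results.
   */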
  @Test
  public void testHBASE16372InReadPath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024
    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null);
    try {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache().get();
      // insert data: 6 rows (ROW through ROW5), two cells each
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the data in the memstore to a store file
      region.flush(true);
      // Load the cache with a full scan
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows ", 6, count);

      // Scan from the cache, starting at ROW1
      s = new Scan();
      s.setCaching(1);
      s.withStartRow(ROW1);
      // allow partial results so that the scan can also return partial rows
      s.setAllowPartialResults(true);
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        Thread evictorThread = new Thread() {
          @Override
          public void run() {
            List<BlockCacheKey> cacheList = new ArrayList<>();
            Iterator<CachedBlock> iterator = cache.iterator();
            // evict all the blocks
            while (iterator.hasNext()) {
              CachedBlock next = iterator.next();
              BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
              cacheList.add(cacheKey);
              cache.evictBlock(cacheKey);
            }
            try {
              Thread.sleep(1);
            } catch (InterruptedException e1) {
              // ignore
            }
            // only the block still referenced by the in-flight scan should remain cached
            iterator = cache.iterator();
            int refBlockCount = 0;
            while (iterator.hasNext()) {
              iterator.next();
              refBlockCount++;
            }
            assertEquals("One block should be there ", 1, refBlockCount);
            // Rescan ROW3..ROW5 to repopulate the cache with those rows
            Scan s1 = new Scan();
            s1.withStartRow(ROW3);
            s1.withStopRow(ROW5);
            s1.setCaching(1);
            ResultScanner scanner;
            try {
              scanner = table.getScanner(s1);
              int count = Iterables.size(scanner);
              assertEquals("Count the rows", 2, count);
              // wait until the previously evicted blocks show up in the cache again
              int newBlockRefCount = 0;
              List<BlockCacheKey> newCacheList = new ArrayList<>();
              while (true) {
                newBlockRefCount = 0;
                newCacheList.clear();
                iterator = cache.iterator();
                while (iterator.hasNext()) {
                  CachedBlock next = iterator.next();
                  BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
                  newCacheList.add(cacheKey);
                }
                for (BlockCacheKey key : cacheList) {
                  if (newCacheList.contains(key)) {
                    newBlockRefCount++;
                  }
                }
                if (newBlockRefCount == 6) {
                  break;
                }
              }
              latch.countDown();
            } catch (IOException e) {
              // ignored for this test
            }
          }
        };
        count = 0;
        while (scanner.next() != null) {
          count++;
          if (count == 2) {
            // after two partial results, evict the blocks and wait for the cache to refill
            evictorThread.start();
            latch.await();
          }
        }
      }
      assertEquals("Count should give all rows ", 10, count);
    } finally {
      table.close();
    }
  }
}