001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
021import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.NavigableSet;
029import java.util.Random;
030import java.util.TreeSet;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.locks.ReentrantReadWriteLock;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.CellComparator;
038import org.apache.hadoop.hbase.CellComparatorImpl;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HColumnDescriptor;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HRegionInfo;
045import org.apache.hadoop.hbase.HTableDescriptor;
046import org.apache.hadoop.hbase.KeepDeletedCells;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.io.hfile.CacheConfig;
052import org.apache.hadoop.hbase.io.hfile.HFileContext;
053import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
054import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.testclassification.RegionServerTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066/**
067 * This test tests whether parallel {@link StoreScanner#close()} and
068 * {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring
069 * that there are no references on the existing Storescanner readers.
070 */
071@Category({ RegionServerTests.class, MediumTests.class })
072public class TestStoreScannerClosure {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestStoreScannerClosure.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class);
079  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
080  @Rule
081  public TestName name = new TestName();
082  private static final String CF_STR = "cf";
083  private static HRegion region;
084  private static final byte[] CF = Bytes.toBytes(CF_STR);
085  static Configuration CONF = HBaseConfiguration.create();
086  private static CacheConfig cacheConf;
087  private static FileSystem fs;
088  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
089  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
090  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
091      KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
092  private final static byte[] fam = Bytes.toBytes("cf_1");
093  private static final KeyValue[] kvs =
094      new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
095          create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
096          create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
097          create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
098          create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
099          create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
100          create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
101          create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
102          create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
103          create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), };
104
105  @BeforeClass
106  public static void setUp() throws Exception {
107    CONF = TEST_UTIL.getConfiguration();
108    cacheConf = new CacheConfig(CONF);
109    fs = TEST_UTIL.getTestFileSystem();
110    TableName tableName = TableName.valueOf("test");
111    HTableDescriptor htd = new HTableDescriptor(tableName);
112    htd.addFamily(new HColumnDescriptor(fam));
113    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
114    Path path = TEST_UTIL.getDataTestDir("test");
115    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
116  }
117
118  @Test
119  public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
120    Put p = new Put(Bytes.toBytes("row"));
121    p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
122    region.put(p);
123    // create the store scanner here.
124    // for easiness, use Long.MAX_VALUE as read pt
125    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
126        new Scan(), getCols("q1"), Long.MAX_VALUE)) {
127      p = new Put(Bytes.toBytes("row1"));
128      p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
129      region.put(p);
130      HStore store = region.getStore(fam);
131      ReentrantReadWriteLock lock = store.lock;
132      // use the lock to manually get a new memstore scanner. this is what
133      // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
134      //since it is just a testcase).
135      lock.readLock().lock();
136      final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
137      lock.readLock().unlock();
138      Thread closeThread = new Thread() {
139        public void run() {
140          // close should be completed
141          scan.close(false, true);
142        }
143      };
144      closeThread.start();
145      Thread updateThread = new Thread() {
146        public void run() {
147          try {
148            // use the updated memstoreScanners and pass it to updateReaders
149            scan.updateReaders(true, Collections.emptyList(), memScanners);
150          } catch (IOException e) {
151            e.printStackTrace();
152          }
153        }
154      };
155      updateThread.start();
156      // wait for close and updateThread to complete
157      closeThread.join();
158      updateThread.join();
159      MemStoreLAB memStoreLAB;
160      for (KeyValueScanner scanner : memScanners) {
161        if (scanner instanceof SegmentScanner) {
162          memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
163          if (memStoreLAB != null) {
164            // There should be no unpooled chunks
165            int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
166            assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0);
167          }
168        }
169      }
170    }
171  }
172
173  @Test
174  public void testScannerCloseAndUpdateReaders1() throws Exception {
175    testScannerCloseAndUpdateReaderInternal(true, false);
176  }
177
178  @Test
179  public void testScannerCloseAndUpdateReaders2() throws Exception {
180    testScannerCloseAndUpdateReaderInternal(false, true);
181  }
182
183  private Path writeStoreFile() throws IOException {
184    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
185    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
186    StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir)
187        .withComparator(CellComparatorImpl.COMPARATOR).withFileContext(meta).build();
188
189    final int rowLen = 32;
190    Random RNG = new Random();
191    for (int i = 0; i < 1000; ++i) {
192      byte[] k = RandomKeyValueUtil.randomOrderedKey(RNG, i);
193      byte[] v = RandomKeyValueUtil.randomValue(RNG);
194      int cfLen = RNG.nextInt(k.length - rowLen + 1);
195      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
196          k.length - rowLen - cfLen, RNG.nextLong(), generateKeyType(RNG), v, 0, v.length);
197      sfw.append(kv);
198    }
199
200    sfw.close();
201    return sfw.getPath();
202  }
203
204  private static KeyValue.Type generateKeyType(Random rand) {
205    if (rand.nextBoolean()) {
206      // Let's make half of KVs puts.
207      return KeyValue.Type.Put;
208    } else {
209      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
210      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
211        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
212            + "Probably the layout of KeyValue.Type has changed.");
213      }
214      return keyType;
215    }
216  }
217
218  private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception {
219    // Open the file reader with block cache disabled.
220    HStoreFile file = new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true);
221    return file;
222  }
223
224  private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose)
225      throws IOException, InterruptedException {
226    // start write to store file.
227    Path path = writeStoreFile();
228    HStoreFile file = null;
229    List<HStoreFile> files = new ArrayList<HStoreFile>();
230    try {
231      file = readStoreFile(path, CONF);
232      files.add(file);
233    } catch (Exception e) {
234      // fail test
235      assertTrue(false);
236    }
237    scanFixture(kvs);
238    // scanners.add(storeFileScanner);
239    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
240        new Scan(), getCols("a", "d"), 100L)) {
241      Thread closeThread = new Thread() {
242        public void run() {
243          scan.close(awaitClose, true);
244        }
245      };
246      closeThread.start();
247      Thread updateThread = new Thread() {
248        public void run() {
249          try {
250            scan.updateReaders(awaitUpdate, files, Collections.emptyList());
251          } catch (IOException e) {
252            e.printStackTrace();
253          }
254        }
255      };
256      updateThread.start();
257      // complete both the threads
258      closeThread.join();
259      // complete both the threads
260      updateThread.join();
261      if (file.getReader() != null) {
262        // the fileReader is not null when the updateReaders has completed first.
263        // in the other case the fileReader will be null.
264        int refCount = file.getReader().getRefCount();
265        LOG.info("the store scanner count is " + refCount);
266        assertTrue("The store scanner count should be 0", refCount == 0);
267      }
268    }
269  }
270
271  private static class ExtendedStoreScanner extends StoreScanner {
272    private CountDownLatch latch = new CountDownLatch(1);
273
274    public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
275        NavigableSet<byte[]> columns, long readPt) throws IOException {
276      super(store, scanInfo, scan, columns, readPt);
277    }
278
279    public void updateReaders(boolean await, List<HStoreFile> sfs,
280        List<KeyValueScanner> memStoreScanners) throws IOException {
281      if (await) {
282        try {
283          latch.await();
284        } catch (InterruptedException e) {
285          // TODO Auto-generated catch block
286          e.printStackTrace();
287        }
288      }
289      super.updateReaders(sfs, memStoreScanners);
290      if (!await) {
291        latch.countDown();
292      }
293    }
294
295    // creating a dummy close
296    public void close(boolean await, boolean dummy) {
297      if (await) {
298        try {
299          latch.await();
300        } catch (InterruptedException e) {
301          // TODO Auto-generated catch block
302          e.printStackTrace();
303        }
304      }
305      super.close();
306      if (!await) {
307        latch.countDown();
308      }
309    }
310  }
311
312  NavigableSet<byte[]> getCols(String... strCols) {
313    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
314    for (String col : strCols) {
315      byte[] bytes = Bytes.toBytes(col);
316      cols.add(bytes);
317    }
318    return cols;
319  }
320}