/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that a {@link StoreScanner#close()} racing with a
 * {@link StoreScanner#updateReaders(List, List)} leaves no outstanding references on the
 * store file reader that was handed to the scanner.
 * <p>
 * The two operations run on separate threads; a latch inside {@link ExtendedStoreScanner}
 * forces one of them to finish before the other starts, and each test method exercises one of
 * the two orderings.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestStoreScannerClosure {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestStoreScannerClosure.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class);
  // Size of the range generateKeyType() draws from; that method rejects Minimum and Maximum.
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  @Rule
  public TestName name = new TestName();
  private static final String CF_STR = "cf";
  private static HRegion region;
  private static final byte[] CF = Bytes.toBytes(CF_STR);
  // Replaced with the testing utility's configuration in setUp().
  static Configuration CONF = HBaseConfiguration.create();
  private static CacheConfig cacheConf;
  private static FileSystem fs;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
      KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
  // Column family of the region created in setUp(); the scanner under test reads this store.
  private final static byte[] fam = Bytes.toBytes("cf_1");
  private static final KeyValue[] kvs =
      new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
          create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), };

  /**
   * Initializes the shared configuration, cache config and file system, and creates the region
   * (table "test", family {@code cf_1}) whose store backs the scanner under test.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    cacheConf = new CacheConfig(CONF);
    fs = TEST_UTIL.getTestFileSystem();
    TableName tableName = TableName.valueOf("test");
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(fam));
    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
    Path path = TEST_UTIL.getDataTestDir("test");
    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
  }

  /** updateReaders() waits for close(): the scanner is closed first, then the readers updated. */
  @Test
  public void testScannerCloseAndUpdateReaders1() throws Exception {
    testScannerCloseAndUpdateReaderInternal(true, false);
  }

  /** close() waits for updateReaders(): the readers are updated first, then the scanner closed. */
  @Test
  public void testScannerCloseAndUpdateReaders2() throws Exception {
    testScannerCloseAndUpdateReaderInternal(false, true);
  }

  /**
   * Writes a store file containing 1000 randomly generated cells under the test data directory.
   *
   * @return the path of the written store file
   * @throws IOException if creating, appending to or closing the writer fails
   */
  private Path writeStoreFile() throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir)
        .withComparator(CellComparatorImpl.COMPARATOR).withFileContext(meta).build();

    final int rowLen = 32;
    Random rng = new Random();
    try {
      for (int i = 0; i < 1000; ++i) {
        byte[] k = RandomKeyValueUtil.randomOrderedKey(rng, i);
        byte[] v = RandomKeyValueUtil.randomValue(rng);
        // Carve the generated key into row / family / qualifier slices of one backing array.
        int cfLen = rng.nextInt(k.length - rowLen + 1);
        KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
            k.length - rowLen - cfLen, rng.nextLong(), generateKeyType(rng), v, 0, v.length);
        sfw.append(kv);
      }
    } finally {
      // Close the writer even when an append fails so it is not leaked.
      sfw.close();
    }
    return sfw.getPath();
  }

  /**
   * Picks a key type for a generated cell: {@code Put} half of the time, otherwise a random
   * entry of {@link KeyValue.Type}.
   *
   * @throws RuntimeException if the pick lands on {@code Minimum} or {@code Maximum}, which
   *           indicates that the layout of {@link KeyValue.Type} has changed
   */
  private static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    }
    KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
    if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
      throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
    }
    return keyType;
  }

  /**
   * Wraps the store file at the given path in an {@link HStoreFile}, using the shared file
   * system and cache config and no bloom filter.
   */
  private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception {
    return new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true);
  }

  /**
   * Runs {@code close()} and {@code updateReaders()} of one scanner on two threads and checks
   * that the store file reader, if one was opened, ends up with a reference count of zero.
   *
   * @param awaitUpdate if true, updateReaders() blocks until close() has completed
   * @param awaitClose if true, close() blocks until updateReaders() has completed
   */
  private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose)
      throws IOException, InterruptedException {
    Path path = writeStoreFile();
    final HStoreFile file;
    try {
      file = readStoreFile(path, CONF);
    } catch (Exception e) {
      // Fail the test, but keep the root cause in the failure instead of discarding it.
      throw new AssertionError("Failed to open store file " + path, e);
    }
    List<HStoreFile> files = new ArrayList<>();
    files.add(file);
    // NOTE(review): the returned fixture scanners are unused; call kept as in the original.
    scanFixture(kvs);
    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
        new Scan(), getCols("a", "d"), 100L)) {
      Thread closeThread = new Thread(() -> scan.close(awaitClose, true));
      closeThread.start();
      Thread updateThread = new Thread(() -> {
        try {
          scan.updateReaders(awaitUpdate, files, Collections.emptyList());
        } catch (IOException e) {
          // As before, a failure here does not fail the test by itself; it is now logged.
          LOG.error("updateReaders failed", e);
        }
      });
      updateThread.start();
      // Wait for both operations to finish before inspecting the reader.
      closeThread.join();
      updateThread.join();
      if (file.getReader() != null) {
        // the fileReader is not null when the updateReaders has completed first.
        // in the other case the fileReader will be null.
        int refCount = file.getReader().getRefCount();
        LOG.info("the store scanner count is {}", refCount);
        assertTrue("The store scanner count should be 0", refCount == 0);
      }
    }
  }

  /**
   * A {@link StoreScanner} whose close and updateReaders variants can be ordered against each
   * other through a shared latch: the call made with {@code await == true} blocks until the
   * call made with {@code await == false} has finished.
   */
  private static class ExtendedStoreScanner extends StoreScanner {
    private final CountDownLatch latch = new CountDownLatch(1);

    public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
        NavigableSet<byte[]> columns, long readPt) throws IOException {
      super(store, scanInfo, scan, columns, readPt);
    }

    /**
     * Delegates to {@link StoreScanner#updateReaders(List, List)}.
     *
     * @param await if true, wait for the peer operation first; if false, release the peer
     *          once this call is done
     */
    public void updateReaders(boolean await, List<HStoreFile> sfs,
        List<KeyValueScanner> memStoreScanners) throws IOException {
      if (await) {
        awaitPeer();
      }
      try {
        super.updateReaders(sfs, memStoreScanners);
      } finally {
        // Release the peer even on failure, otherwise it would block forever on the latch.
        if (!await) {
          latch.countDown();
        }
      }
    }

    /**
     * Delegates to {@link StoreScanner#close()}. The second parameter is unused; it only
     * distinguishes this method from the overloads of the parent class.
     *
     * @param await if true, wait for the peer operation first; if false, release the peer
     *          once this call is done
     */
    public void close(boolean await, boolean dummy) {
      if (await) {
        awaitPeer();
      }
      try {
        super.close();
      } finally {
        // Release the peer even on failure, otherwise it would block forever on the latch.
        if (!await) {
          latch.countDown();
        }
      }
    }

    /** Blocks until the peer operation counts the latch down, or this thread is interrupted. */
    private void awaitPeer() {
      try {
        latch.await();
      } catch (InterruptedException e) {
        // Restore the interrupt status so the owner of this thread can still observe it.
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while waiting for the peer operation to complete", e);
      }
    }
  }

  /** Builds a set, ordered by {@link Bytes#BYTES_COMPARATOR}, of the given column names. */
  NavigableSet<byte[]> getCols(String... strCols) {
    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    for (String col : strCols) {
      cols.add(Bytes.toBytes(col));
    }
    return cols;
  }
}