/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test tests whether parallel {@link StoreScanner#close()} and
 * {@link StoreScanner#updateReaders(List, List)} work correctly, ensuring that there are no
 * references left on the existing StoreScanner readers.
 */
@Tag(RegionServerTests.TAG)
@Tag(SmallTests.TAG)
public class TestStoreScannerClosure {

  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class);
  /** Number of {@link KeyValue.Type} values excluding the two sentinel types. */
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;

  private static final String CF_STR = "cf";
  private static HRegion region;
  private static final byte[] CF = Bytes.toBytes(CF_STR);
  static Configuration CONF = HBaseConfiguration.create();
  private static CacheConfig cacheConf;
  private static FileSystem fs;
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private final ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
    KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
  /** Column family of the test table created in {@link #setUp()}. */
  private static final byte[] fam = Bytes.toBytes("cf_1");
  private static final KeyValue[] kvs =
    new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
      create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
      create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), };

  /**
   * Creates the shared region (table "test", single family {@link #fam}) together with its WAL,
   * and initializes the configuration, cache config and file system used by every test.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    cacheConf = new CacheConfig(CONF);
    fs = TEST_UTIL.getTestFileSystem();
    TableName tableName = TableName.valueOf("test");
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    Path path = TEST_UTIL.getDataTestDir("test");
    region = HBaseTestingUtil.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(),
      tableDescriptor);
  }

  /**
   * Closes the region and WAL opened in {@link #setUp()} so they do not outlive this test class.
   */
  @AfterAll
  public static void tearDown() throws Exception {
    // region stays null when setUp failed before creating it.
    if (region != null) {
      HBaseTestingUtil.closeRegionAndWAL(region);
    }
  }

  /**
   * Runs {@code close} and {@code updateReaders} (with fresh memstore scanners) on the same
   * scanner from two threads, with {@code close} ordered first, and then asserts that no
   * memstore LAB reachable from those scanners is still referenced.
   */
  @Test
  public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
    region.put(p);
    // create the store scanner here.
    // for easiness, use Long.MAX_VALUE as read pt
    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
      new Scan(), getCols("q1"), Long.MAX_VALUE)) {
      p = new Put(Bytes.toBytes("row1"));
      p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
      region.put(p);
      HStore store = region.getStore(fam);
      // use the lock to manually get a new memstore scanner. this is what
      // HStore#notifyChangedReadersObservers does under the lock. (the lock is not strictly
      // needed here since it is just a testcase.)
      final List<KeyValueScanner> memScanners;
      store.getStoreEngine().readLock();
      try {
        memScanners = store.memstore.getScanners(Long.MAX_VALUE);
      } finally {
        // always release the lock, even when fetching the scanners fails
        store.getStoreEngine().readUnlock();
      }
      // close does not wait on the latch, so it is the call that runs first
      Thread closeThread = new Thread(() -> scan.close(false, true), "scanner-close");
      closeThread.start();
      Thread updateThread = new Thread(() -> {
        try {
          // use the updated memstoreScanners and pass it to updateReaders
          scan.updateReaders(true, Collections.emptyList(), memScanners);
        } catch (IOException e) {
          // NOTE(review): this failure is only logged, as before; confirm whether it should
          // fail the test instead.
          LOG.error("updateReaders failed", e);
        }
      }, "scanner-update-readers");
      updateThread.start();
      // wait for close and updateThread to complete
      closeThread.join();
      updateThread.join();
      for (KeyValueScanner scanner : memScanners) {
        if (scanner instanceof SegmentScanner) {
          MemStoreLAB memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
          if (memStoreLAB != null) {
            // There should be no unpooled chunks
            int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue();
            assertEquals(0, refCount, "The memstore should not have unpooled chunks");
          }
        }
      }
    }
  }

  /** {@code updateReaders} waits on the latch, so {@code close} runs first. */
  @Test
  public void testScannerCloseAndUpdateReaders1() throws Exception {
    testScannerCloseAndUpdateReaderInternal(true, false);
  }

  /** {@code close} waits on the latch, so {@code updateReaders} runs first. */
  @Test
  public void testScannerCloseAndUpdateReaders2() throws Exception {
    testScannerCloseAndUpdateReaderInternal(false, true);
  }

  /**
   * Writes a store file holding 1000 random key values under the test data directory.
   * @return the path of the written store file
   * @throws IOException if creating, appending to or closing the writer fails
   */
  private Path writeStoreFile() throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir)
      .withFileContext(meta).build();

    final int rowLen = 32;
    Random rand = ThreadLocalRandom.current();
    for (int i = 0; i < 1000; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      // Carve the random key into row (first rowLen bytes), family (next cfLen bytes) and
      // qualifier (the remainder).
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  /**
   * Picks a key type for a generated cell: {@link KeyValue.Type#Put} half of the time, otherwise
   * one of the types at index {@code 1..NUM_VALID_KEY_TYPES} of {@link KeyValue.Type#values()}.
   * @throws RuntimeException if the pick is {@code Minimum} or {@code Maximum}
   */
  private static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    }
    KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
    if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
      throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
        + "Probably the layout of KeyValue.Type has changed.");
    }
    return keyType;
  }

  /**
   * Wraps the given file info in an {@link HStoreFile} with no bloom filter, using the shared
   * cache config.
   */
  private HStoreFile readStoreFile(StoreFileInfo fileinfo) throws Exception {
    // NOTE(review): an earlier comment said the block cache is disabled here; cacheConf is built
    // from the test configuration in setUp - confirm.
    return new HStoreFile(fileinfo, BloomType.NONE, cacheConf);
  }

  /**
   * Races {@code close} against {@code updateReaders} (with a freshly written store file) on
   * one scanner and asserts that the store file reader, if one was opened, has no references.
   * @param awaitUpdate whether {@code updateReaders} waits until {@code close} has finished
   * @param awaitClose  whether {@code close} waits until {@code updateReaders} has finished
   */
  private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose)
    throws Exception {
    // start write to store file.
    Path path = writeStoreFile();
    assertTrue(fs.exists(path), "The store file " + path + " should exist after being written");
    // Any failure to open the file propagates with its original cause and fails the test.
    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(CONF, fs, path, true);
    HStoreFile file = readStoreFile(storeFileInfo);
    List<HStoreFile> files = new ArrayList<>();
    files.add(file);
    // NOTE(review): the result of this call is discarded; kept for parity with the original.
    scanFixture(kvs);
    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
      new Scan(), getCols("a", "d"), 100L)) {
      Thread closeThread = new Thread(() -> scan.close(awaitClose, true), "scanner-close");
      closeThread.start();
      Thread updateThread = new Thread(() -> {
        try {
          scan.updateReaders(awaitUpdate, files, Collections.emptyList());
        } catch (IOException e) {
          // NOTE(review): this failure is only logged, as before; confirm whether it should
          // fail the test instead.
          LOG.error("updateReaders failed", e);
        }
      }, "scanner-update-readers");
      updateThread.start();
      // complete both the threads
      closeThread.join();
      updateThread.join();
      if (file.getReader() != null) {
        // the fileReader is not null when the updateReaders has completed first.
        // in the other case the fileReader will be null.
        int refCount = file.getReader().getRefCount();
        LOG.info("the store scanner count is {}", refCount);
        assertEquals(0, refCount, "The store scanner count should be 0");
      }
    }
  }

  /**
   * A {@link StoreScanner} whose {@code close} and {@code updateReaders} can be ordered through
   * a shared latch: the call made with {@code await == false} runs first and then releases the
   * call made with {@code await == true}.
   */
  private static class ExtendedStoreScanner extends StoreScanner {
    private final CountDownLatch latch = new CountDownLatch(1);

    public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
      NavigableSet<byte[]> columns, long readPt) throws IOException {
      super(store, scanInfo, scan, columns, readPt);
    }

    /**
     * Delegates to {@link StoreScanner#updateReaders(List, List)}.
     * @param await if true, wait for the latch first; if false, release the latch afterwards
     */
    public void updateReaders(boolean await, List<HStoreFile> sfs,
      List<KeyValueScanner> memStoreScanners) throws IOException {
      if (await) {
        awaitLatch();
      }
      try {
        super.updateReaders(sfs, memStoreScanners);
      } finally {
        // Release the waiter even when the update throws, otherwise it would block forever.
        if (!await) {
          latch.countDown();
        }
      }
    }

    /**
     * Delegates to {@link StoreScanner#close()}; the second parameter only distinguishes this
     * overload and is ignored.
     * @param await if true, wait for the latch first; if false, release the latch afterwards
     */
    public void close(boolean await, boolean dummy) {
      if (await) {
        awaitLatch();
      }
      try {
        super.close();
      } finally {
        // Release the waiter even when close throws, otherwise it would block forever.
        if (!await) {
          latch.countDown();
        }
      }
    }

    /** Blocks until the latch is released; on interrupt, restores the flag and returns. */
    private void awaitLatch() {
      try {
        latch.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while waiting for the other operation to complete", e);
      }
    }
  }

  /**
   * Builds a set of column qualifiers, ordered by {@link Bytes#BYTES_COMPARATOR}, from the given
   * strings.
   */
  NavigableSet<byte[]> getCols(String... strCols) {
    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    for (String col : strCols) {
      cols.add(Bytes.toBytes(col));
    }
    return cols;
  }
}