001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
021import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.NavigableSet;
030import java.util.Random;
031import java.util.TreeSet;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.CellComparator;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeepDeletedCells;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.RegionInfoBuilder;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.io.hfile.CacheConfig;
052import org.apache.hadoop.hbase.io.hfile.HFileContext;
053import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
054import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
055import org.apache.hadoop.hbase.testclassification.RegionServerTests;
056import org.apache.hadoop.hbase.testclassification.SmallTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.junit.jupiter.api.BeforeAll;
059import org.junit.jupiter.api.Tag;
060import org.junit.jupiter.api.Test;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * This test tests whether parallel {@link StoreScanner#close()} and
066 * {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring that there are no
067 * references on the existing Storescanner readers.
068 */
069@Tag(RegionServerTests.TAG)
070@Tag(SmallTests.TAG)
071public class TestStoreScannerClosure {
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class);
074  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
075
076  private static final String CF_STR = "cf";
077  private static HRegion region;
078  private static final byte[] CF = Bytes.toBytes(CF_STR);
079  static Configuration CONF = HBaseConfiguration.create();
080  private static CacheConfig cacheConf;
081  private static FileSystem fs;
082  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
083  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
084    KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
085  private final static byte[] fam = Bytes.toBytes("cf_1");
086  private static final KeyValue[] kvs =
087    new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
088      create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
089      create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
090      create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
091      create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
092      create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
093      create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
094      create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
095      create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
096      create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), };
097
098  @BeforeAll
099  public static void setUp() throws Exception {
100    CONF = TEST_UTIL.getConfiguration();
101    cacheConf = new CacheConfig(CONF);
102    fs = TEST_UTIL.getTestFileSystem();
103    TableName tableName = TableName.valueOf("test");
104    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
105      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
106    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
107    Path path = TEST_UTIL.getDataTestDir("test");
108    region = HBaseTestingUtil.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(),
109      tableDescriptor);
110  }
111
112  @Test
113  public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
114    Put p = new Put(Bytes.toBytes("row"));
115    p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
116    region.put(p);
117    // create the store scanner here.
118    // for easiness, use Long.MAX_VALUE as read pt
119    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
120      new Scan(), getCols("q1"), Long.MAX_VALUE)) {
121      p = new Put(Bytes.toBytes("row1"));
122      p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
123      region.put(p);
124      HStore store = region.getStore(fam);
125      // use the lock to manually get a new memstore scanner. this is what
126      // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed
127      // here
128      // since it is just a testcase).
129      store.getStoreEngine().readLock();
130      final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
131      store.getStoreEngine().readUnlock();
132      Thread closeThread = new Thread() {
133        public void run() {
134          // close should be completed
135          scan.close(false, true);
136        }
137      };
138      closeThread.start();
139      Thread updateThread = new Thread() {
140        public void run() {
141          try {
142            // use the updated memstoreScanners and pass it to updateReaders
143            scan.updateReaders(true, Collections.emptyList(), memScanners);
144          } catch (IOException e) {
145            e.printStackTrace();
146          }
147        }
148      };
149      updateThread.start();
150      // wait for close and updateThread to complete
151      closeThread.join();
152      updateThread.join();
153      MemStoreLAB memStoreLAB;
154      for (KeyValueScanner scanner : memScanners) {
155        if (scanner instanceof SegmentScanner) {
156          memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
157          if (memStoreLAB != null) {
158            // There should be no unpooled chunks
159            int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue();
160            assertEquals(0, refCount, "The memstore should not have unpooled chunks");
161          }
162        }
163      }
164    }
165  }
166
167  @Test
168  public void testScannerCloseAndUpdateReaders1() throws Exception {
169    testScannerCloseAndUpdateReaderInternal(true, false);
170  }
171
172  @Test
173  public void testScannerCloseAndUpdateReaders2() throws Exception {
174    testScannerCloseAndUpdateReaderInternal(false, true);
175  }
176
177  private Path writeStoreFile() throws IOException {
178    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
179    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
180    StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir)
181      .withFileContext(meta).build();
182
183    final int rowLen = 32;
184    Random rand = ThreadLocalRandom.current();
185    for (int i = 0; i < 1000; ++i) {
186      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
187      byte[] v = RandomKeyValueUtil.randomValue(rand);
188      int cfLen = rand.nextInt(k.length - rowLen + 1);
189      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
190        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
191      sfw.append(kv);
192    }
193
194    sfw.close();
195    return sfw.getPath();
196  }
197
198  private static KeyValue.Type generateKeyType(Random rand) {
199    if (rand.nextBoolean()) {
200      // Let's make half of KVs puts.
201      return KeyValue.Type.Put;
202    } else {
203      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
204      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
205        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
206          + "Probably the layout of KeyValue.Type has changed.");
207      }
208      return keyType;
209    }
210  }
211
212  private HStoreFile readStoreFile(StoreFileInfo fileinfo) throws Exception {
213    // Open the file reader with block cache disabled.
214    HStoreFile file = new HStoreFile(fileinfo, BloomType.NONE, cacheConf);
215    return file;
216  }
217
218  private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose)
219    throws IOException, InterruptedException {
220    // start write to store file.
221    Path path = writeStoreFile();
222    HStoreFile file = null;
223    List<HStoreFile> files = new ArrayList<HStoreFile>();
224    try {
225      StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(CONF, fs, path, true);
226      file = readStoreFile(storeFileInfo);
227      files.add(file);
228    } catch (Exception e) {
229      // fail test
230      assertTrue(false);
231    }
232    scanFixture(kvs);
233    // scanners.add(storeFileScanner);
234    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
235      new Scan(), getCols("a", "d"), 100L)) {
236      Thread closeThread = new Thread() {
237        public void run() {
238          scan.close(awaitClose, true);
239        }
240      };
241      closeThread.start();
242      Thread updateThread = new Thread() {
243        public void run() {
244          try {
245            scan.updateReaders(awaitUpdate, files, Collections.emptyList());
246          } catch (IOException e) {
247            e.printStackTrace();
248          }
249        }
250      };
251      updateThread.start();
252      // complete both the threads
253      closeThread.join();
254      // complete both the threads
255      updateThread.join();
256      if (file.getReader() != null) {
257        // the fileReader is not null when the updateReaders has completed first.
258        // in the other case the fileReader will be null.
259        int refCount = file.getReader().getRefCount();
260        LOG.info("the store scanner count is " + refCount);
261        assertEquals(0, refCount, "The store scanner count should be 0");
262      }
263    }
264  }
265
266  private static class ExtendedStoreScanner extends StoreScanner {
267    private CountDownLatch latch = new CountDownLatch(1);
268
269    public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
270      NavigableSet<byte[]> columns, long readPt) throws IOException {
271      super(store, scanInfo, scan, columns, readPt);
272    }
273
274    public void updateReaders(boolean await, List<HStoreFile> sfs,
275      List<KeyValueScanner> memStoreScanners) throws IOException {
276      if (await) {
277        try {
278          latch.await();
279        } catch (InterruptedException e) {
280          // TODO Auto-generated catch block
281          e.printStackTrace();
282        }
283      }
284      super.updateReaders(sfs, memStoreScanners);
285      if (!await) {
286        latch.countDown();
287      }
288    }
289
290    // creating a dummy close
291    public void close(boolean await, boolean dummy) {
292      if (await) {
293        try {
294          latch.await();
295        } catch (InterruptedException e) {
296          // TODO Auto-generated catch block
297          e.printStackTrace();
298        }
299      }
300      super.close();
301      if (!await) {
302        latch.countDown();
303      }
304    }
305  }
306
307  NavigableSet<byte[]> getCols(String... strCols) {
308    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
309    for (String col : strCols) {
310      byte[] bytes = Bytes.toBytes(col);
311      cols.add(bytes);
312    }
313    return cols;
314  }
315}