/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stress tests for {@link WALProcedureStore}: several threads concurrently insert, update and
 * delete procedures against a single store, and a (currently disabled) test writes increasingly
 * large procedure payloads and reads them back after a store restart.
 */
@Tag(MasterTests.TAG)
@Tag(MediumTests.TAG)
public class TestStressWALProcedureStore {

  private static final Logger LOG = LoggerFactory.getLogger(TestStressWALProcedureStore.class);

  /** Number of store slots, and also the number of worker threads used by the stress test. */
  private static final int PROCEDURE_STORE_SLOTS = 8;

  private WALProcedureStore procStore;

  private HBaseCommonTestingUtil htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /**
   * Applies the store settings used by every test in this class: hsync is turned off, and the
   * periodic-roll and roll-threshold keys are set to 5000 and 128 * 1024 respectively.
   * @param conf the configuration to modify in place
   */
  private void setupConfiguration(Configuration conf) {
    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
    conf.setInt(WALProcedureStore.PERIODIC_ROLL_CONF_KEY, 5000);
    conf.setInt(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 128 * 1024);
  }

  /**
   * Creates and starts a fresh WAL procedure store under {@code <testDir>/proc-logs} and verifies
   * that loading it reports no procedures at all.
   * @throws IOException if the file system or the store cannot be set up
   */
  @BeforeEach
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtil();
    setupConfiguration(htu.getConfiguration());

    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procStore.start(PROCEDURE_STORE_SLOTS);
    procStore.recoverLease();

    // A brand new store must load as completely empty.
    LoadCounter loader = new LoadCounter();
    procStore.load(loader);
    assertEquals(0, loader.getMaxProcId());
    assertEquals(0, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  /**
   * Stops the store and removes its log directory.
   * @throws IOException if the log directory cannot be deleted
   */
  @AfterEach
  public void tearDown() throws IOException {
    try {
      procStore.stop(false);
    } finally {
      // Remove the logs even when stopping the store fails, so they are not left behind.
      fs.delete(logDir, true);
    }
  }

  /**
   * Runs {@link #PROCEDURE_STORE_SLOTS} threads that each repeatedly insert a procedure, update it
   * a random number of times and delete it, until the shared procedure id counter reaches
   * {@code LAST_PROC_ID}. Afterwards the store tracker must be empty and exactly one log must be
   * active.
   */
  @Test
  public void testInsertUpdateDelete() throws Exception {
    final long LAST_PROC_ID = 19999;
    final Thread[] thread = new Thread[PROCEDURE_STORE_SLOTS];
    // Start the id sequence at a random offset; drawn on the test thread's own generator.
    final AtomicLong procCounter = new AtomicLong(ThreadLocalRandom.current().nextInt(100));
    for (int i = 0; i < thread.length; ++i) {
      thread[i] = new Thread() {
        @Override
        public void run() {
          // A ThreadLocalRandom instance must not be shared between threads, so each worker
          // obtains its own instead of capturing the one that belongs to the test thread.
          final Random rand = ThreadLocalRandom.current();
          TestProcedure proc;
          do {
            // After HBASE-15579 there may be gap in the procId sequence, trying to simulate that.
            long procId = procCounter.addAndGet(1 + rand.nextInt(3));
            proc = new TestProcedure(procId);
            // Insert
            procStore.insert(proc, null);
            // Update
            for (int j = 0, nupdates = rand.nextInt(10); j <= nupdates; ++j) {
              try {
                Thread.sleep(0, rand.nextInt(15));
              } catch (InterruptedException ignored) {
                // The sleep only adds timing jitter and nothing in this test interrupts the
                // workers, so an interrupt here is deliberately ignored and the update proceeds.
              }
              procStore.update(proc);
            }
            // Delete
            procStore.delete(proc.getProcId());
          } while (proc.getProcId() < LAST_PROC_ID);
        }
      };
      thread[i].start();
    }

    for (int i = 0; i < thread.length; ++i) {
      thread[i].join();
    }

    procStore.getStoreTracker().dump();
    assertTrue(procCounter.get() >= LAST_PROC_ID);
    assertTrue(procStore.getStoreTracker().isEmpty());
    assertEquals(1, procStore.getActiveLogs().size());
  }

  /**
   * Inserts {@code NITEMS} procedures whose payload size doubles on every iteration
   * ({@code 256 << i} bytes), then restarts the store and checks that all of them can be read
   * back.
   */
  @Disabled
  @Test // REENABLE after merge of
  // https://github.com/google/protobuf/issues/2228#issuecomment-252058282
  public void testEntrySizeLimit() throws Exception {
    final int NITEMS = 20;
    for (int i = 1; i <= NITEMS; ++i) {
      final byte[] data = new byte[256 << i];
      LOG.info("Writing {}", StringUtils.humanSize(data.length));
      TestProcedure proc = new TestProcedure(i, 0, data);
      procStore.insert(proc, null);
    }

    // check that we are able to read the big proc-blobs
    ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0);
  }
}