001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import java.io.IOException; 023import java.util.Random; 024import java.util.concurrent.atomic.AtomicLong; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 032import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 033import org.apache.hadoop.hbase.procedure2.util.StringUtils; 034import org.apache.hadoop.hbase.testclassification.MasterTests; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.junit.After; 037import org.junit.Before; 038import org.junit.ClassRule; 039import org.junit.Ignore; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045@Category({MasterTests.class, MediumTests.class}) 046public class TestStressWALProcedureStore { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestStressWALProcedureStore.class); 051 052 private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); 053 054 private static final int PROCEDURE_STORE_SLOTS = 8; 055 056 private WALProcedureStore procStore; 057 058 private HBaseCommonTestingUtility htu; 059 private FileSystem fs; 060 private Path testDir; 061 private Path logDir; 062 063 private void setupConfiguration(Configuration conf) { 064 conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false); 065 conf.setInt(WALProcedureStore.PERIODIC_ROLL_CONF_KEY, 5000); 066 conf.setInt(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 128 * 1024); 067 } 068 069 @Before 070 public void setUp() throws IOException { 071 htu = new HBaseCommonTestingUtility(); 072 setupConfiguration(htu.getConfiguration()); 073 074 testDir = htu.getDataTestDir(); 075 fs = testDir.getFileSystem(htu.getConfiguration()); 076 assertTrue(testDir.depth() > 1); 077 078 logDir = new Path(testDir, "proc-logs"); 079 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 080 procStore.start(PROCEDURE_STORE_SLOTS); 081 procStore.recoverLease(); 082 083 LoadCounter loader = new LoadCounter(); 084 procStore.load(loader); 085 assertEquals(0, loader.getMaxProcId()); 086 assertEquals(0, loader.getLoadedCount()); 087 assertEquals(0, loader.getCorruptedCount()); 088 } 089 090 @After 091 public void tearDown() throws IOException { 092 procStore.stop(false); 093 fs.delete(logDir, true); 094 } 095 096 @Test 097 public void testInsertUpdateDelete() throws Exception { 098 final long LAST_PROC_ID = 19999; 099 final Thread[] thread = new Thread[PROCEDURE_STORE_SLOTS]; 100 final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100)); 101 for (int i = 0; i < thread.length; ++i) { 102 thread[i] = new Thread() { 103 @Override 104 public void run() { 105 Random rand = new Random(); 106 TestProcedure proc; 107 do { 108 // After HBASE-15579 there may be gap in the procId sequence, trying to simulate that. 109 long procId = procCounter.addAndGet(1 + rand.nextInt(3)); 110 proc = new TestProcedure(procId); 111 // Insert 112 procStore.insert(proc, null); 113 // Update 114 for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) { 115 try { 116 Thread.sleep(0, rand.nextInt(15)); 117 } catch (InterruptedException e) {} 118 procStore.update(proc); 119 } 120 // Delete 121 procStore.delete(proc.getProcId()); 122 } while (proc.getProcId() < LAST_PROC_ID); 123 } 124 }; 125 thread[i].start(); 126 } 127 128 for (int i = 0; i < thread.length; ++i) { 129 thread[i].join(); 130 } 131 132 procStore.getStoreTracker().dump(); 133 assertTrue(procCounter.get() >= LAST_PROC_ID); 134 assertTrue(procStore.getStoreTracker().isEmpty()); 135 assertEquals(1, procStore.getActiveLogs().size()); 136 } 137 138 @Ignore @Test // REENABLE after merge of 139 // https://github.com/google/protobuf/issues/2228#issuecomment-252058282 140 public void testEntrySizeLimit() throws Exception { 141 final int NITEMS = 20; 142 for (int i = 1; i <= NITEMS; ++i) { 143 final byte[] data = new byte[256 << i]; 144 LOG.info(String.format("Writing %s", StringUtils.humanSize(data.length))); 145 TestProcedure proc = new TestProcedure(i, 0, data); 146 procStore.insert(proc, null); 147 } 148 149 // check that we are able to read the big proc-blobs 150 ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0); 151 } 152}