001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Random;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
032import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
033import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
034import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
035import org.apache.hadoop.hbase.procedure2.util.StringUtils;
036import org.apache.hadoop.hbase.testclassification.MasterTests;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Ignore;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047@Category({ MasterTests.class, MediumTests.class })
048public class TestStressWALProcedureStore {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestStressWALProcedureStore.class);
053
054  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
055
056  private static final int PROCEDURE_STORE_SLOTS = 8;
057
058  private WALProcedureStore procStore;
059
060  private HBaseCommonTestingUtil htu;
061  private FileSystem fs;
062  private Path testDir;
063  private Path logDir;
064
065  private void setupConfiguration(Configuration conf) {
066    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
067    conf.setInt(WALProcedureStore.PERIODIC_ROLL_CONF_KEY, 5000);
068    conf.setInt(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 128 * 1024);
069  }
070
071  @Before
072  public void setUp() throws IOException {
073    htu = new HBaseCommonTestingUtil();
074    setupConfiguration(htu.getConfiguration());
075
076    testDir = htu.getDataTestDir();
077    fs = testDir.getFileSystem(htu.getConfiguration());
078    assertTrue(testDir.depth() > 1);
079
080    logDir = new Path(testDir, "proc-logs");
081    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
082    procStore.start(PROCEDURE_STORE_SLOTS);
083    procStore.recoverLease();
084
085    LoadCounter loader = new LoadCounter();
086    procStore.load(loader);
087    assertEquals(0, loader.getMaxProcId());
088    assertEquals(0, loader.getLoadedCount());
089    assertEquals(0, loader.getCorruptedCount());
090  }
091
092  @After
093  public void tearDown() throws IOException {
094    procStore.stop(false);
095    fs.delete(logDir, true);
096  }
097
098  @Test
099  public void testInsertUpdateDelete() throws Exception {
100    final long LAST_PROC_ID = 19999;
101    final Thread[] thread = new Thread[PROCEDURE_STORE_SLOTS];
102    final Random rand = ThreadLocalRandom.current();
103    final AtomicLong procCounter = new AtomicLong(rand.nextInt(100));
104    for (int i = 0; i < thread.length; ++i) {
105      thread[i] = new Thread() {
106        @Override
107        public void run() {
108          TestProcedure proc;
109          do {
110            // After HBASE-15579 there may be gap in the procId sequence, trying to simulate that.
111            long procId = procCounter.addAndGet(1 + rand.nextInt(3));
112            proc = new TestProcedure(procId);
113            // Insert
114            procStore.insert(proc, null);
115            // Update
116            for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
117              try {
118                Thread.sleep(0, rand.nextInt(15));
119              } catch (InterruptedException e) {
120              }
121              procStore.update(proc);
122            }
123            // Delete
124            procStore.delete(proc.getProcId());
125          } while (proc.getProcId() < LAST_PROC_ID);
126        }
127      };
128      thread[i].start();
129    }
130
131    for (int i = 0; i < thread.length; ++i) {
132      thread[i].join();
133    }
134
135    procStore.getStoreTracker().dump();
136    assertTrue(procCounter.get() >= LAST_PROC_ID);
137    assertTrue(procStore.getStoreTracker().isEmpty());
138    assertEquals(1, procStore.getActiveLogs().size());
139  }
140
141  @Ignore
142  @Test // REENABLE after merge of
143  // https://github.com/google/protobuf/issues/2228#issuecomment-252058282
144  public void testEntrySizeLimit() throws Exception {
145    final int NITEMS = 20;
146    for (int i = 1; i <= NITEMS; ++i) {
147      final byte[] data = new byte[256 << i];
148      LOG.info(String.format("Writing %s", StringUtils.humanSize(data.length)));
149      TestProcedure proc = new TestProcedure(i, 0, data);
150      procStore.insert(proc, null);
151    }
152
153    // check that we are able to read the big proc-blobs
154    ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0);
155  }
156}