001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import java.io.IOException;
023import java.util.Random;
024import java.util.concurrent.atomic.AtomicLong;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
032import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
033import org.apache.hadoop.hbase.procedure2.util.StringUtils;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.junit.After;
037import org.junit.Before;
038import org.junit.ClassRule;
039import org.junit.Ignore;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@Category({MasterTests.class, MediumTests.class})
046public class TestStressWALProcedureStore {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestStressWALProcedureStore.class);
051
052  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
053
054  private static final int PROCEDURE_STORE_SLOTS = 8;
055
056  private WALProcedureStore procStore;
057
058  private HBaseCommonTestingUtility htu;
059  private FileSystem fs;
060  private Path testDir;
061  private Path logDir;
062
063  private void setupConfiguration(Configuration conf) {
064    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
065    conf.setInt(WALProcedureStore.PERIODIC_ROLL_CONF_KEY, 5000);
066    conf.setInt(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 128 * 1024);
067  }
068
069  @Before
070  public void setUp() throws IOException {
071    htu = new HBaseCommonTestingUtility();
072    setupConfiguration(htu.getConfiguration());
073
074    testDir = htu.getDataTestDir();
075    fs = testDir.getFileSystem(htu.getConfiguration());
076    assertTrue(testDir.depth() > 1);
077
078    logDir = new Path(testDir, "proc-logs");
079    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
080    procStore.start(PROCEDURE_STORE_SLOTS);
081    procStore.recoverLease();
082
083    LoadCounter loader = new LoadCounter();
084    procStore.load(loader);
085    assertEquals(0, loader.getMaxProcId());
086    assertEquals(0, loader.getLoadedCount());
087    assertEquals(0, loader.getCorruptedCount());
088  }
089
090  @After
091  public void tearDown() throws IOException {
092    procStore.stop(false);
093    fs.delete(logDir, true);
094  }
095
096  @Test
097  public void testInsertUpdateDelete() throws Exception {
098    final long LAST_PROC_ID = 19999;
099    final Thread[] thread = new Thread[PROCEDURE_STORE_SLOTS];
100    final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
101    for (int i = 0; i < thread.length; ++i) {
102      thread[i] = new Thread() {
103        @Override
104        public void run() {
105          Random rand = new Random();
106          TestProcedure proc;
107          do {
108            // After HBASE-15579 there may be gap in the procId sequence, trying to simulate that.
109            long procId = procCounter.addAndGet(1 + rand.nextInt(3));
110            proc = new TestProcedure(procId);
111            // Insert
112            procStore.insert(proc, null);
113            // Update
114            for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
115              try {
116                Thread.sleep(0, rand.nextInt(15));
117              } catch (InterruptedException e) {}
118              procStore.update(proc);
119            }
120            // Delete
121            procStore.delete(proc.getProcId());
122          } while (proc.getProcId() < LAST_PROC_ID);
123        }
124      };
125      thread[i].start();
126    }
127
128    for (int i = 0; i < thread.length; ++i) {
129      thread[i].join();
130    }
131
132    procStore.getStoreTracker().dump();
133    assertTrue(procCounter.get() >= LAST_PROC_ID);
134    assertTrue(procStore.getStoreTracker().isEmpty());
135    assertEquals(1, procStore.getActiveLogs().size());
136  }
137
138  @Ignore @Test // REENABLE after merge of
139  // https://github.com/google/protobuf/issues/2228#issuecomment-252058282
140  public void testEntrySizeLimit() throws Exception {
141    final int NITEMS = 20;
142    for (int i = 1; i <= NITEMS; ++i) {
143      final byte[] data = new byte[256 << i];
144      LOG.info(String.format("Writing %s", StringUtils.humanSize(data.length)));
145      TestProcedure proc = new TestProcedure(i, 0, data);
146      procStore.insert(proc, null);
147    }
148
149    // check that we are able to read the big proc-blobs
150    ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0);
151  }
152}