001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Random;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
032import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
033import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
034import org.apache.hadoop.hbase.procedure2.util.StringUtils;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.MasterTests;
037import org.junit.After;
038import org.junit.Before;
039import org.junit.ClassRule;
040import org.junit.Ignore;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category({MasterTests.class, LargeTests.class})
047public class TestStressWALProcedureStore {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051      HBaseClassTestRule.forClass(TestStressWALProcedureStore.class);
052
053  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
054
055  private static final int PROCEDURE_STORE_SLOTS = 8;
056
057  private WALProcedureStore procStore;
058
059  private HBaseCommonTestingUtility htu;
060  private FileSystem fs;
061  private Path testDir;
062  private Path logDir;
063
064  private void setupConfiguration(Configuration conf) {
065    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
066    conf.setInt(WALProcedureStore.PERIODIC_ROLL_CONF_KEY, 5000);
067    conf.setInt(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 128 * 1024);
068  }
069
070  @Before
071  public void setUp() throws IOException {
072    htu = new HBaseCommonTestingUtility();
073    setupConfiguration(htu.getConfiguration());
074
075    testDir = htu.getDataTestDir();
076    fs = testDir.getFileSystem(htu.getConfiguration());
077    assertTrue(testDir.depth() > 1);
078
079    logDir = new Path(testDir, "proc-logs");
080    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
081    procStore.start(PROCEDURE_STORE_SLOTS);
082    procStore.recoverLease();
083
084    LoadCounter loader = new LoadCounter();
085    procStore.load(loader);
086    assertEquals(0, loader.getMaxProcId());
087    assertEquals(0, loader.getLoadedCount());
088    assertEquals(0, loader.getCorruptedCount());
089  }
090
091  @After
092  public void tearDown() throws IOException {
093    procStore.stop(false);
094    fs.delete(logDir, true);
095  }
096
097  @Test
098  public void testInsertUpdateDelete() throws Exception {
099    final long LAST_PROC_ID = 19999;
100    final Thread[] thread = new Thread[PROCEDURE_STORE_SLOTS];
101    final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
102    for (int i = 0; i < thread.length; ++i) {
103      thread[i] = new Thread() {
104        @Override
105        public void run() {
106          Random rand = new Random();
107          TestProcedure proc;
108          do {
109            // After HBASE-15579 there may be gap in the procId sequence, trying to simulate that.
110            long procId = procCounter.addAndGet(1 + rand.nextInt(3));
111            proc = new TestProcedure(procId);
112            // Insert
113            procStore.insert(proc, null);
114            // Update
115            for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
116              try {
117                Thread.sleep(0, rand.nextInt(15));
118              } catch (InterruptedException e) {}
119              procStore.update(proc);
120            }
121            // Delete
122            procStore.delete(proc.getProcId());
123          } while (proc.getProcId() < LAST_PROC_ID);
124        }
125      };
126      thread[i].start();
127    }
128
129    for (int i = 0; i < thread.length; ++i) {
130      thread[i].join();
131    }
132
133    procStore.getStoreTracker().dump();
134    assertTrue(procCounter.get() >= LAST_PROC_ID);
135    assertTrue(procStore.getStoreTracker().isEmpty());
136    assertEquals(1, procStore.getActiveLogs().size());
137  }
138
139  @Ignore @Test // REENABLE after merge of
140  // https://github.com/google/protobuf/issues/2228#issuecomment-252058282
141  public void testEntrySizeLimit() throws Exception {
142    final int NITEMS = 20;
143    for (int i = 1; i <= NITEMS; ++i) {
144      final byte[] data = new byte[256 << i];
145      LOG.info(String.format("Writing %s", StringUtils.humanSize(data.length)));
146      TestProcedure proc = new TestProcedure(i, 0, data);
147      procStore.insert(proc, null);
148    }
149
150    // check that we are able to read the big proc-blobs
151    ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0);
152  }
153}