001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
021import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
022import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.DELEGATE_PROVIDER;
023import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY;
024import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
025import static org.junit.Assert.assertEquals;
026
027import java.io.IOException;
028import java.util.Arrays;
029import java.util.HashSet;
030import java.util.Set;
031import java.util.concurrent.ThreadLocalRandom;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.testclassification.RegionServerTests;
041import org.apache.hadoop.hbase.util.CommonFSUtils;
042import org.apache.hadoop.hdfs.DistributedFileSystem;
043import org.junit.After;
044import org.junit.AfterClass;
045import org.junit.Before;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.runner.RunWith;
051import org.junit.runners.Parameterized;
052import org.junit.runners.Parameterized.Parameter;
053import org.junit.runners.Parameterized.Parameters;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057@RunWith(Parameterized.class)
058@Category({ RegionServerTests.class, MediumTests.class })
059public class TestBoundedRegionGroupingStrategy {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestBoundedRegionGroupingStrategy.class);
064
065  private static final Logger LOG =
066    LoggerFactory.getLogger(TestBoundedRegionGroupingStrategy.class);
067
068  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
069
070  private static Configuration CONF;
071  private static DistributedFileSystem FS;
072
073  @Parameter
074  public String walProvider;
075
076  @Parameters(name = "{index}: delegate-provider={0}")
077  public static Iterable<Object[]> data() {
078    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
079  }
080
081  @Before
082  public void setUp() throws Exception {
083    CONF.set(DELEGATE_PROVIDER, walProvider);
084  }
085
086  @After
087  public void tearDown() throws Exception {
088    FileStatus[] entries = FS.listStatus(new Path("/"));
089    for (FileStatus dir : entries) {
090      FS.delete(dir.getPath(), true);
091    }
092  }
093
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    CONF = TEST_UTIL.getConfiguration();
097    // Make block sizes small.
098    CONF.setInt("dfs.blocksize", 1024 * 1024);
099    // quicker heartbeat interval for faster DN death notification
100    CONF.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
101    CONF.setInt("dfs.heartbeat.interval", 1);
102    CONF.setInt("dfs.client.socket-timeout", 5000);
103
104    // faster failover with cluster.shutdown();fs.close() idiom
105    CONF.setInt("hbase.ipc.client.connect.max.retries", 1);
106    CONF.setInt("dfs.client.block.recovery.retries", 1);
107    CONF.setInt("hbase.ipc.client.connection.maxidletime", 500);
108
109    CONF.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class);
110    CONF.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name());
111
112    TEST_UTIL.startMiniDFSCluster(3);
113
114    FS = TEST_UTIL.getDFSCluster().getFileSystem();
115  }
116
117  @AfterClass
118  public static void tearDownAfterClass() throws Exception {
119    TEST_UTIL.shutdownMiniCluster();
120  }
121
122  /**
123   * Write to a log file with three concurrent threads and verifying all data is written.
124   */
125  @Test
126  public void testConcurrentWrites() throws Exception {
127    // Run the WPE tool with three threads writing 3000 edits each concurrently.
128    // When done, verify that all edits were written.
129    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
130      new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", "3000" });
131    assertEquals(0, errCode);
132  }
133
134  /**
135   * Make sure we can successfully run with more regions then our bound.
136   */
137  @Test
138  public void testMoreRegionsThanBound() throws Exception {
139    final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
140    int errCode =
141      WALPerformanceEvaluation.innerMain(new Configuration(CONF), new String[] { "-threads",
142        parallelism, "-verify", "-noclosefs", "-iterations", "3000", "-regions", parallelism });
143    assertEquals(0, errCode);
144  }
145
146  @Test
147  public void testBoundsGreaterThanDefault() throws Exception {
148    final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
149    try {
150      CONF.setInt(NUM_REGION_GROUPS, temp * 4);
151      final String parallelism = Integer.toString(temp * 4);
152      int errCode =
153        WALPerformanceEvaluation.innerMain(new Configuration(CONF), new String[] { "-threads",
154          parallelism, "-verify", "-noclosefs", "-iterations", "3000", "-regions", parallelism });
155      assertEquals(0, errCode);
156    } finally {
157      CONF.setInt(NUM_REGION_GROUPS, temp);
158    }
159  }
160
161  @Test
162  public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
163    final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
164    try {
165      CONF.setInt(NUM_REGION_GROUPS, temp * 4);
166      final String parallelism = Integer.toString(temp * 4 * 2);
167      int errCode =
168        WALPerformanceEvaluation.innerMain(new Configuration(CONF), new String[] { "-threads",
169          parallelism, "-verify", "-noclosefs", "-iterations", "3000", "-regions", parallelism });
170      assertEquals(0, errCode);
171    } finally {
172      CONF.setInt(NUM_REGION_GROUPS, temp);
173    }
174  }
175
176  /**
177   * Ensure that we can use Set.add to deduplicate WALs
178   */
179  @Test
180  public void setMembershipDedups() throws IOException {
181    final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
182    WALFactory wals = null;
183    try {
184      CONF.setInt(NUM_REGION_GROUPS, temp * 4);
185      // Set HDFS root directory for storing WAL
186      CommonFSUtils.setRootDir(CONF, TEST_UTIL.getDataTestDirOnTestFS());
187
188      wals = new WALFactory(CONF, "setMembershipDedups");
189      Set<WAL> seen = new HashSet<>(temp * 4);
190      int count = 0;
191      // we know that this should see one of the wals more than once
192      for (int i = 0; i < temp * 8; i++) {
193        WAL maybeNewWAL = wals.getWAL(RegionInfoBuilder
194          .newBuilder(TableName.valueOf("Table-" + ThreadLocalRandom.current().nextInt())).build());
195        LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
196        if (seen.add(maybeNewWAL)) {
197          count++;
198        }
199      }
200      assertEquals("received back a different number of WALs that are not equal() to each other "
201        + "than the bound we placed.", temp * 4, count);
202    } finally {
203      if (wals != null) {
204        wals.close();
205      }
206      CONF.setInt(NUM_REGION_GROUPS, temp);
207    }
208  }
209}