/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.DELEGATE_PROVIDER;
import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY;
import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises {@link RegionGroupingProvider} configured with the {@code bounded} grouping strategy.
 * <p>
 * The class is parameterized over the delegate WAL provider name ({@code defaultProvider} and
 * {@code asyncfs}) and runs against a mini DFS cluster started once per class with
 * {@code startMiniDFSCluster(3)}.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestBoundedRegionGroupingStrategy {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBoundedRegionGroupingStrategy.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBoundedRegionGroupingStrategy.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /** Shared configuration of {@link #TEST_UTIL}; assigned in {@link #setUpBeforeClass()}. */
  private static Configuration CONF;

  /** File system of the mini DFS cluster; assigned in {@link #setUpBeforeClass()}. */
  private static DistributedFileSystem FS;

  /** Name of the delegate WAL provider injected by the {@link Parameterized} runner. */
  @Parameter
  public String walProvider;

  /**
   * @return one parameter row per delegate WAL provider name under test
   */
  @Parameters(name = "{index}: delegate-provider={0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
  }

  /**
   * Selects the delegate WAL provider for the current parameter on the shared configuration.
   */
  @Before
  public void setUp() throws Exception {
    CONF.set(DELEGATE_PROVIDER, walProvider);
  }

  /**
   * Recursively deletes every entry directly under the root of the test file system so each test
   * method starts from an empty tree.
   */
  @After
  public void tearDown() throws Exception {
    FileStatus[] entries = FS.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      FS.delete(dir.getPath(), true);
    }
  }

  /**
   * Tunes the shared configuration, selects {@link RegionGroupingProvider} with the bounded
   * strategy as the WAL provider, and starts the mini DFS cluster.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    // Make block sizes small.
    CONF.setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    CONF.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    CONF.setInt("dfs.heartbeat.interval", 1);
    CONF.setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    CONF.setInt("hbase.ipc.client.connect.max.retries", 1);
    CONF.setInt("dfs.client.block.recovery.retries", 1);
    CONF.setInt("hbase.ipc.client.connection.maxidletime", 500);

    CONF.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class);
    CONF.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name());

    TEST_UTIL.startMiniDFSCluster(3);

    FS = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  /**
   * Shuts down the mini cluster owned by {@link #TEST_UTIL}.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Returns a private copy of the shared configuration with the region-group bound overridden.
   * Working on a copy keeps the override from leaking into other test methods.
   * @param numRegionGroups value to store under {@code NUM_REGION_GROUPS} in the copy
   * @return a new {@link Configuration} seeded from {@link #CONF}
   */
  private static Configuration confWithRegionGroups(int numRegionGroups) {
    Configuration copy = new Configuration(CONF);
    copy.setInt(NUM_REGION_GROUPS, numRegionGroups);
    return copy;
  }

  /**
   * Runs {@link WALPerformanceEvaluation} with {@code -verify}, {@code -noclosefs} and
   * {@code -iterations 3000}, using {@code parallelism} for both {@code -threads} and
   * {@code -regions}, and asserts that the tool returns exit code 0.
   * @param conf        configuration handed to the tool
   * @param parallelism number passed as both the thread count and the region count
   */
  private static void assertWalPerfEvalSucceeds(Configuration conf, int parallelism)
    throws Exception {
    final String count = Integer.toString(parallelism);
    int errCode = WALPerformanceEvaluation.innerMain(conf, new String[] { "-threads", count,
      "-verify", "-noclosefs", "-iterations", "3000", "-regions", count });
    assertEquals(0, errCode);
  }

  /**
   * Write to a log file with three concurrent threads and verify all data is written.
   */
  @Test
  public void testConcurrentWrites() throws Exception {
    // Run the WPE tool with three threads writing 3000 edits each concurrently.
    // When done, verify that all edits were written.
    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
      new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", "3000" });
    assertEquals(0, errCode);
  }

  /**
   * Make sure we can successfully run with more regions than our bound.
   */
  @Test
  public void testMoreRegionsThanBound() throws Exception {
    assertWalPerfEvalSucceeds(new Configuration(CONF), DEFAULT_NUM_REGION_GROUPS * 2);
  }

  /**
   * Raises the bound to four times its current value and runs with exactly that many regions and
   * writer threads.
   */
  @Test
  public void testBoundsGreaterThanDefault() throws Exception {
    final int bound = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS) * 4;
    assertWalPerfEvalSucceeds(confWithRegionGroups(bound), bound);
  }

  /**
   * Raises the bound to four times its current value and runs with twice as many regions and
   * writer threads as that raised bound.
   */
  @Test
  public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
    final int bound = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS) * 4;
    assertWalPerfEvalSucceeds(confWithRegionGroups(bound), bound * 2);
  }

  /**
   * Ensure that we can use Set.add to deduplicate WALs
   */
  @Test
  public void setMembershipDedups() throws IOException {
    final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
    WALFactory wals = null;
    try {
      CONF.setInt(NUM_REGION_GROUPS, temp * 4);
      // Set HDFS root directory for storing WAL
      // NOTE(review): the root dir is written to the shared CONF and is not reverted in the
      // finally block below; confirm whether other test methods rely on that before changing it.
      CommonFSUtils.setRootDir(CONF, TEST_UTIL.getDataTestDirOnTestFS());

      wals = new WALFactory(CONF, "setMembershipDedups");
      Set<WAL> seen = new HashSet<>(temp * 4);
      int count = 0;
      // we know that this should see one of the wals more than once
      for (int i = 0; i < temp * 8; i++) {
        WAL maybeNewWAL = wals.getWAL(RegionInfoBuilder
          .newBuilder(TableName.valueOf("Table-" + ThreadLocalRandom.current().nextInt()))
          .build());
        LOG.info("Iteration {}, checking wal {}", i, maybeNewWAL);
        // Set.add returns true only for a WAL not equal() to one already collected.
        if (seen.add(maybeNewWAL)) {
          count++;
        }
      }
      assertEquals("received back a different number of WALs that are not equal() to each other "
        + "than the bound we placed.",
        temp * 4, count);
    } finally {
      if (wals != null) {
        wals.close();
      }
      CONF.setInt(NUM_REGION_GROUPS, temp);
    }
  }
}