/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Tests the cluster-level operations of the asynchronous admin client: master info port lookup,
 * online configuration updates, WAL rolling, region metrics retrieval and region server
 * decommission/recommission.
 * <p>
 * The fixtures used below without qualification ({@code TEST_UTIL}, {@code ASYNC_CONN},
 * {@code admin}, {@code tableName}, {@code FAMILY}, {@code LOG}) and the hbase-site.xml helper
 * methods are not declared in this file; they are inherited through {@link TestAsyncAdminBase}.
 */
@RunWith(Parameterized.class)
@Category({ ClientTests.class, LargeTests.class })
public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class);

  /** Configuration key checked by the online configuration change tests. */
  private static final String CUSTOM_CONFIG_KEY = "hbase.custom.config";

  /** Value the tests expect for {@link #CUSTOM_CONFIG_KEY} once the configuration is reloaded. */
  private static final int CUSTOM_CONFIG_VALUE = 1000;

  /**
   * Prepares the configuration files, starts a mini cluster with two region servers and opens
   * the shared asynchronous connection.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {

    setUpConfigurationFiles(TEST_UTIL);
    TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);

    TEST_UTIL.startMiniCluster(2);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    addResourceToRegionServerConfiguration(TEST_UTIL);
  }

  /** The info port reported by the admin must equal the port of the master's info server. */
  @Test
  public void testGetMasterInfoPort() throws Exception {
    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(),
      (int) admin.getMasterInfoPort().get());
  }

  /** Updates the configuration of each region server individually and verifies the result. */
  @Test
  public void testRegionServerOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    try {
      admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());

      // Check the configuration of the RegionServers
      assertRegionServersHaveCustomConfig();
    } finally {
      // Always put the original hbase-site.xml back so a failure here cannot leak the
      // replaced file into the other tests.
      restoreHBaseSiteXML();
    }
  }

  /** Updates the configuration of the active and backup masters and verifies the result. */
  @Test
  public void testMasterOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    try {
      ServerName master = admin.getMaster().get();
      admin.updateConfiguration(master).join();
      admin.getBackupMasters().get()
        .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());

      // Check the configuration of the Masters
      assertMastersHaveCustomConfig();
    } finally {
      // Always put the original hbase-site.xml back so a failure here cannot leak the
      // replaced file into the other tests.
      restoreHBaseSiteXML();
    }
  }

  /** Updates the configuration of the whole cluster with one call and verifies every server. */
  @Test
  public void testAllClusterOnlineConfigChange() throws IOException {
    replaceHBaseSiteXML();
    try {
      admin.updateConfiguration().join();

      // Check the configuration of the Masters
      assertMastersHaveCustomConfig();

      // Check the configuration of the RegionServers
      assertRegionServersHaveCustomConfig();
    } finally {
      // Always put the original hbase-site.xml back so a failure here cannot leak the
      // replaced file into the other tests.
      restoreHBaseSiteXML();
    }
  }

  /** Asserts that every master thread of the mini cluster sees the expected custom value. */
  private void assertMastersHaveCustomConfig() {
    TEST_UTIL.getMiniHBaseCluster().getMasterThreads()
      .forEach(thread -> assertCustomConfig(thread.getMaster().getConfiguration()));
  }

  /** Asserts that every region server thread of the mini cluster sees the expected value. */
  private void assertRegionServersHaveCustomConfig() {
    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()
      .forEach(thread -> assertCustomConfig(thread.getRegionServer().getConfiguration()));
  }

  /**
   * Asserts that {@code conf} holds {@link #CUSTOM_CONFIG_VALUE} for {@link #CUSTOM_CONFIG_KEY};
   * a missing key reads as 0 and therefore fails the assertion.
   */
  private static void assertCustomConfig(Configuration conf) {
    assertEquals(CUSTOM_CONFIG_VALUE, conf.getInt(CUSTOM_CONFIG_KEY, 0));
  }

  /**
   * Writes data to a table, flushes every region online on the hosting region server, asks the
   * admin to roll that server's WAL writer and checks that at most two rolled log files remain.
   */
  @Test
  public void testRollWALWALWriter() throws Exception {
    setUpforLogRolling();
    String className = this.getClass().getName();
    // Build a value of at least 1000 characters by repeating the class name.
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    byte[] value = Bytes.toBytes(v.toString());
    HRegionServer regionServer = startAndWriteData(tableName, value);
    LOG.info("after writing there are "
      + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");

    // flush all regions
    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
      r.flush(true);
    }
    admin.rollWALWriter(regionServer.getServerName()).join();
    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

  /** Sets the log rolling related properties on the configuration held by {@code TEST_UTIL}. */
  private void setUpforLogRolling() {
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

  /**
   * Creates {@code tableName} with the default configuration and writes 256 rows to it, pausing
   * for two seconds after every 32 writes.
   * @param tableName the table to create and write to
   * @param value the cell value stored in every row
   * @return the region server hosting the first region of the table
   */
  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
    createTableWithDefaultConf(tableName);
    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
      put.addColumn(FAMILY, null, value);
      table.put(put).join();
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // Keep writing, but restore the interrupt flag so the interruption stays visible
          // to the code that runs this test instead of being silently dropped.
          Thread.currentThread().interrupt();
        }
      }
    }
    return regionServer;
  }

  /**
   * Creates three tables and cross-checks the region metrics returned by the admin against the
   * regions of each server, the regions of each table and the metrics in the cluster metrics.
   */
  @Test
  public void testGetRegionLoads() throws Exception {
    // Turn off the balancer
    admin.balancerSwitch(false).join();
    TableName[] tables = new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
      TableName.valueOf(tableName.getNameAsString() + "2"),
      TableName.valueOf(tableName.getNameAsString() + "3") };
    createAndLoadTable(tables);
    // Sleep to wait region server report
    Thread
      .sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
    // Check if regions match with the regionLoad from the server
    Collection<ServerName> servers = admin.getRegionServers().get();
    for (ServerName serverName : servers) {
      List<RegionInfo> regions = admin.getRegions(serverName).get();
      checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get());
    }

    // Check if regionLoad matches the table's regions and nothing is missed
    for (TableName table : tables) {
      List<RegionInfo> tableRegions = admin.getRegions(table).get();
      List<RegionMetrics> regionLoads = Lists.newArrayList();
      for (ServerName serverName : servers) {
        regionLoads.addAll(admin.getRegionMetrics(serverName, table).get());
      }
      checkRegionsAndRegionLoads(tableRegions, regionLoads);
    }

    // Check RegionLoad matches the regionLoad from ClusterStatus
    ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get();
    assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size());
    for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics()
      .entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics sm = entry.getValue();
      compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get());
    }
  }

  /**
   * Decommissions one of the two region servers and checks that it is left out of
   * {@code getRegionServers(true)} but still returned by {@code getRegionServers(false)}, then
   * recommissions it and checks that both calls return two servers again.
   */
  @Test
  public void testGetRegionServers() throws Exception {
    List<ServerName> serverNames = new ArrayList<>(admin.getRegionServers(true).get());
    assertEquals(2, serverNames.size());

    List<ServerName> serversToDecom = new ArrayList<>();
    ServerName serverToDecommission = serverNames.get(0);

    serversToDecom.add(serverToDecommission);
    admin.decommissionRegionServers(serversToDecom, false).join();
    try {
      assertEquals(1, admin.getRegionServers(true).get().size());
      assertEquals(2, admin.getRegionServers(false).get().size());
    } finally {
      // Recommission even when an assertion fails so the shared cluster is not left with a
      // decommissioned server for the remaining tests.
      admin.recommissionRegionServer(serverToDecommission, Collections.emptyList()).join();
    }

    assertEquals(2, admin.getRegionServers(true).get().size());
    assertEquals(2, admin.getRegionServers(false).get().size());
  }

  /**
   * Asserts that both collections have the same size and that every region name found in
   * {@code regionLoadCluster} also occurs in {@code regionLoads}.
   * @param regionLoadCluster region metrics taken from the cluster metrics
   * @param regionLoads region metrics fetched for the same server through the admin
   */
  private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster,
    Collection<RegionMetrics> regionLoads) {

    assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
      regionLoadCluster.size(), regionLoads.size());

    for (RegionMetrics loadCluster : regionLoadCluster) {
      // anyMatch stops at the first hit instead of scanning the rest of the collection.
      boolean matched = regionLoads.stream()
        .anyMatch(load -> Bytes.equals(loadCluster.getRegionName(), load.getRegionName()));
      assertTrue("The contents of region load from cluster and server should match", matched);
    }
  }

  /**
   * Asserts that both collections have the same size and that every region in {@code regions}
   * has an entry in {@code regionLoads} with the same region name.
   * @param regions the regions that are expected to be covered
   * @param regionLoads the region metrics to check against
   */
  private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions,
    Collection<RegionMetrics> regionLoads) {

    assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size());

    // Region names are byte arrays, so the map needs a comparator that compares their content.
    Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    for (RegionMetrics regionLoad : regionLoads) {
      regionLoadMap.put(regionLoad.getRegionName(), regionLoad);
    }
    for (RegionInfo info : regions) {
      assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString()
        + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
    }
  }

  /**
   * Creates each of the given tables with 16 regions between the keys "aaaaa" and "zzzzz" and
   * writes one cell for every row of {@link HBaseTestingUtility#ROWS} into it.
   * @param tables the tables to create and load
   */
  private void createAndLoadTable(TableName[] tables) {
    for (TableName table : tables) {
      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
      AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table);
      List<Put> puts = new ArrayList<>();
      for (byte[] row : HBaseTestingUtility.ROWS) {
        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
      }
      asyncTable.putAll(puts).join();
    }
  }
}