/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Tests the cluster-level operations exposed by {@link AsyncAdmin}: the master info port, online
 * configuration updates, WAL rolling, region metrics and region server listing/decommissioning.
 */
@Tag(ClientTests.TAG)
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: policy = {0}")
public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {

  public TestAsyncClusterAdminApi(Supplier<AsyncAdmin> admin) {
    super(admin);
  }

  /**
   * Prepares the configuration files, starts a mini cluster with two region servers and opens the
   * shared asynchronous connection.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {

    setUpConfigurationFiles(TEST_UTIL);
    TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);

    TEST_UTIL.startMiniCluster(2);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    addResourceToRegionServerConfiguration(TEST_UTIL);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TestAsyncAdminBase.tearDownAfterClass();
  }

  /** The info port reported through the admin must match the port of the master's info server. */
  @TestTemplate
  public void testGetMasterInfoPort() throws Exception {
    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(),
      (int) admin.getMasterInfoPort().get());
  }

  /** Updates the configuration of every region server individually and verifies the result. */
  @TestTemplate
  public void testRegionServerOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    try {
      admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());
      assertRegionServersUseCustomConfig();
    } finally {
      // Always put the original site file back so a failure here cannot leak into other tests.
      restoreHBaseSiteXML();
    }
  }

  /** Updates the configuration of the active and backup masters and verifies the result. */
  @TestTemplate
  public void testMasterOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    try {
      ServerName master = admin.getMaster().get();
      admin.updateConfiguration(master).join();
      admin.getBackupMasters().get()
        .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());
      assertMastersUseCustomConfig();
    } finally {
      // Always put the original site file back so a failure here cannot leak into other tests.
      restoreHBaseSiteXML();
    }
  }

  /** Updates the configuration of the whole cluster in one call and verifies every server. */
  @TestTemplate
  public void testAllClusterOnlineConfigChange() throws IOException {
    replaceHBaseSiteXML();
    try {
      admin.updateConfiguration().join();
      assertMastersUseCustomConfig();
      assertRegionServersUseCustomConfig();
    } finally {
      // Always put the original site file back so a failure here cannot leak into other tests.
      restoreHBaseSiteXML();
    }
  }

  /** Asserts that every master thread of the mini cluster sees the custom config value. */
  private void assertMastersUseCustomConfig() {
    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
      Configuration conf = thread.getMaster().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });
  }

  /** Asserts that every region server thread of the mini cluster sees the custom config value. */
  private void assertRegionServersUseCustomConfig() {
    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
      Configuration conf = thread.getRegionServer().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });
  }

  /**
   * Writes enough data to produce several rolled WAL files, flushes all regions, rolls the WAL
   * writer through the admin and waits until at most two rolled log files remain.
   */
  @TestTemplate
  public void testRollWALWALWriter() throws Exception {
    setUpforLogRolling();
    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    byte[] value = Bytes.toBytes(v.toString());
    HRegionServer regionServer = startAndWriteData(tableName, value);
    LOG.info("after writing there are "
      + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");

    // flush all regions
    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
      r.flush(true);
    }
    admin.rollWALWriter(regionServer.getServerName()).join();
    TEST_UTIL.waitFor(5000, () -> {
      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
      return count <= 2;
    });
  }

  /** Applies the configuration values used by the log rolling test to the test configuration. */
  private void setUpforLogRolling() {
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

  /**
   * Creates the table and writes 256 rows carrying {@code value} into it, pausing after every 32
   * writes.
   * @param tableName table to create and write to
   * @param value     cell value stored in every row
   * @return the region server hosting the first region of the table
   */
  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
    createTableWithDefaultConf(tableName);
    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
      put.addColumn(FAMILY, null, value);
      table.put(put).join();
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // Keep writing, but restore the interrupt flag so callers can still observe it.
          Thread.currentThread().interrupt();
        }
      }
    }
    return regionServer;
  }

  /**
   * Verifies that the region metrics returned per server, per table and through the cluster
   * metrics are consistent with the regions reported by the admin.
   */
  @TestTemplate
  public void testGetRegionLoads() throws Exception {
    // Turn off the balancer
    admin.balancerSwitch(false).join();
    TableName[] tables = new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
      TableName.valueOf(tableName.getNameAsString() + "2"),
      TableName.valueOf(tableName.getNameAsString() + "3") };
    createAndLoadTable(tables);
    // Sleep to wait region server report
    int msgInterval = TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000);
    Thread.sleep(msgInterval * 2);
    // Check if regions match with the regionLoad from the server
    Collection<ServerName> servers = admin.getRegionServers().get();
    for (ServerName serverName : servers) {
      List<RegionInfo> regions = admin.getRegions(serverName).get();
      checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get());
    }

    // Check if regionLoad matches the table's regions and nothing is missed
    for (TableName table : tables) {
      List<RegionInfo> tableRegions = admin.getRegions(table).get();
      List<RegionMetrics> regionLoads = Lists.newArrayList();
      for (ServerName serverName : servers) {
        regionLoads.addAll(admin.getRegionMetrics(serverName, table).get());
      }
      checkRegionsAndRegionLoads(tableRegions, regionLoads);
    }

    // Check RegionLoad matches the regionLoad from ClusterStatus
    ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get();
    assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size());
    for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics()
      .entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics sm = entry.getValue();
      compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get());
    }
  }

  /**
   * Decommissions one of the two region servers and checks the server lists returned with and
   * without decommissioned servers excluded, then recommissions it and checks again.
   */
  @TestTemplate
  public void testGetRegionServers() throws Exception {
    List<ServerName> serverNames = new ArrayList<>(admin.getRegionServers(true).get());
    assertEquals(2, serverNames.size());

    List<ServerName> serversToDecom = new ArrayList<>();
    ServerName serverToDecommission = serverNames.get(0);

    serversToDecom.add(serverToDecommission);
    admin.decommissionRegionServers(serversToDecom, false).join();

    assertEquals(1, admin.getRegionServers(true).get().size());
    assertEquals(2, admin.getRegionServers(false).get().size());

    admin.recommissionRegionServer(serverToDecommission, Collections.emptyList()).join();

    assertEquals(2, admin.getRegionServers(true).get().size());
    assertEquals(2, admin.getRegionServers(false).get().size());
  }

  /**
   * Asserts that both collections have the same size and that every region name found in
   * {@code regionLoadCluster} also appears in {@code regionLoads}.
   */
  private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster,
    Collection<RegionMetrics> regionLoads) {

    assertEquals(regionLoadCluster.size(), regionLoads.size(),
      "No of regionLoads from clusterStatus and regionloads from RS doesn't match");

    for (RegionMetrics loadCluster : regionLoadCluster) {
      boolean matched = false;
      for (RegionMetrics load : regionLoads) {
        if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) {
          matched = true;
          // One match is enough; no need to scan the remaining entries.
          break;
        }
      }
      assertTrue(matched, "The contents of region load from cluster and server should match");
    }
  }

  /**
   * Asserts that {@code regions} and {@code regionLoads} have the same size and that every region
   * has an entry, keyed by region name, among the region loads.
   */
  private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions,
    Collection<RegionMetrics> regionLoads) {

    assertEquals(regions.size(), regionLoads.size(), "No of regions and regionloads doesn't match");

    Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    for (RegionMetrics regionLoad : regionLoads) {
      regionLoadMap.put(regionLoad.getRegionName(), regionLoad);
    }
    for (RegionInfo info : regions) {
      assertTrue(regionLoadMap.containsKey(info.getRegionName()),
        "Region not in regionLoadMap region:" + info.getRegionNameAsString() + " regionMap: "
          + regionLoadMap);
    }
  }

  /**
   * Creates each table pre-split into 16 regions between "aaaaa" and "zzzzz" and writes one cell
   * for every row in {@link HBaseTestingUtil#ROWS}.
   */
  private void createAndLoadTable(TableName[] tables) {
    for (TableName table : tables) {
      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
      AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table);
      List<Put> puts = new ArrayList<>();
      for (byte[] row : HBaseTestingUtil.ROWS) {
        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
      }
      asyncTable.putAll(puts).join();
    }
  }
}