001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.EnumSet; 028import java.util.List; 029import java.util.Map; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.ClusterMetrics; 032import org.apache.hadoop.hbase.ClusterMetrics.Option; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.RegionMetrics; 037import org.apache.hadoop.hbase.ServerMetrics; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.regionserver.HRegion; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.LargeTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 046import org.junit.BeforeClass; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.junit.runner.RunWith; 051import org.junit.runners.Parameterized; 052 053import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 054import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 055 056@RunWith(Parameterized.class) 057@Category({ ClientTests.class, LargeTests.class }) 058public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class); 063 064 @BeforeClass 065 public static void setUpBeforeClass() throws Exception { 066 067 setUpConfigurationFiles(TEST_UTIL); 068 TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0); 069 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 070 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 071 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 072 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 073 074 TEST_UTIL.startMiniCluster(2); 075 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 076 addResourceToRegionServerConfiguration(TEST_UTIL); 077 } 078 079 @Test 080 public void testGetMasterInfoPort() throws Exception { 081 assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(), (int) admin 082 .getMasterInfoPort().get()); 083 } 084 085 @Test 086 public void testRegionServerOnlineConfigChange() throws Exception { 087 replaceHBaseSiteXML(); 088 admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join()); 089 090 // Check the configuration of the RegionServers 091 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 092 Configuration conf = thread.getRegionServer().getConfiguration(); 093 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 094 }); 095 096 restoreHBaseSiteXML(); 097 } 098 099 @Test 100 public void testMasterOnlineConfigChange() throws Exception { 101 replaceHBaseSiteXML(); 102 ServerName master = admin.getMaster().get(); 103 admin.updateConfiguration(master).join(); 104 admin.getBackupMasters().get() 105 .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join()); 106 107 // Check the configuration of the Masters 108 TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { 109 Configuration conf = thread.getMaster().getConfiguration(); 110 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 111 }); 112 113 restoreHBaseSiteXML(); 114 } 115 116 @Test 117 public void testAllClusterOnlineConfigChange() throws IOException { 118 replaceHBaseSiteXML(); 119 admin.updateConfiguration().join(); 120 121 // Check the configuration of the Masters 122 TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { 123 Configuration conf = thread.getMaster().getConfiguration(); 124 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 125 }); 126 127 // Check the configuration of the RegionServers 128 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 129 Configuration conf = thread.getRegionServer().getConfiguration(); 130 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 131 }); 132 133 restoreHBaseSiteXML(); 134 } 135 136 @Test 137 public void testRollWALWALWriter() throws Exception { 138 setUpforLogRolling(); 139 String className = this.getClass().getName(); 140 StringBuilder v = new StringBuilder(className); 141 while (v.length() < 1000) { 142 v.append(className); 143 } 144 byte[] value = Bytes.toBytes(v.toString()); 145 HRegionServer regionServer = startAndWriteData(tableName, value); 146 LOG.info("after writing there are " 147 + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files"); 148 149 // flush all regions 150 for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { 151 r.flush(true); 152 } 153 admin.rollWALWriter(regionServer.getServerName()).join(); 154 int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); 155 LOG.info("after flushing all regions and rolling logs there are " + 156 count + " log files"); 157 assertTrue(("actual count: " + count), count <= 2); 158 } 159 160 private void setUpforLogRolling() { 161 // Force a region split after every 768KB 162 TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 163 768L * 1024L); 164 165 // We roll the log after every 32 writes 166 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32); 167 168 TEST_UTIL.getConfiguration().setInt( 169 "hbase.regionserver.logroll.errors.tolerated", 2); 170 TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000); 171 172 // For less frequently updated regions flush after every 2 flushes 173 TEST_UTIL.getConfiguration().setInt( 174 "hbase.hregion.memstore.optionalflushcount", 2); 175 176 // We flush the cache after every 8192 bytes 177 TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 178 8192); 179 180 // Increase the amount of time between client retries 181 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000); 182 183 // Reduce thread wake frequency so that other threads can get 184 // a chance to run. 185 TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 186 2 * 1000); 187 188 /**** configuration for testLogRollOnDatanodeDeath ****/ 189 // lower the namenode & datanode heartbeat so the namenode 190 // quickly detects datanode failures 191 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 192 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 193 // the namenode might still try to choose the recently-dead datanode 194 // for a pipeline, so try to a new pipeline multiple times 195 TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); 196 TEST_UTIL.getConfiguration().setInt( 197 "hbase.regionserver.hlog.tolerable.lowreplication", 2); 198 TEST_UTIL.getConfiguration().setInt( 199 "hbase.regionserver.hlog.lowreplication.rolllimit", 3); 200 } 201 202 private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception { 203 createTableWithDefaultConf(tableName); 204 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 205 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); 206 for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls 207 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); 208 put.addColumn(FAMILY, null, value); 209 table.put(put).join(); 210 if (i % 32 == 0) { 211 // After every 32 writes sleep to let the log roller run 212 try { 213 Thread.sleep(2000); 214 } catch (InterruptedException e) { 215 // continue 216 } 217 } 218 } 219 return regionServer; 220 } 221 222 @Test 223 public void testGetRegionLoads() throws Exception { 224 // Turn off the balancer 225 admin.balancerSwitch(false).join(); 226 TableName[] tables = 227 new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"), 228 TableName.valueOf(tableName.getNameAsString() + "2"), 229 TableName.valueOf(tableName.getNameAsString() + "3") }; 230 createAndLoadTable(tables); 231 // Sleep to wait region server report 232 Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); 233 // Check if regions match with the regionLoad from the server 234 Collection<ServerName> servers = admin.getRegionServers().get(); 235 for (ServerName serverName : servers) { 236 List<RegionInfo> regions = admin.getRegions(serverName).get(); 237 checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get()); 238 } 239 240 // Check if regionLoad matches the table's regions and nothing is missed 241 for (TableName table : tables) { 242 List<RegionInfo> tableRegions = admin.getRegions(table).get(); 243 List<RegionMetrics> regionLoads = Lists.newArrayList(); 244 for (ServerName serverName : servers) { 245 regionLoads.addAll(admin.getRegionMetrics(serverName, table).get()); 246 } 247 checkRegionsAndRegionLoads(tableRegions, regionLoads); 248 } 249 250 // Check RegionLoad matches the regionLoad from ClusterStatus 251 ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get(); 252 assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size()); 253 for (Map.Entry<ServerName, ServerMetrics> entry : 254 clusterStatus.getLiveServerMetrics().entrySet()) { 255 ServerName sn = entry.getKey(); 256 ServerMetrics sm = entry.getValue(); 257 compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get()); 258 } 259 for (ServerName serverName : clusterStatus.getLiveServerMetrics().keySet()) { 260 ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(serverName); 261 262 } 263 } 264 265 private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster, 266 Collection<RegionMetrics> regionLoads) { 267 268 assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match", 269 regionLoadCluster.size(), regionLoads.size()); 270 271 for (RegionMetrics loadCluster : regionLoadCluster) { 272 boolean matched = false; 273 for (RegionMetrics load : regionLoads) { 274 if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) { 275 matched = true; 276 continue; 277 } 278 } 279 assertTrue("The contents of region load from cluster and server should match", matched); 280 } 281 } 282 283 private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions, 284 Collection<RegionMetrics> regionLoads) { 285 286 assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size()); 287 288 Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); 289 for (RegionMetrics regionLoad : regionLoads) { 290 regionLoadMap.put(regionLoad.getRegionName(), regionLoad); 291 } 292 for (RegionInfo info : regions) { 293 assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString() 294 + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName())); 295 } 296 } 297 298 private void createAndLoadTable(TableName[] tables) { 299 for (TableName table : tables) { 300 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table); 301 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); 302 admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join(); 303 AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table); 304 List<Put> puts = new ArrayList<>(); 305 for (byte[] row : HBaseTestingUtility.ROWS) { 306 puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"))); 307 } 308 asyncTable.putAll(puts).join(); 309 } 310 } 311}