001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.nio.file.FileSystems; 026import java.nio.file.Files; 027import java.nio.file.Path; 028import java.nio.file.StandardCopyOption; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.EnumSet; 032import java.util.List; 033import java.util.Map; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ClusterMetrics; 036import org.apache.hadoop.hbase.ClusterMetrics.Option; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.RegionMetrics; 041import org.apache.hadoop.hbase.ServerMetrics; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.regionserver.HRegion; 045import org.apache.hadoop.hbase.regionserver.HRegionServer; 046import org.apache.hadoop.hbase.testclassification.ClientTests; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.junit.runner.RunWith; 055import org.junit.runners.Parameterized; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 058import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 059 060@RunWith(Parameterized.class) 061@Category({ ClientTests.class, LargeTests.class }) 062public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class); 067 068 private final Path cnfPath = FileSystems.getDefault().getPath("target/test-classes/hbase-site.xml"); 069 private final Path cnf2Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site2.xml"); 070 private final Path cnf3Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site3.xml"); 071 072 @BeforeClass 073 public static void setUpBeforeClass() throws Exception { 074 TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0); 075 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 076 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 077 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 078 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 079 TEST_UTIL.startMiniCluster(2); 080 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 081 } 082 083 @Test 084 public void testGetMasterInfoPort() throws Exception { 085 assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(), (int) admin 086 .getMasterInfoPort().get()); 087 } 088 089 @Test 090 public void testRegionServerOnlineConfigChange() throws Exception { 091 replaceHBaseSiteXML(); 092 admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join()); 093 094 // Check the configuration of the RegionServers 095 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 096 Configuration conf = thread.getRegionServer().getConfiguration(); 097 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 098 }); 099 100 restoreHBaseSiteXML(); 101 } 102 103 @Test 104 public void testMasterOnlineConfigChange() throws Exception { 105 replaceHBaseSiteXML(); 106 ServerName master = admin.getMaster().get(); 107 admin.updateConfiguration(master).join(); 108 admin.getBackupMasters().get() 109 .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join()); 110 111 // Check the configuration of the Masters 112 TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { 113 Configuration conf = thread.getMaster().getConfiguration(); 114 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 115 }); 116 117 restoreHBaseSiteXML(); 118 } 119 120 @Test 121 public void testAllClusterOnlineConfigChange() throws IOException { 122 replaceHBaseSiteXML(); 123 admin.updateConfiguration().join(); 124 125 // Check the configuration of the Masters 126 TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { 127 Configuration conf = thread.getMaster().getConfiguration(); 128 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 129 }); 130 131 // Check the configuration of the RegionServers 132 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 133 Configuration conf = thread.getRegionServer().getConfiguration(); 134 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 135 }); 136 137 restoreHBaseSiteXML(); 138 } 139 140 private void replaceHBaseSiteXML() throws IOException { 141 // make a backup of hbase-site.xml 142 Files.copy(cnfPath, cnf3Path, StandardCopyOption.REPLACE_EXISTING); 143 // update hbase-site.xml by overwriting it 144 Files.copy(cnf2Path, cnfPath, StandardCopyOption.REPLACE_EXISTING); 145 } 146 147 private void restoreHBaseSiteXML() throws IOException { 148 // restore hbase-site.xml 149 Files.copy(cnf3Path, cnfPath, StandardCopyOption.REPLACE_EXISTING); 150 } 151 152 @Test 153 public void testRollWALWALWriter() throws Exception { 154 setUpforLogRolling(); 155 String className = this.getClass().getName(); 156 StringBuilder v = new StringBuilder(className); 157 while (v.length() < 1000) { 158 v.append(className); 159 } 160 byte[] value = Bytes.toBytes(v.toString()); 161 HRegionServer regionServer = startAndWriteData(tableName, value); 162 LOG.info("after writing there are " 163 + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files"); 164 165 // flush all regions 166 for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { 167 r.flush(true); 168 } 169 admin.rollWALWriter(regionServer.getServerName()).join(); 170 int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); 171 LOG.info("after flushing all regions and rolling logs there are " + 172 count + " log files"); 173 assertTrue(("actual count: " + count), count <= 2); 174 } 175 176 private void setUpforLogRolling() { 177 // Force a region split after every 768KB 178 TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 179 768L * 1024L); 180 181 // We roll the log after every 32 writes 182 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32); 183 184 TEST_UTIL.getConfiguration().setInt( 185 "hbase.regionserver.logroll.errors.tolerated", 2); 186 TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000); 187 188 // For less frequently updated regions flush after every 2 flushes 189 TEST_UTIL.getConfiguration().setInt( 190 "hbase.hregion.memstore.optionalflushcount", 2); 191 192 // We flush the cache after every 8192 bytes 193 TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 194 8192); 195 196 // Increase the amount of time between client retries 197 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000); 198 199 // Reduce thread wake frequency so that other threads can get 200 // a chance to run. 201 TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 202 2 * 1000); 203 204 /**** configuration for testLogRollOnDatanodeDeath ****/ 205 // lower the namenode & datanode heartbeat so the namenode 206 // quickly detects datanode failures 207 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 208 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 209 // the namenode might still try to choose the recently-dead datanode 210 // for a pipeline, so try to a new pipeline multiple times 211 TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); 212 TEST_UTIL.getConfiguration().setInt( 213 "hbase.regionserver.hlog.tolerable.lowreplication", 2); 214 TEST_UTIL.getConfiguration().setInt( 215 "hbase.regionserver.hlog.lowreplication.rolllimit", 3); 216 } 217 218 private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception { 219 createTableWithDefaultConf(tableName); 220 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 221 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); 222 for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls 223 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); 224 put.addColumn(FAMILY, null, value); 225 table.put(put).join(); 226 if (i % 32 == 0) { 227 // After every 32 writes sleep to let the log roller run 228 try { 229 Thread.sleep(2000); 230 } catch (InterruptedException e) { 231 // continue 232 } 233 } 234 } 235 return regionServer; 236 } 237 238 @Test 239 public void testGetRegionLoads() throws Exception { 240 // Turn off the balancer 241 admin.balancerSwitch(false).join(); 242 TableName[] tables = 243 new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"), 244 TableName.valueOf(tableName.getNameAsString() + "2"), 245 TableName.valueOf(tableName.getNameAsString() + "3") }; 246 createAndLoadTable(tables); 247 // Sleep to wait region server report 248 Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); 249 // Check if regions match with the regionLoad from the server 250 Collection<ServerName> servers = admin.getRegionServers().get(); 251 for (ServerName serverName : servers) { 252 List<RegionInfo> regions = admin.getRegions(serverName).get(); 253 checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get()); 254 } 255 256 // Check if regionLoad matches the table's regions and nothing is missed 257 for (TableName table : tables) { 258 List<RegionInfo> tableRegions = admin.getRegions(table).get(); 259 List<RegionMetrics> regionLoads = Lists.newArrayList(); 260 for (ServerName serverName : servers) { 261 regionLoads.addAll(admin.getRegionMetrics(serverName, table).get()); 262 } 263 checkRegionsAndRegionLoads(tableRegions, regionLoads); 264 } 265 266 // Check RegionLoad matches the regionLoad from ClusterStatus 267 ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get(); 268 for (Map.Entry<ServerName, ServerMetrics> entry : 269 clusterStatus.getLiveServerMetrics().entrySet()) { 270 ServerName sn = entry.getKey(); 271 ServerMetrics sm = entry.getValue(); 272 compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get()); 273 } 274 for (ServerName serverName : clusterStatus.getLiveServerMetrics().keySet()) { 275 ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(serverName); 276 277 } 278 } 279 280 private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster, 281 Collection<RegionMetrics> regionLoads) { 282 283 assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match", 284 regionLoadCluster.size(), regionLoads.size()); 285 286 for (RegionMetrics loadCluster : regionLoadCluster) { 287 boolean matched = false; 288 for (RegionMetrics load : regionLoads) { 289 if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) { 290 matched = true; 291 continue; 292 } 293 } 294 assertTrue("The contents of region load from cluster and server should match", matched); 295 } 296 } 297 298 private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions, 299 Collection<RegionMetrics> regionLoads) { 300 301 assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size()); 302 303 Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); 304 for (RegionMetrics regionLoad : regionLoads) { 305 regionLoadMap.put(regionLoad.getRegionName(), regionLoad); 306 } 307 for (RegionInfo info : regions) { 308 assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString() 309 + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName())); 310 } 311 } 312 313 private void createAndLoadTable(TableName[] tables) { 314 for (TableName table : tables) { 315 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table); 316 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); 317 admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join(); 318 AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table); 319 List<Put> puts = new ArrayList<>(); 320 for (byte[] row : HBaseTestingUtility.ROWS) { 321 puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"))); 322 } 323 asyncTable.putAll(puts).join(); 324 } 325 } 326}