001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.EnumSet;
029import java.util.List;
030import java.util.Map;
031import java.util.function.Supplier;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.ClusterMetrics.Option;
035import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.RegionMetrics;
039import org.apache.hadoop.hbase.ServerMetrics;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.HRegionServer;
044import org.apache.hadoop.hbase.testclassification.ClientTests;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
048import org.junit.jupiter.api.AfterAll;
049import org.junit.jupiter.api.BeforeAll;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.TestTemplate;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
055
056@Tag(ClientTests.TAG)
057@Tag(LargeTests.TAG)
058@HBaseParameterizedTestTemplate(name = "{index}: policy = {0}")
059public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
060
061  public TestAsyncClusterAdminApi(Supplier<AsyncAdmin> admin) {
062    super(admin);
063  }
064
065  @BeforeAll
066  public static void setUpBeforeClass() throws Exception {
067
068    setUpConfigurationFiles(TEST_UTIL);
069    TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
070    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
071    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
072    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
073    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
074
075    TEST_UTIL.startMiniCluster(2);
076    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
077    addResourceToRegionServerConfiguration(TEST_UTIL);
078  }
079
080  @AfterAll
081  public static void tearDownAfterClass() throws Exception {
082    TestAsyncAdminBase.tearDownAfterClass();
083  }
084
085  @TestTemplate
086  public void testGetMasterInfoPort() throws Exception {
087    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(),
088      (int) admin.getMasterInfoPort().get());
089  }
090
091  @TestTemplate
092  public void testRegionServerOnlineConfigChange() throws Exception {
093    replaceHBaseSiteXML();
094    admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());
095
096    // Check the configuration of the RegionServers
097    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
098      Configuration conf = thread.getRegionServer().getConfiguration();
099      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
100    });
101
102    restoreHBaseSiteXML();
103  }
104
105  @TestTemplate
106  public void testMasterOnlineConfigChange() throws Exception {
107    replaceHBaseSiteXML();
108    ServerName master = admin.getMaster().get();
109    admin.updateConfiguration(master).join();
110    admin.getBackupMasters().get()
111      .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());
112
113    // Check the configuration of the Masters
114    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
115      Configuration conf = thread.getMaster().getConfiguration();
116      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
117    });
118
119    restoreHBaseSiteXML();
120  }
121
122  @TestTemplate
123  public void testAllClusterOnlineConfigChange() throws IOException {
124    replaceHBaseSiteXML();
125    admin.updateConfiguration().join();
126
127    // Check the configuration of the Masters
128    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
129      Configuration conf = thread.getMaster().getConfiguration();
130      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
131    });
132
133    // Check the configuration of the RegionServers
134    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
135      Configuration conf = thread.getRegionServer().getConfiguration();
136      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
137    });
138
139    restoreHBaseSiteXML();
140  }
141
142  @TestTemplate
143  public void testRollWALWALWriter() throws Exception {
144    setUpforLogRolling();
145    String className = this.getClass().getName();
146    StringBuilder v = new StringBuilder(className);
147    while (v.length() < 1000) {
148      v.append(className);
149    }
150    byte[] value = Bytes.toBytes(v.toString());
151    HRegionServer regionServer = startAndWriteData(tableName, value);
152    LOG.info("after writing there are "
153      + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");
154
155    // flush all regions
156    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
157      r.flush(true);
158    }
159    admin.rollWALWriter(regionServer.getServerName()).join();
160    TEST_UTIL.waitFor(5000, () -> {
161      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
162      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
163      return count <= 2;
164    });
165  }
166
167  private void setUpforLogRolling() {
168    // Force a region split after every 768KB
169    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);
170
171    // We roll the log after every 32 writes
172    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
173
174    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
175    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
176
177    // For less frequently updated regions flush after every 2 flushes
178    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
179
180    // We flush the cache after every 8192 bytes
181    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);
182
183    // Increase the amount of time between client retries
184    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
185
186    // Reduce thread wake frequency so that other threads can get
187    // a chance to run.
188    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
189
190    /**** configuration for testLogRollOnDatanodeDeath ****/
191    // lower the namenode & datanode heartbeat so the namenode
192    // quickly detects datanode failures
193    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
194    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
195    // the namenode might still try to choose the recently-dead datanode
196    // for a pipeline, so try to a new pipeline multiple times
197    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
198    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
199    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
200  }
201
202  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
203    createTableWithDefaultConf(tableName);
204    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
205    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
206    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
207      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
208      put.addColumn(FAMILY, null, value);
209      table.put(put).join();
210      if (i % 32 == 0) {
211        // After every 32 writes sleep to let the log roller run
212        try {
213          Thread.sleep(2000);
214        } catch (InterruptedException e) {
215          // continue
216        }
217      }
218    }
219    return regionServer;
220  }
221
222  @TestTemplate
223  public void testGetRegionLoads() throws Exception {
224    // Turn off the balancer
225    admin.balancerSwitch(false).join();
226    TableName[] tables = new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
227      TableName.valueOf(tableName.getNameAsString() + "2"),
228      TableName.valueOf(tableName.getNameAsString() + "3") };
229    createAndLoadTable(tables);
230    // Sleep to wait region server report
231    Thread
232      .sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
233    // Check if regions match with the regionLoad from the server
234    Collection<ServerName> servers = admin.getRegionServers().get();
235    for (ServerName serverName : servers) {
236      List<RegionInfo> regions = admin.getRegions(serverName).get();
237      checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get());
238    }
239
240    // Check if regionLoad matches the table's regions and nothing is missed
241    for (TableName table : tables) {
242      List<RegionInfo> tableRegions = admin.getRegions(table).get();
243      List<RegionMetrics> regionLoads = Lists.newArrayList();
244      for (ServerName serverName : servers) {
245        regionLoads.addAll(admin.getRegionMetrics(serverName, table).get());
246      }
247      checkRegionsAndRegionLoads(tableRegions, regionLoads);
248    }
249
250    // Check RegionLoad matches the regionLoad from ClusterStatus
251    ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get();
252    assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size());
253    for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics()
254      .entrySet()) {
255      ServerName sn = entry.getKey();
256      ServerMetrics sm = entry.getValue();
257      compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get());
258    }
259    for (ServerName serverName : clusterStatus.getLiveServerMetrics().keySet()) {
260      ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(serverName);
261
262    }
263  }
264
265  @TestTemplate
266  public void testGetRegionServers() throws Exception {
267    List<ServerName> serverNames = new ArrayList<>(admin.getRegionServers(true).get());
268    assertEquals(2, serverNames.size());
269
270    List<ServerName> serversToDecom = new ArrayList<>();
271    ServerName serverToDecommission = serverNames.get(0);
272
273    serversToDecom.add(serverToDecommission);
274    admin.decommissionRegionServers(serversToDecom, false).join();
275
276    assertEquals(1, admin.getRegionServers(true).get().size());
277    assertEquals(2, admin.getRegionServers(false).get().size());
278
279    admin.recommissionRegionServer(serverToDecommission, Collections.emptyList()).join();
280
281    assertEquals(2, admin.getRegionServers(true).get().size());
282    assertEquals(2, admin.getRegionServers(false).get().size());
283  }
284
285  private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster,
286    Collection<RegionMetrics> regionLoads) {
287
288    assertEquals(regionLoadCluster.size(), regionLoads.size(),
289      "No of regionLoads from clusterStatus and regionloads from RS doesn't match");
290
291    for (RegionMetrics loadCluster : regionLoadCluster) {
292      boolean matched = false;
293      for (RegionMetrics load : regionLoads) {
294        if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) {
295          matched = true;
296          continue;
297        }
298      }
299      assertTrue(matched, "The contents of region load from cluster and server should match");
300    }
301  }
302
303  private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions,
304    Collection<RegionMetrics> regionLoads) {
305
306    assertEquals(regions.size(), regionLoads.size(), "No of regions and regionloads doesn't match");
307
308    Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
309    for (RegionMetrics regionLoad : regionLoads) {
310      regionLoadMap.put(regionLoad.getRegionName(), regionLoad);
311    }
312    for (RegionInfo info : regions) {
313      assertTrue(regionLoadMap.containsKey(info.getRegionName()),
314        "Region not in regionLoadMap region:" + info.getRegionNameAsString() + " regionMap: "
315          + regionLoadMap);
316    }
317  }
318
319  private void createAndLoadTable(TableName[] tables) {
320    for (TableName table : tables) {
321      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
322      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
323      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
324      AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table);
325      List<Put> puts = new ArrayList<>();
326      for (byte[] row : HBaseTestingUtil.ROWS) {
327        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
328      }
329      asyncTable.putAll(puts).join();
330    }
331  }
332}