001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.EnumSet;
028import java.util.List;
029import java.util.Map;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ClusterMetrics;
032import org.apache.hadoop.hbase.ClusterMetrics.Option;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.RegionMetrics;
037import org.apache.hadoop.hbase.ServerMetrics;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.regionserver.HRegion;
041import org.apache.hadoop.hbase.regionserver.HRegionServer;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.runner.RunWith;
051import org.junit.runners.Parameterized;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
055
056@RunWith(Parameterized.class)
057@Category({ ClientTests.class, LargeTests.class })
058public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class);
063
064  @BeforeClass
065  public static void setUpBeforeClass() throws Exception {
066
067    setUpConfigurationFiles(TEST_UTIL);
068    TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
069    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
070    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
071    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
072    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
073
074    TEST_UTIL.startMiniCluster(2);
075    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
076    addResourceToRegionServerConfiguration(TEST_UTIL);
077  }
078
079  @Test
080  public void testGetMasterInfoPort() throws Exception {
081    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(), (int) admin
082        .getMasterInfoPort().get());
083  }
084
085  @Test
086  public void testRegionServerOnlineConfigChange() throws Exception {
087    replaceHBaseSiteXML();
088    admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());
089
090    // Check the configuration of the RegionServers
091    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
092      Configuration conf = thread.getRegionServer().getConfiguration();
093      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
094    });
095
096    restoreHBaseSiteXML();
097  }
098
099  @Test
100  public void testMasterOnlineConfigChange() throws Exception {
101    replaceHBaseSiteXML();
102    ServerName master = admin.getMaster().get();
103    admin.updateConfiguration(master).join();
104    admin.getBackupMasters().get()
105        .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());
106
107    // Check the configuration of the Masters
108    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
109      Configuration conf = thread.getMaster().getConfiguration();
110      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
111    });
112
113    restoreHBaseSiteXML();
114  }
115
116  @Test
117  public void testAllClusterOnlineConfigChange() throws IOException {
118    replaceHBaseSiteXML();
119    admin.updateConfiguration().join();
120
121    // Check the configuration of the Masters
122    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
123      Configuration conf = thread.getMaster().getConfiguration();
124      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
125    });
126
127    // Check the configuration of the RegionServers
128    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
129      Configuration conf = thread.getRegionServer().getConfiguration();
130      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
131    });
132
133    restoreHBaseSiteXML();
134  }
135
136  @Test
137  public void testRollWALWALWriter() throws Exception {
138    setUpforLogRolling();
139    String className = this.getClass().getName();
140    StringBuilder v = new StringBuilder(className);
141    while (v.length() < 1000) {
142      v.append(className);
143    }
144    byte[] value = Bytes.toBytes(v.toString());
145    HRegionServer regionServer = startAndWriteData(tableName, value);
146    LOG.info("after writing there are "
147        + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");
148
149    // flush all regions
150    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
151      r.flush(true);
152    }
153    admin.rollWALWriter(regionServer.getServerName()).join();
154    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
155    LOG.info("after flushing all regions and rolling logs there are " +
156        count + " log files");
157    assertTrue(("actual count: " + count), count <= 2);
158  }
159
160  private void setUpforLogRolling() {
161    // Force a region split after every 768KB
162    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
163        768L * 1024L);
164
165    // We roll the log after every 32 writes
166    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
167
168    TEST_UTIL.getConfiguration().setInt(
169        "hbase.regionserver.logroll.errors.tolerated", 2);
170    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
171
172    // For less frequently updated regions flush after every 2 flushes
173    TEST_UTIL.getConfiguration().setInt(
174        "hbase.hregion.memstore.optionalflushcount", 2);
175
176    // We flush the cache after every 8192 bytes
177    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
178        8192);
179
180    // Increase the amount of time between client retries
181    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
182
183    // Reduce thread wake frequency so that other threads can get
184    // a chance to run.
185    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY,
186        2 * 1000);
187
188    /**** configuration for testLogRollOnDatanodeDeath ****/
189    // lower the namenode & datanode heartbeat so the namenode
190    // quickly detects datanode failures
191    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
192    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
193    // the namenode might still try to choose the recently-dead datanode
194    // for a pipeline, so try to a new pipeline multiple times
195    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
196    TEST_UTIL.getConfiguration().setInt(
197        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
198    TEST_UTIL.getConfiguration().setInt(
199        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
200  }
201
202  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
203    createTableWithDefaultConf(tableName);
204    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
205    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
206    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
207      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
208      put.addColumn(FAMILY, null, value);
209      table.put(put).join();
210      if (i % 32 == 0) {
211        // After every 32 writes sleep to let the log roller run
212        try {
213          Thread.sleep(2000);
214        } catch (InterruptedException e) {
215          // continue
216        }
217      }
218    }
219    return regionServer;
220  }
221
222  @Test
223  public void testGetRegionLoads() throws Exception {
224    // Turn off the balancer
225    admin.balancerSwitch(false).join();
226    TableName[] tables =
227        new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
228            TableName.valueOf(tableName.getNameAsString() + "2"),
229            TableName.valueOf(tableName.getNameAsString() + "3") };
230    createAndLoadTable(tables);
231    // Sleep to wait region server report
232    Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
233    // Check if regions match with the regionLoad from the server
234    Collection<ServerName> servers = admin.getRegionServers().get();
235    for (ServerName serverName : servers) {
236      List<RegionInfo> regions = admin.getRegions(serverName).get();
237      checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get());
238    }
239
240    // Check if regionLoad matches the table's regions and nothing is missed
241    for (TableName table : tables) {
242      List<RegionInfo> tableRegions = admin.getRegions(table).get();
243      List<RegionMetrics> regionLoads = Lists.newArrayList();
244      for (ServerName serverName : servers) {
245        regionLoads.addAll(admin.getRegionMetrics(serverName, table).get());
246      }
247      checkRegionsAndRegionLoads(tableRegions, regionLoads);
248    }
249
250    // Check RegionLoad matches the regionLoad from ClusterStatus
251    ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get();
252    assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size());
253    for (Map.Entry<ServerName, ServerMetrics> entry :
254      clusterStatus.getLiveServerMetrics().entrySet()) {
255      ServerName sn = entry.getKey();
256      ServerMetrics sm = entry.getValue();
257      compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get());
258    }
259    for (ServerName serverName : clusterStatus.getLiveServerMetrics().keySet()) {
260      ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(serverName);
261
262    }
263  }
264
265  private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster,
266      Collection<RegionMetrics> regionLoads) {
267
268    assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
269      regionLoadCluster.size(), regionLoads.size());
270
271    for (RegionMetrics loadCluster : regionLoadCluster) {
272      boolean matched = false;
273      for (RegionMetrics load : regionLoads) {
274        if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) {
275          matched = true;
276          continue;
277        }
278      }
279      assertTrue("The contents of region load from cluster and server should match", matched);
280    }
281  }
282
283  private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions,
284      Collection<RegionMetrics> regionLoads) {
285
286    assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size());
287
288    Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
289    for (RegionMetrics regionLoad : regionLoads) {
290      regionLoadMap.put(regionLoad.getRegionName(), regionLoad);
291    }
292    for (RegionInfo info : regions) {
293      assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString()
294          + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
295    }
296  }
297
298  private void createAndLoadTable(TableName[] tables) {
299    for (TableName table : tables) {
300      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
301      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
302      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
303      AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table);
304      List<Put> puts = new ArrayList<>();
305      for (byte[] row : HBaseTestingUtility.ROWS) {
306        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
307      }
308      asyncTable.putAll(puts).join();
309    }
310  }
311}