001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.EnumSet;
029import java.util.List;
030import java.util.Map;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.ClusterMetrics.Option;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.RegionMetrics;
038import org.apache.hadoop.hbase.ServerMetrics;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.regionserver.HRegion;
042import org.apache.hadoop.hbase.regionserver.HRegionServer;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.runner.RunWith;
052import org.junit.runners.Parameterized;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
055import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
056
057@RunWith(Parameterized.class)
058@Category({ ClientTests.class, LargeTests.class })
059public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class);
064
065  @BeforeClass
066  public static void setUpBeforeClass() throws Exception {
067
068    setUpConfigurationFiles(TEST_UTIL);
069    TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
070    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
071    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
072    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
073    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
074
075    TEST_UTIL.startMiniCluster(2);
076    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
077    addResourceToRegionServerConfiguration(TEST_UTIL);
078  }
079
080  @Test
081  public void testGetMasterInfoPort() throws Exception {
082    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(),
083      (int) admin.getMasterInfoPort().get());
084  }
085
086  @Test
087  public void testRegionServerOnlineConfigChange() throws Exception {
088    replaceHBaseSiteXML();
089    admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());
090
091    // Check the configuration of the RegionServers
092    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
093      Configuration conf = thread.getRegionServer().getConfiguration();
094      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
095    });
096
097    restoreHBaseSiteXML();
098  }
099
100  @Test
101  public void testMasterOnlineConfigChange() throws Exception {
102    replaceHBaseSiteXML();
103    ServerName master = admin.getMaster().get();
104    admin.updateConfiguration(master).join();
105    admin.getBackupMasters().get()
106      .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());
107
108    // Check the configuration of the Masters
109    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
110      Configuration conf = thread.getMaster().getConfiguration();
111      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
112    });
113
114    restoreHBaseSiteXML();
115  }
116
117  @Test
118  public void testAllClusterOnlineConfigChange() throws IOException {
119    replaceHBaseSiteXML();
120    admin.updateConfiguration().join();
121
122    // Check the configuration of the Masters
123    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
124      Configuration conf = thread.getMaster().getConfiguration();
125      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
126    });
127
128    // Check the configuration of the RegionServers
129    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
130      Configuration conf = thread.getRegionServer().getConfiguration();
131      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
132    });
133
134    restoreHBaseSiteXML();
135  }
136
137  @Test
138  public void testRollWALWALWriter() throws Exception {
139    setUpforLogRolling();
140    String className = this.getClass().getName();
141    StringBuilder v = new StringBuilder(className);
142    while (v.length() < 1000) {
143      v.append(className);
144    }
145    byte[] value = Bytes.toBytes(v.toString());
146    HRegionServer regionServer = startAndWriteData(tableName, value);
147    LOG.info("after writing there are "
148      + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");
149
150    // flush all regions
151    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
152      r.flush(true);
153    }
154    admin.rollWALWriter(regionServer.getServerName()).join();
155    TEST_UTIL.waitFor(5000, () -> {
156      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
157      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
158      return count <= 2;
159    });
160  }
161
162  private void setUpforLogRolling() {
163    // Force a region split after every 768KB
164    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);
165
166    // We roll the log after every 32 writes
167    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
168
169    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
170    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
171
172    // For less frequently updated regions flush after every 2 flushes
173    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
174
175    // We flush the cache after every 8192 bytes
176    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);
177
178    // Increase the amount of time between client retries
179    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
180
181    // Reduce thread wake frequency so that other threads can get
182    // a chance to run.
183    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
184
185    /**** configuration for testLogRollOnDatanodeDeath ****/
186    // lower the namenode & datanode heartbeat so the namenode
187    // quickly detects datanode failures
188    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
189    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
190    // the namenode might still try to choose the recently-dead datanode
191    // for a pipeline, so try to a new pipeline multiple times
192    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
193    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
194    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
195  }
196
197  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
198    createTableWithDefaultConf(tableName);
199    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
200    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
201    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
202      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
203      put.addColumn(FAMILY, null, value);
204      table.put(put).join();
205      if (i % 32 == 0) {
206        // After every 32 writes sleep to let the log roller run
207        try {
208          Thread.sleep(2000);
209        } catch (InterruptedException e) {
210          // continue
211        }
212      }
213    }
214    return regionServer;
215  }
216
217  @Test
218  public void testGetRegionLoads() throws Exception {
219    // Turn off the balancer
220    admin.balancerSwitch(false).join();
221    TableName[] tables = new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
222      TableName.valueOf(tableName.getNameAsString() + "2"),
223      TableName.valueOf(tableName.getNameAsString() + "3") };
224    createAndLoadTable(tables);
225    // Sleep to wait region server report
226    Thread
227      .sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
228    // Check if regions match with the regionLoad from the server
229    Collection<ServerName> servers = admin.getRegionServers().get();
230    for (ServerName serverName : servers) {
231      List<RegionInfo> regions = admin.getRegions(serverName).get();
232      checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get());
233    }
234
235    // Check if regionLoad matches the table's regions and nothing is missed
236    for (TableName table : tables) {
237      List<RegionInfo> tableRegions = admin.getRegions(table).get();
238      List<RegionMetrics> regionLoads = Lists.newArrayList();
239      for (ServerName serverName : servers) {
240        regionLoads.addAll(admin.getRegionMetrics(serverName, table).get());
241      }
242      checkRegionsAndRegionLoads(tableRegions, regionLoads);
243    }
244
245    // Check RegionLoad matches the regionLoad from ClusterStatus
246    ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get();
247    assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size());
248    for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics()
249      .entrySet()) {
250      ServerName sn = entry.getKey();
251      ServerMetrics sm = entry.getValue();
252      compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get());
253    }
254    for (ServerName serverName : clusterStatus.getLiveServerMetrics().keySet()) {
255      ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(serverName);
256
257    }
258  }
259
260  @Test
261  public void testGetRegionServers() throws Exception {
262    List<ServerName> serverNames = new ArrayList<>(admin.getRegionServers(true).get());
263    assertEquals(2, serverNames.size());
264
265    List<ServerName> serversToDecom = new ArrayList<>();
266    ServerName serverToDecommission = serverNames.get(0);
267
268    serversToDecom.add(serverToDecommission);
269    admin.decommissionRegionServers(serversToDecom, false).join();
270
271    assertEquals(1, admin.getRegionServers(true).get().size());
272    assertEquals(2, admin.getRegionServers(false).get().size());
273
274    admin.recommissionRegionServer(serverToDecommission, Collections.emptyList()).join();
275
276    assertEquals(2, admin.getRegionServers(true).get().size());
277    assertEquals(2, admin.getRegionServers(false).get().size());
278  }
279
280  private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster,
281    Collection<RegionMetrics> regionLoads) {
282
283    assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
284      regionLoadCluster.size(), regionLoads.size());
285
286    for (RegionMetrics loadCluster : regionLoadCluster) {
287      boolean matched = false;
288      for (RegionMetrics load : regionLoads) {
289        if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) {
290          matched = true;
291          continue;
292        }
293      }
294      assertTrue("The contents of region load from cluster and server should match", matched);
295    }
296  }
297
298  private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions,
299    Collection<RegionMetrics> regionLoads) {
300
301    assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size());
302
303    Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
304    for (RegionMetrics regionLoad : regionLoads) {
305      regionLoadMap.put(regionLoad.getRegionName(), regionLoad);
306    }
307    for (RegionInfo info : regions) {
308      assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString()
309        + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
310    }
311  }
312
313  private void createAndLoadTable(TableName[] tables) {
314    for (TableName table : tables) {
315      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
316      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
317      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
318      AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table);
319      List<Put> puts = new ArrayList<>();
320      for (byte[] row : HBaseTestingUtil.ROWS) {
321        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
322      }
323      asyncTable.putAll(puts).join();
324    }
325  }
326}