/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

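/**
 * Tests cluster-level operations of the async admin API: online configuration updates for
 * masters and region servers, WAL rolling, region metrics reporting, and decommissioning and
 * recommissioning of region servers.
 */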
@RunWith(Parameterized.class)
@Category({ ClientTests.class, LargeTests.class })
public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    setUpConfigurationFiles(TEST_UTIL);
    TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);

    TEST_UTIL.startMiniCluster(2);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    addResourceToRegionServerConfiguration(TEST_UTIL);
  }

  @Test
  public void testGetMasterInfoPort() throws Exception {
    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(),
      (int) admin.getMasterInfoPort().get());
  }

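  /**
   * Replaces the hbase-site.xml in use with one that defines {@code hbase.custom.config}, pushes
   * the change to every region server via {@code updateConfiguration}, and verifies each region
   * server picks up the new value.
   */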
  @Test
  public void testRegionServerOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());

    // Check the configuration of the RegionServers
    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
      Configuration conf = thread.getRegionServer().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    restoreHBaseSiteXML();
  }

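  /**
   * Pushes an updated hbase-site.xml to the active master and all backup masters via
   * {@code updateConfiguration} and verifies the new value of {@code hbase.custom.config} is
   * loaded.
   */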
  @Test
  public void testMasterOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    ServerName master = admin.getMaster().get();
    admin.updateConfiguration(master).join();
    admin.getBackupMasters().get()
      .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());

    // Check the configuration of the Masters
    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
      Configuration conf = thread.getMaster().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    restoreHBaseSiteXML();
  }

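  /**
   * Uses the no-argument {@code updateConfiguration()} to reload the configuration on every
   * master and region server in the cluster at once, then verifies the new value on all of them.
   */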
  @Test
  public void testAllClusterOnlineConfigChange() throws IOException {
    replaceHBaseSiteXML();
    admin.updateConfiguration().join();

    // Check the configuration of the Masters
    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
      Configuration conf = thread.getMaster().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    // Check the configuration of the RegionServers
    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
      Configuration conf = thread.getRegionServer().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    restoreHBaseSiteXML();
  }

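  /**
   * Writes enough data to force several WAL rolls, flushes all regions, then calls
   * {@code rollWALWriter} and expects the number of rolled log files to drop to at most two,
   * since all edits have been flushed and the old WALs can be archived.
   */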
  @Test
  public void testRollWALWALWriter() throws Exception {
    setUpforLogRolling();
    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    byte[] value = Bytes.toBytes(v.toString());
    HRegionServer regionServer = startAndWriteData(tableName, value);
    LOG.info("after writing there are "
      + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");

    // flush all regions
    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
      r.flush(true);
    }
    admin.rollWALWriter(regionServer.getServerName()).join();
    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
    assertTrue("actual count: " + count, count <= 2);
  }

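  /**
   * Tunes the cluster configuration so WALs roll quickly and often: a small maximum region and
   * memstore size, frequent log rolls, and short HDFS heartbeat and retry settings.
   */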
  private void setUpforLogRolling() {
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions, flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    // HDFS settings (originally for testLogRollOnDatanodeDeath):
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try to create a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

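  /**
   * Creates the test table and writes 256 rows to it through the async table API, pausing every
   * 32 writes so the log roller has a chance to run. Returns the region server hosting the first
   * region of the table.
   */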
  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
    createTableWithDefaultConf(tableName);
    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
      put.addColumn(FAMILY, null, value);
      table.put(put).join();
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
    return regionServer;
  }

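  /**
   * Loads three tables and then cross-checks region metrics from three sources: the per-server
   * {@code getRegionMetrics}, the per-table {@code getRegionMetrics}, and the region metrics
   * embedded in {@code getClusterMetrics}.
   */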
  @Test
  public void testGetRegionLoads() throws Exception {
    // Turn off the balancer
    admin.balancerSwitch(false).join();
    TableName[] tables = new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
      TableName.valueOf(tableName.getNameAsString() + "2"),
      TableName.valueOf(tableName.getNameAsString() + "3") };
    createAndLoadTable(tables);
    // Sleep to wait for the region server report
    Thread
      .sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
    // Check if regions match with the regionLoad from the server
    Collection<ServerName> servers = admin.getRegionServers().get();
    for (ServerName serverName : servers) {
      List<RegionInfo> regions = admin.getRegions(serverName).get();
      checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get());
    }

    // Check if regionLoad matches the table's regions and nothing is missed
    for (TableName table : tables) {
      List<RegionInfo> tableRegions = admin.getRegions(table).get();
      List<RegionMetrics> regionLoads = Lists.newArrayList();
      for (ServerName serverName : servers) {
        regionLoads.addAll(admin.getRegionMetrics(serverName, table).get());
      }
      checkRegionsAndRegionLoads(tableRegions, regionLoads);
    }

    // Check RegionLoad matches the regionLoad from ClusterStatus
    ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get();
    assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size());
    for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics()
      .entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics sm = entry.getValue();
      compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get());
    }
  }

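  /**
   * Verifies that {@code getRegionServers(true)} excludes a decommissioned region server while
   * {@code getRegionServers(false)} still lists it, and that recommissioning restores it.
   */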
  @Test
  public void testGetRegionServers() throws Exception {
    List<ServerName> serverNames = new ArrayList<>(admin.getRegionServers(true).get());
    assertEquals(2, serverNames.size());

    List<ServerName> serversToDecom = new ArrayList<>();
    ServerName serverToDecommission = serverNames.get(0);

    serversToDecom.add(serverToDecommission);
    admin.decommissionRegionServers(serversToDecom, false).join();

    assertEquals(1, admin.getRegionServers(true).get().size());
    assertEquals(2, admin.getRegionServers(false).get().size());

    admin.recommissionRegionServer(serverToDecommission, Collections.emptyList()).join();

    assertEquals(2, admin.getRegionServers(true).get().size());
    assertEquals(2, admin.getRegionServers(false).get().size());
  }

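  /**
   * Asserts that the two collections have the same size and that every region reported in the
   * cluster-status metrics is also present (by region name) in the metrics fetched directly from
   * the region server.
   */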
  private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster,
    Collection<RegionMetrics> regionLoads) {

    assertEquals("Number of region loads from cluster status and region server do not match",
      regionLoadCluster.size(), regionLoads.size());

    for (RegionMetrics loadCluster : regionLoadCluster) {
      boolean matched = false;
      for (RegionMetrics load : regionLoads) {
        if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) {
          matched = true;
          break;
        }
      }
      assertTrue("Region load from cluster status should also be reported by the region server",
        matched);
    }
  }

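  /**
   * Asserts that the number of regions equals the number of region loads and that every region
   * has a corresponding entry (keyed by region name) in the region load collection.
   */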
  private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions,
    Collection<RegionMetrics> regionLoads) {

    assertEquals("Number of regions and region loads do not match", regions.size(),
      regionLoads.size());

    Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    for (RegionMetrics regionLoad : regionLoads) {
      regionLoadMap.put(regionLoad.getRegionName(), regionLoad);
    }
    for (RegionInfo info : regions) {
      assertTrue("Region not in regionLoadMap, region: " + info.getRegionNameAsString()
        + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
    }
  }

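  /**
   * Creates each table pre-split into 16 regions and loads it with one row per entry in
   * {@code HBaseTestingUtility.ROWS} using the async table API.
   */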
  private void createAndLoadTable(TableName[] tables) {
    for (TableName table : tables) {
      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
      AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table);
      List<Put> puts = new ArrayList<>();
      for (byte[] row : HBaseTestingUtility.ROWS) {
        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
      }
      asyncTable.putAll(puts).join();
    }
  }
}