/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

@RunWith(Parameterized.class)
@Category({ ClientTests.class, LargeTests.class })
public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class);

  private final Path cnfPath =
      FileSystems.getDefault().getPath("target/test-classes/hbase-site.xml");
  private final Path cnf2Path =
      FileSystems.getDefault().getPath("target/test-classes/hbase-site2.xml");
  private final Path cnf3Path =
      FileSystems.getDefault().getPath("target/test-classes/hbase-site3.xml");

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
    TEST_UTIL.startMiniCluster(2);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

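  /**
   * Verify that the master info port returned by the async admin matches the port the master's
   * info server is actually listening on.
   */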
  @Test
  public void testGetMasterInfoPort() throws Exception {
    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(),
      (int) admin.getMasterInfoPort().get());
  }

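  /**
   * Replace hbase-site.xml, push the updated configuration to every region server, and verify
   * that the new value is visible in each region server's configuration.
   */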
  @Test
  public void testRegionServerOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());

    // Check the configuration of the RegionServers
    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
      Configuration conf = thread.getRegionServer().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    restoreHBaseSiteXML();
  }

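  /**
   * Replace hbase-site.xml, push the updated configuration to the active and backup masters, and
   * verify that the new value is visible in each master's configuration.
   */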
  @Test
  public void testMasterOnlineConfigChange() throws Exception {
    replaceHBaseSiteXML();
    ServerName master = admin.getMaster().get();
    admin.updateConfiguration(master).join();
    admin.getBackupMasters().get()
        .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());

    // Check the configuration of the Masters
    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
      Configuration conf = thread.getMaster().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    restoreHBaseSiteXML();
  }

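  /**
   * Replace hbase-site.xml, push the updated configuration to the whole cluster with a single
   * updateConfiguration() call, and verify that both masters and region servers pick up the new
   * value.
   */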
  @Test
  public void testAllClusterOnlineConfigChange() throws IOException {
    replaceHBaseSiteXML();
    admin.updateConfiguration().join();

    // Check the configuration of the Masters
    TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
      Configuration conf = thread.getMaster().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    // Check the configuration of the RegionServers
    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
      Configuration conf = thread.getRegionServer().getConfiguration();
      assertEquals(1000, conf.getInt("hbase.custom.config", 0));
    });

    restoreHBaseSiteXML();
  }

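  /**
   * Back up the current hbase-site.xml to hbase-site3.xml and overwrite it with hbase-site2.xml,
   * so the online configuration change tests see updated settings.
   */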
  private void replaceHBaseSiteXML() throws IOException {
    // make a backup of hbase-site.xml
    Files.copy(cnfPath, cnf3Path, StandardCopyOption.REPLACE_EXISTING);
    // update hbase-site.xml by overwriting it
    Files.copy(cnf2Path, cnfPath, StandardCopyOption.REPLACE_EXISTING);
  }

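  /**
   * Restore the original hbase-site.xml from the backup created by replaceHBaseSiteXML().
   */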
  private void restoreHBaseSiteXML() throws IOException {
    // restore hbase-site.xml
    Files.copy(cnf3Path, cnfPath, StandardCopyOption.REPLACE_EXISTING);
  }

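  /**
   * Write enough data to roll the WAL several times, flush all regions, roll the WAL writer via
   * the admin API, and verify that at most two rolled log files remain.
   */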
  @Test
  public void testRollWALWALWriter() throws Exception {
    setUpforLogRolling();
    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    byte[] value = Bytes.toBytes(v.toString());
    HRegionServer regionServer = startAndWriteData(tableName, value);
    LOG.info("after writing there are "
        + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");

    // flush all regions
    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
      r.flush(true);
    }
    admin.rollWALWriter(regionServer.getServerName()).join();
    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
    LOG.info("after flushing all regions and rolling logs there are " +
        count + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

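  /**
   * Tighten the region size, memstore flush size, and WAL/DFS related settings so that WAL
   * rolling happens frequently enough to be observed within the test.
   */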
  private void setUpforLogRolling() {
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
        768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt(
        "hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY,
        2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try creating a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

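  /**
   * Create the test table and write 256 rows against it, pausing every 32 writes so the log
   * roller has a chance to run; returns the region server hosting the table's first region.
   */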
  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
    createTableWithDefaultConf(tableName);
    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
      put.addColumn(FAMILY, null, value);
      table.put(put).join();
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
    return regionServer;
  }

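  /**
   * Create and load several tables, then verify that the region metrics reported per server, per
   * table, and via the cluster metrics are consistent with the regions actually online.
   */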
  @Test
  public void testGetRegionLoads() throws Exception {
    // Turn off the balancer
    admin.balancerSwitch(false).join();
    TableName[] tables =
        new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
            TableName.valueOf(tableName.getNameAsString() + "2"),
            TableName.valueOf(tableName.getNameAsString() + "3") };
    createAndLoadTable(tables);
    // Sleep to wait for the region server reports
    Thread.sleep(
      TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
    // Check if regions match with the regionLoad from the server
    Collection<ServerName> servers = admin.getRegionServers().get();
    for (ServerName serverName : servers) {
      List<RegionInfo> regions = admin.getRegions(serverName).get();
      checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get());
    }

    // Check if regionLoad matches the table's regions and nothing is missed
    for (TableName table : tables) {
      List<RegionInfo> tableRegions = admin.getRegions(table).get();
      List<RegionMetrics> regionLoads = Lists.newArrayList();
      for (ServerName serverName : servers) {
        regionLoads.addAll(admin.getRegionMetrics(serverName, table).get());
      }
      checkRegionsAndRegionLoads(tableRegions, regionLoads);
    }

    // Check RegionLoad matches the regionLoad from ClusterStatus
    ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get();
    for (Map.Entry<ServerName, ServerMetrics> entry :
      clusterStatus.getLiveServerMetrics().entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics sm = entry.getValue();
      compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get());
    }
  }

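  /**
   * Assert that both collections contain the same number of region metrics and that every region
   * reported in the cluster status is also reported by the region server.
   */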
  private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster,
      Collection<RegionMetrics> regionLoads) {

    assertEquals("Number of region loads from cluster status and region servers does not match",
      regionLoadCluster.size(), regionLoads.size());

    for (RegionMetrics loadCluster : regionLoadCluster) {
      boolean matched = false;
      for (RegionMetrics load : regionLoads) {
        if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) {
          matched = true;
          break;
        }
      }
      assertTrue("Region load reported in cluster status should also be reported by the server",
        matched);
    }
  }

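  /**
   * Assert that the region load collection contains an entry for every region in the given
   * collection and that the sizes match.
   */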
  private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions,
      Collection<RegionMetrics> regionLoads) {

    assertEquals("Number of regions and region loads does not match",
      regions.size(), regionLoads.size());

    Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    for (RegionMetrics regionLoad : regionLoads) {
      regionLoadMap.put(regionLoad.getRegionName(), regionLoad);
    }
    for (RegionInfo info : regions) {
      assertTrue("Region not in regionLoadMap, region: " + info.getRegionNameAsString()
          + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
    }
  }

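  /**
   * Create each table with a single column family, pre-split into 16 regions, and load it with
   * the rows from HBaseTestingUtility.ROWS.
   */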
  private void createAndLoadTable(TableName[] tables) {
    for (TableName table : tables) {
      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
      admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
      AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table);
      List<Put> puts = new ArrayList<>();
      for (byte[] row : HBaseTestingUtility.ROWS) {
        puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
      }
      asyncTable.putAll(puts).join();
    }
  }
}