001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.hasItem;
022import static org.hamcrest.Matchers.hasItems;
023import static org.hamcrest.Matchers.hasSize;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertThrows;
026
027import java.io.IOException;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Set;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HRegionLocation;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.master.HMaster;
039import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager;
040import org.apache.hadoop.hbase.regionserver.RSRpcServices;
041import org.apache.hadoop.hbase.security.User;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.junit.jupiter.api.AfterAll;
045import org.junit.jupiter.api.AfterEach;
046import org.junit.jupiter.api.BeforeAll;
047import org.junit.jupiter.api.BeforeEach;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.Test;
050
051import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
052import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
053
054@Tag(MediumTests.TAG)
055@Tag(ClientTests.TAG)
056public class TestRpcConnectionRegistry {
057
058  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
059
060  private RpcConnectionRegistry registry;
061
062  @BeforeAll
063  public static void setUpBeforeClass() throws Exception {
064    // allow refresh immediately so we will switch to use region servers soon.
065    UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
066    UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
067    UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
068    UTIL.getConfiguration().setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
069    UTIL.startMiniCluster(3);
070    HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
071  }
072
073  @AfterAll
074  public static void tearDownAfterClass() throws Exception {
075    UTIL.shutdownMiniCluster();
076  }
077
078  @BeforeEach
079  public void setUp() throws IOException {
080    registry = new RpcConnectionRegistry(UTIL.getConfiguration(), User.getCurrent());
081  }
082
083  @AfterEach
084  public void tearDown() throws IOException {
085    Closeables.close(registry, true);
086  }
087
088  private void setMaxNodeCount(int count) {
089    UTIL.getMiniHBaseCluster().getMasterThreads().stream()
090      .map(t -> t.getMaster().getConfiguration())
091      .forEach(conf -> conf.setInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, count));
092    UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
093      .map(t -> t.getRegionServer().getConfiguration())
094      .forEach(conf -> conf.setInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, count));
095  }
096
097  @Test
098  public void testRegistryRPCs() throws Exception {
099    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
100    // should only contains the active master
101    Set<ServerName> initialParsedServers = registry.getParsedServers();
102    assertThat(initialParsedServers, hasSize(1));
103    // no start code in configuration
104    assertThat(initialParsedServers,
105      hasItem(ServerName.valueOf(activeMaster.getServerName().getHostname(),
106        activeMaster.getServerName().getPort(), -1)));
107    // Since our initial delay is 1 second, finally we should have refreshed the endpoints
108    UTIL.waitFor(5000, () -> registry.getParsedServers()
109      .contains(activeMaster.getServerManager().getOnlineServersList().get(0)));
110    Set<ServerName> parsedServers = registry.getParsedServers();
111    assertThat(parsedServers,
112      hasSize(activeMaster.getServerManager().getOnlineServersList().size()));
113    assertThat(parsedServers,
114      hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
115
116    // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
117    // because not all replicas had made it up before test started.
118    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
119
120    assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
121    assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
122    List<HRegionLocation> metaLocations =
123      Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
124    List<HRegionLocation> actualMetaLocations = activeMaster.getMetaLocations();
125    Collections.sort(metaLocations);
126    Collections.sort(actualMetaLocations);
127    assertEquals(actualMetaLocations, metaLocations);
128
129    // test that the node count config works
130    setMaxNodeCount(1);
131    UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 1);
132  }
133
134  /**
135   * Make sure that we can create the RpcClient when there are broken servers in the bootstrap nodes
136   */
137  @Test
138  public void testBrokenBootstrapNodes() throws Exception {
139    Configuration conf = new Configuration(UTIL.getConfiguration());
140    String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
141    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
142    String clusterId = activeMaster.getClusterId();
143    // Add a non-working master
144    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
145    conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, badServer.toShortString());
146    // only a bad server, the request should fail
147    try (RpcConnectionRegistry reg = new RpcConnectionRegistry(conf, User.getCurrent())) {
148      assertThrows(IOException.class, () -> reg.getParsedServers());
149    }
150
151    conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES,
152      badServer.toShortString() + ", " + currentMasterAddrs);
153    // we will choose bootstrap node randomly so here we need to test it multiple times to make sure
154    // that we can skip the broken node
155    for (int i = 0; i < 10; i++) {
156      try (RpcConnectionRegistry reg = new RpcConnectionRegistry(conf, User.getCurrent())) {
157        assertEquals(clusterId, reg.getClusterId().get());
158      }
159    }
160  }
161}