001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.EnumSet;
025import java.util.HashSet;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Random;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.regex.Pattern;
034import org.apache.hadoop.hbase.ClusterMetrics;
035import org.apache.hadoop.hbase.ClusterMetrics.Option;
036import org.apache.hadoop.hbase.HBaseCluster;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.MiniHBaseCluster;
040import org.apache.hadoop.hbase.NamespaceDescriptor;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.client.AbstractTestUpdateConfiguration;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.client.BalanceRequest;
047import org.apache.hadoop.hbase.client.BalanceResponse;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
051import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
052import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
053import org.apache.hadoop.hbase.coprocessor.MasterObserver;
054import org.apache.hadoop.hbase.coprocessor.ObserverContext;
055import org.apache.hadoop.hbase.master.HMaster;
056import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
057import org.apache.hadoop.hbase.master.ServerManager;
058import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
059import org.apache.hadoop.hbase.net.Address;
060import org.junit.Rule;
061import org.junit.rules.TestName;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
066import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
067
068public abstract class TestRSGroupsBase extends AbstractTestUpdateConfiguration {
069  protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroupsBase.class);
070
071  // shared
072  protected final static String groupPrefix = "Group";
073  protected final static String tablePrefix = "Group";
074  protected final static Random rand = new Random();
075
076  // shared, cluster type specific
077  protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
078  protected static Admin admin;
079  protected static HBaseCluster cluster;
080  protected static RSGroupAdmin rsGroupAdmin;
081  protected static HMaster master;
082  protected boolean INIT = false;
083  protected static RSGroupAdminEndpoint rsGroupAdminEndpoint;
084  protected static CPMasterObserver observer;
085
086  public final static long WAIT_TIMEOUT = 60000;
087  public final static int NUM_SLAVES_BASE = 4; // number of slaves for the smallest cluster
088  public static int NUM_DEAD_SERVERS = 0;
089
090  // Per test variables
091  @Rule
092  public TestName name = new TestName();
093  protected TableName tableName;
094
095  public static void setUpTestBeforeClass() throws Exception {
096    TEST_UTIL.getConfiguration().setFloat("hbase.master.balancer.stochastic.tableSkewCost", 6000);
097    TEST_UTIL.getConfiguration().set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
098      RSGroupBasedLoadBalancer.class.getName());
099    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
100      RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName());
101    TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
102    TEST_UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
103      NUM_SLAVES_BASE);
104    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
105    initialize();
106  }
107
108  protected static void initialize() throws Exception {
109    admin = TEST_UTIL.getAdmin();
110    cluster = TEST_UTIL.getHBaseCluster();
111    master = TEST_UTIL.getMiniHBaseCluster().getMaster();
112
113    // wait for balancer to come online
114    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
115      @Override
116      public boolean evaluate() throws Exception {
117        return master.isInitialized()
118          && ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
119      }
120    });
121    admin.balancerSwitch(false, true);
122    rsGroupAdmin = new VerifyingRSGroupAdminClient(
123      new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration());
124    MasterCoprocessorHost host = master.getMasterCoprocessorHost();
125    observer = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName());
126    rsGroupAdminEndpoint =
127      (RSGroupAdminEndpoint) host.findCoprocessor(RSGroupAdminEndpoint.class.getName());
128  }
129
130  public static void tearDownAfterClass() throws Exception {
131    TEST_UTIL.shutdownMiniCluster();
132  }
133
134  public void setUpBeforeMethod() throws Exception {
135    LOG.info(name.getMethodName());
136    tableName = TableName.valueOf(tablePrefix + "_" + name.getMethodName());
137    if (!INIT) {
138      INIT = true;
139      tearDownAfterMethod();
140    }
141    observer.resetFlags();
142  }
143
144  public void tearDownAfterMethod() throws Exception {
145    deleteTableIfNecessary();
146    deleteNamespaceIfNecessary();
147    deleteGroups();
148
149    for (ServerName sn : admin.listDecommissionedRegionServers()) {
150      admin.recommissionRegionServer(sn, null);
151    }
152    assertTrue(admin.listDecommissionedRegionServers().isEmpty());
153
154    int missing = NUM_SLAVES_BASE - getNumServers();
155    LOG.info("Restoring servers: " + missing);
156    for (int i = 0; i < missing; i++) {
157      ((MiniHBaseCluster) cluster).startRegionServer();
158    }
159
160    rsGroupAdmin.addRSGroup("master");
161    ServerName masterServerName = ((MiniHBaseCluster) cluster).getMaster().getServerName();
162
163    try {
164      rsGroupAdmin.moveServers(Sets.newHashSet(masterServerName.getAddress()), "master");
165    } catch (Exception ex) {
166      LOG.warn("Got this on setup, FYI", ex);
167    }
168    assertTrue(observer.preMoveServersCalled);
169    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
170      @Override
171      public boolean evaluate() throws Exception {
172        LOG.info("Waiting for cleanup to finish " + rsGroupAdmin.listRSGroups());
173        // Might be greater since moving servers back to default
174        // is after starting a server
175
176        return rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers().size()
177            == NUM_SLAVES_BASE;
178      }
179    });
180  }
181
182  protected RSGroupInfo addGroup(String groupName, int serverCount)
183    throws IOException, InterruptedException {
184    RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
185    rsGroupAdmin.addRSGroup(groupName);
186    Set<Address> set = new HashSet<>();
187    for (Address server : defaultInfo.getServers()) {
188      if (set.size() == serverCount) {
189        break;
190      }
191      set.add(server);
192    }
193    rsGroupAdmin.moveServers(set, groupName);
194    RSGroupInfo result = rsGroupAdmin.getRSGroupInfo(groupName);
195    return result;
196  }
197
198  protected void removeGroup(String groupName) throws IOException {
199    RSGroupInfo groupInfo = rsGroupAdmin.getRSGroupInfo(groupName);
200    rsGroupAdmin.moveTables(groupInfo.getTables(), RSGroupInfo.DEFAULT_GROUP);
201    rsGroupAdmin.moveServers(groupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP);
202    rsGroupAdmin.removeRSGroup(groupName);
203  }
204
205  protected void deleteTableIfNecessary() throws IOException {
206    for (TableDescriptor desc : TEST_UTIL.getAdmin()
207      .listTableDescriptors(Pattern.compile(tablePrefix + ".*"))) {
208      TEST_UTIL.deleteTable(desc.getTableName());
209    }
210  }
211
212  protected void deleteNamespaceIfNecessary() throws IOException {
213    for (NamespaceDescriptor desc : TEST_UTIL.getAdmin().listNamespaceDescriptors()) {
214      if (desc.getName().startsWith(tablePrefix)) {
215        admin.deleteNamespace(desc.getName());
216      }
217    }
218  }
219
220  protected void deleteGroups() throws IOException {
221    RSGroupAdmin groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection());
222    for (RSGroupInfo group : groupAdmin.listRSGroups()) {
223      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
224        groupAdmin.moveTables(group.getTables(), RSGroupInfo.DEFAULT_GROUP);
225        groupAdmin.moveServers(group.getServers(), RSGroupInfo.DEFAULT_GROUP);
226        groupAdmin.removeRSGroup(group.getName());
227      }
228    }
229  }
230
231  protected Map<TableName, List<String>> getTableRegionMap() throws IOException {
232    Map<TableName, List<String>> map = Maps.newTreeMap();
233    Map<TableName, Map<ServerName, List<String>>> tableServerRegionMap = getTableServerRegionMap();
234    for (TableName tableName : tableServerRegionMap.keySet()) {
235      if (!map.containsKey(tableName)) {
236        map.put(tableName, new LinkedList<>());
237      }
238      for (List<String> subset : tableServerRegionMap.get(tableName).values()) {
239        map.get(tableName).addAll(subset);
240      }
241    }
242    return map;
243  }
244
245  protected Map<TableName, Map<ServerName, List<String>>> getTableServerRegionMap()
246    throws IOException {
247    Map<TableName, Map<ServerName, List<String>>> map = Maps.newTreeMap();
248    Admin admin = TEST_UTIL.getAdmin();
249    ClusterMetrics metrics =
250      admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME));
251    for (ServerName serverName : metrics.getServersName()) {
252      for (RegionInfo region : admin.getRegions(serverName)) {
253        TableName tableName = region.getTable();
254        map.computeIfAbsent(tableName, k -> new TreeMap<>())
255          .computeIfAbsent(serverName, k -> new ArrayList<>()).add(region.getRegionNameAsString());
256      }
257    }
258    return map;
259  }
260
261  // return the real number of region servers, excluding the master embedded region server in 2.0+
262  protected int getNumServers() throws IOException {
263    ClusterMetrics status = admin.getClusterMetrics(EnumSet.of(Option.MASTER, Option.LIVE_SERVERS));
264    ServerName masterName = status.getMasterName();
265    int count = 0;
266    for (ServerName sn : status.getLiveServerMetrics().keySet()) {
267      if (!sn.equals(masterName)) {
268        count++;
269      }
270    }
271    return count;
272  }
273
274  protected String getGroupName(String baseName) {
275    return groupPrefix + "_" + baseName + "_" + rand.nextInt(Integer.MAX_VALUE);
276  }
277
278  /**
279   * The server name in group does not contain the start code, this method will find out the start
280   * code and construct the ServerName object.
281   */
282  protected ServerName getServerName(Address addr) {
283    return TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
284      .map(t -> t.getRegionServer().getServerName()).filter(sn -> sn.getAddress().equals(addr))
285      .findFirst().get();
286  }
287
288  public static class CPMasterObserver implements MasterCoprocessor, MasterObserver {
289    boolean preBalanceRSGroupCalled = false;
290    boolean postBalanceRSGroupCalled = false;
291    boolean preMoveServersCalled = false;
292    boolean postMoveServersCalled = false;
293    boolean preMoveTablesCalled = false;
294    boolean postMoveTablesCalled = false;
295    boolean preAddRSGroupCalled = false;
296    boolean postAddRSGroupCalled = false;
297    boolean preRemoveRSGroupCalled = false;
298    boolean postRemoveRSGroupCalled = false;
299    boolean preRemoveServersCalled = false;
300    boolean postRemoveServersCalled = false;
301    boolean preMoveServersAndTables = false;
302    boolean postMoveServersAndTables = false;
303    boolean preRenameRSGroupCalled = false;
304    boolean postRenameRSGroupCalled = false;
305
306    public void resetFlags() {
307      preBalanceRSGroupCalled = false;
308      postBalanceRSGroupCalled = false;
309      preMoveServersCalled = false;
310      postMoveServersCalled = false;
311      preMoveTablesCalled = false;
312      postMoveTablesCalled = false;
313      preAddRSGroupCalled = false;
314      postAddRSGroupCalled = false;
315      preRemoveRSGroupCalled = false;
316      postRemoveRSGroupCalled = false;
317      preRemoveServersCalled = false;
318      postRemoveServersCalled = false;
319      preMoveServersAndTables = false;
320      postMoveServersAndTables = false;
321      preRenameRSGroupCalled = false;
322      postRenameRSGroupCalled = false;
323    }
324
325    @Override
326    public Optional<MasterObserver> getMasterObserver() {
327      return Optional.of(this);
328    }
329
330    @Override
331    public void preMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
332      Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
333      preMoveServersAndTables = true;
334    }
335
336    @Override
337    public void postMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
338      Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
339      postMoveServersAndTables = true;
340    }
341
342    @Override
343    public void preRemoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
344      Set<Address> servers) throws IOException {
345      preRemoveServersCalled = true;
346    }
347
348    @Override
349    public void postRemoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
350      Set<Address> servers) throws IOException {
351      postRemoveServersCalled = true;
352    }
353
354    @Override
355    public void preRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
356      String name) throws IOException {
357      preRemoveRSGroupCalled = true;
358    }
359
360    @Override
361    public void postRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
362      String name) throws IOException {
363      postRemoveRSGroupCalled = true;
364    }
365
366    @Override
367    public void preAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, String name)
368      throws IOException {
369      preAddRSGroupCalled = true;
370    }
371
372    @Override
373    public void postAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, String name)
374      throws IOException {
375      postAddRSGroupCalled = true;
376    }
377
378    @Override
379    public void preMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
380      Set<TableName> tables, String targetGroup) throws IOException {
381      preMoveTablesCalled = true;
382    }
383
384    @Override
385    public void postMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
386      Set<TableName> tables, String targetGroup) throws IOException {
387      postMoveTablesCalled = true;
388    }
389
390    @Override
391    public void preMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
392      Set<Address> servers, String targetGroup) throws IOException {
393      preMoveServersCalled = true;
394    }
395
396    @Override
397    public void postMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
398      Set<Address> servers, String targetGroup) throws IOException {
399      postMoveServersCalled = true;
400    }
401
402    @Override
403    public void preBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
404      String groupName, BalanceRequest request) throws IOException {
405      preBalanceRSGroupCalled = true;
406    }
407
408    @Override
409    public void postBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
410      String groupName, BalanceRequest request, BalanceResponse response) throws IOException {
411      postBalanceRSGroupCalled = true;
412    }
413
414    @Override
415    public void preRenameRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String oldName,
416      String newName) throws IOException {
417      preRenameRSGroupCalled = true;
418    }
419
420    @Override
421    public void postRenameRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String oldName,
422      String newName) throws IOException {
423      postRenameRSGroupCalled = true;
424    }
425  }
426
427}