001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.LinkedList;
029import java.util.List;
030import java.util.Map;
031import java.util.Optional;
032import java.util.OptionalLong;
033import java.util.Set;
034import java.util.SortedSet;
035import java.util.TreeSet;
036import java.util.function.Function;
037import java.util.stream.Collectors;
038import org.apache.commons.lang3.StringUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Coprocessor;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.NamespaceDescriptor;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableDescriptors;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.AsyncClusterConnection;
048import org.apache.hadoop.hbase.client.AsyncTable;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Delete;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Mutation;
054import org.apache.hadoop.hbase.client.Put;
055import org.apache.hadoop.hbase.client.RegionInfo;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.TableDescriptor;
059import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
060import org.apache.hadoop.hbase.client.TableState;
061import org.apache.hadoop.hbase.constraint.ConstraintException;
062import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
063import org.apache.hadoop.hbase.exceptions.DeserializationException;
064import org.apache.hadoop.hbase.master.LoadBalancer;
065import org.apache.hadoop.hbase.master.MasterServices;
066import org.apache.hadoop.hbase.master.RegionPlan;
067import org.apache.hadoop.hbase.master.RegionState;
068import org.apache.hadoop.hbase.master.ServerListener;
069import org.apache.hadoop.hbase.master.ServerManager;
070import org.apache.hadoop.hbase.master.TableStateManager;
071import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
072import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
073import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
074import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
075import org.apache.hadoop.hbase.net.Address;
076import org.apache.hadoop.hbase.procedure2.Procedure;
077import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
078import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.FutureUtils;
081import org.apache.hadoop.hbase.util.Threads;
082import org.apache.hadoop.hbase.zookeeper.ZKUtil;
083import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
084import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
085import org.apache.hadoop.util.Shell;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.apache.zookeeper.KeeperException;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
092import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
093import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
094import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
095import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
096
097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos;
103/**
104 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
105 * persistence store for the group information. It also makes use of zookeeper to store group
106 * information needed for bootstrapping during offline mode.
 * <h2>Concurrency</h2> RSGroup state is kept locally in immutable Maps held by
 * {@link RSGroupInfoHolder}: a rsgroup name to RSGroupInfo Map at
 * {@link RSGroupInfoHolder#groupName2Group} and a table name to RSGroupInfo Map at
 * {@link RSGroupInfoHolder#tableName2Group}. While online, the rsgroup information is persisted
 * to the hbase:rsgroup table (and cached in zk) on each modification.
110 * <p/>
111 * Mutations on state are synchronized but reads can continue without having to wait on an instance
112 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
113 * state are read-only, just-in-case (see flushConfig).
114 * <p/>
115 * Reads must not block else there is a danger we'll deadlock.
116 * <p/>
117 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
118 * on the results of the query modifying cache in zookeeper without another thread making
119 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
120 * access concurrently. Reads must be able to continue concurrently.
121 */
122@InterfaceAudience.Private
123final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
124  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
125
126  // Assigned before user tables
127  @VisibleForTesting
128  static final TableName RSGROUP_TABLE_NAME =
129      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
130
131  @VisibleForTesting
132  static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE = "should keep at least " +
133      "one server in 'default' RSGroup.";
134
135  /** Define the config key of retries threshold when movements failed */
136  @VisibleForTesting
137  static final String FAILED_MOVE_MAX_RETRY = "hbase.rsgroup.move.max.retry";
138
139  /** Define the default number of retries */
140  @VisibleForTesting
141  static final int DEFAULT_MAX_RETRY_VALUE = 50;
142
143  private static final String RS_GROUP_ZNODE = "rsgroup";
144
145  @VisibleForTesting
146  static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
147
148  @VisibleForTesting
149  static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
150
151  @VisibleForTesting
152  static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables";
153
154  private static final byte[] ROW_KEY = { 0 };
155
156  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
157  private static final TableDescriptor RSGROUP_TABLE_DESC;
158  static {
159    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
160      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
161      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
162    try {
163      builder.setCoprocessor(
164        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
165          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
166    } catch (IOException ex) {
167      throw new Error(ex);
168    }
169    RSGROUP_TABLE_DESC = builder.build();
170  }
171
172  // There two Maps are immutable and wholesale replaced on each modification
173  // so are safe to access concurrently. See class comment.
174  private static final class RSGroupInfoHolder {
175    final ImmutableMap<String, RSGroupInfo> groupName2Group;
176    final ImmutableMap<TableName, RSGroupInfo> tableName2Group;
177
178    RSGroupInfoHolder() {
179      this(Collections.emptyMap());
180    }
181
182    RSGroupInfoHolder(Map<String, RSGroupInfo> rsGroupMap) {
183      ImmutableMap.Builder<String, RSGroupInfo> group2Name2GroupBuilder = ImmutableMap.builder();
184      ImmutableMap.Builder<TableName, RSGroupInfo> tableName2GroupBuilder = ImmutableMap.builder();
185      rsGroupMap.forEach((groupName, rsGroupInfo) -> {
186        group2Name2GroupBuilder.put(groupName, rsGroupInfo);
187        if (!groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
188          rsGroupInfo.getTables()
189            .forEach(tableName -> tableName2GroupBuilder.put(tableName, rsGroupInfo));
190        }
191      });
192      this.groupName2Group = group2Name2GroupBuilder.build();
193      this.tableName2Group = tableName2GroupBuilder.build();
194    }
195  }
196
197  private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder();
198
199  private final MasterServices masterServices;
200  private final AsyncClusterConnection conn;
201  private final ZKWatcher watcher;
202  private final RSGroupStartupWorker rsGroupStartupWorker;
203  // contains list of groups that were last flushed to persistent store
204  private Set<String> prevRSGroups = new HashSet<>();
205
206  // Package visibility for testing
207  static class RSGroupMappingScript {
208    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
209    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
210      "hbase.rsgroup.table.mapping.script.timeout";
211    private Shell.ShellCommandExecutor rsgroupMappingScript;
212
213    RSGroupMappingScript(Configuration conf) {
214      String script = conf.get(RS_GROUP_MAPPING_SCRIPT);
215      if (script == null || script.isEmpty()) {
216        return;
217      }
218
219      rsgroupMappingScript = new Shell.ShellCommandExecutor(
220        new String[] { script, "", "" }, null, null,
221        conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds
222      );
223    }
224
225    String getRSGroup(String namespace, String tablename) {
226      if (rsgroupMappingScript == null) {
227        return null;
228      }
229      String[] exec = rsgroupMappingScript.getExecString();
230      exec[1] = namespace;
231      exec[2] = tablename;
232      try {
233        rsgroupMappingScript.execute();
234      } catch (IOException e) {
235        // This exception may happen, like process doesn't have permission to run this script.
236        LOG.error("{}, placing {} back to default rsgroup",
237          e.getMessage(),
238          TableName.valueOf(namespace, tablename));
239        return RSGroupInfo.DEFAULT_GROUP;
240      }
241      return rsgroupMappingScript.getOutput().trim();
242    }
243  }
244  private RSGroupMappingScript script;
245
246  private RSGroupInfoManagerImpl(MasterServices masterServices) {
247    this.masterServices = masterServices;
248    this.watcher = masterServices.getZooKeeper();
249    this.conn = masterServices.getAsyncClusterConnection();
250    this.rsGroupStartupWorker = new RSGroupStartupWorker();
251    this.script = new RSGroupMappingScript(masterServices.getConfiguration());
252  }
253
254  private synchronized void updateDefaultServers() {
255    LOG.info("Updating default servers.");
256    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
257    RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP);
258    assert oldDefaultGroupInfo != null;
259    RSGroupInfo newDefaultGroupInfo =
260      new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers());
261    newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables());
262    newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo);
263    // do not need to persist, as we do not persist default group.
264    resetRSGroupMap(newGroupMap);
265    LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size());
266  }
267
268  private synchronized void init() throws IOException {
269    refresh(false);
270    masterServices.getServerManager().registerListener(new ServerListener() {
271
272      @Override
273      public void serverAdded(ServerName serverName) {
274        updateDefaultServers();
275      }
276
277      @Override
278      public void serverRemoved(ServerName serverName) {
279        updateDefaultServers();
280      }
281    });
282    migrate();
283  }
284
285  static RSGroupInfoManager getInstance(MasterServices masterServices) throws IOException {
286    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(masterServices);
287    instance.init();
288    return instance;
289  }
290
291  public void start() {
292    // create system table of rsgroup
293    rsGroupStartupWorker.start();
294  }
295
296  @Override
297  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
298    checkGroupName(rsGroupInfo.getName());
299    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
300    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
301      rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
302      throw new ConstraintException("Group already exists: " + rsGroupInfo.getName());
303    }
304    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
305    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
306    flushConfig(newGroupMap);
307  }
308
309  private RSGroupInfo getRSGroupInfo(final String groupName) throws ConstraintException {
310    RSGroupInfo rsGroupInfo = holder.groupName2Group.get(groupName);
311    if (rsGroupInfo == null) {
312      throw new ConstraintException("RSGroup " + groupName + " does not exist");
313    }
314    return rsGroupInfo;
315  }
316
317  /**
318   * @return Set of online Servers named for their hostname and port (not ServerName).
319   */
320  private Set<Address> getOnlineServers() {
321    return masterServices.getServerManager().getOnlineServers().keySet().stream()
322      .map(ServerName::getAddress).collect(Collectors.toSet());
323  }
324
325  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
326      String dstGroup) throws IOException {
327    RSGroupInfo src = getRSGroupInfo(srcGroup);
328    RSGroupInfo dst = getRSGroupInfo(dstGroup);
329    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
330    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
331    // rsgroup of dead servers that are to come back later).
332    Set<Address> onlineServers =
333      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null;
334    for (Address el : servers) {
335      src.removeServer(el);
336      if (onlineServers != null) {
337        if (!onlineServers.contains(el)) {
338          if (LOG.isDebugEnabled()) {
339            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
340          }
341          continue;
342        }
343      }
344      dst.addServer(el);
345    }
346    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
347    newGroupMap.put(src.getName(), src);
348    newGroupMap.put(dst.getName(), dst);
349    flushConfig(newGroupMap);
350    return dst.getServers();
351  }
352
353  @Override
354  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) {
355    for (RSGroupInfo info : holder.groupName2Group.values()) {
356      if (info.containsServer(serverHostPort)) {
357        return info;
358      }
359    }
360    return null;
361  }
362
363  @Override
364  public RSGroupInfo getRSGroup(String groupName) {
365    return holder.groupName2Group.get(groupName);
366  }
367
368  @Override
369  public synchronized void removeRSGroup(String groupName) throws IOException {
370    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
371    int serverCount = rsGroupInfo.getServers().size();
372    if (serverCount > 0) {
373      throw new ConstraintException("RSGroup " + groupName + " has " + serverCount +
374          " servers; you must remove these servers from the RSGroup before" +
375          " the RSGroup can be removed.");
376    }
377    for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) {
378      if (td.getRegionServerGroup().map(groupName::equals).orElse(false)) {
379        throw new ConstraintException("RSGroup " + groupName + " is already referenced by " +
380            td.getTableName() + "; you must remove all the tables from the rsgroup before " +
381            "the rsgroup can be removed.");
382      }
383    }
384    for (NamespaceDescriptor ns : masterServices.getClusterSchema().getNamespaces()) {
385      String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
386      if (nsGroup != null && nsGroup.equals(groupName)) {
387        throw new ConstraintException(
388            "RSGroup " + groupName + " is referenced by namespace: " + ns.getName());
389      }
390    }
391    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
392    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
393      throw new ConstraintException(
394        "Group " + groupName + " does not exist or is a reserved " + "group");
395    }
396    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
397    newGroupMap.remove(groupName);
398    flushConfig(newGroupMap);
399  }
400
401  @Override
402  public List<RSGroupInfo> listRSGroups() {
403    return Lists.newArrayList(holder.groupName2Group.values());
404  }
405
406  @Override
407  public boolean isOnline() {
408    return rsGroupStartupWorker.isOnline();
409  }
410
411  @Override
412  public synchronized void removeServers(Set<Address> servers) throws IOException {
413    if (servers == null || servers.isEmpty()) {
414      throw new ConstraintException("The set of servers to remove cannot be null or empty.");
415    }
416
417    // check the set of servers
418    checkForDeadOrOnlineServers(servers);
419
420    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
421    for (Address el : servers) {
422      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
423      if (rsGroupInfo != null) {
424        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
425        if (newRsGroupInfo == null) {
426          rsGroupInfo.removeServer(el);
427          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
428        } else {
429          newRsGroupInfo.removeServer(el);
430          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
431        }
432      } else {
433        LOG.warn("Server " + el + " does not belong to any rsgroup.");
434      }
435    }
436
437    if (rsGroupInfos.size() > 0) {
438      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
439      newGroupMap.putAll(rsGroupInfos);
440      flushConfig(newGroupMap);
441    }
442    LOG.info("Remove decommissioned servers {} from RSGroup done", servers);
443  }
444
445  private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
446    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
447    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
448    try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) {
449      for (Result result;;) {
450        result = scanner.next();
451        if (result == null) {
452          break;
453        }
454        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
455            .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
456        rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto));
457      }
458    }
459    return rsGroupInfoList;
460  }
461
462  private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
463    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
464    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
465    // Overwrite any info stored by table, this takes precedence
466    try {
467      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
468        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
469        if (children == null) {
470          return RSGroupInfoList;
471        }
472        for (String znode : children) {
473          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
474          if (data.length > 0) {
475            ProtobufUtil.expectPBMagicPrefix(data);
476            ByteArrayInputStream bis =
477              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
478            RSGroupInfoList
479              .add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
480          }
481        }
482        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
483      }
484    } catch (KeeperException | DeserializationException | InterruptedException e) {
485      throw new IOException("Failed to read rsGroupZNode", e);
486    }
487    return RSGroupInfoList;
488  }
489
490  private void migrate(Collection<RSGroupInfo> groupList) {
491    TableDescriptors tds = masterServices.getTableDescriptors();
492    for (RSGroupInfo groupInfo : groupList) {
493      if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
494        continue;
495      }
496      SortedSet<TableName> failedTables = new TreeSet<>();
497      for (TableName tableName : groupInfo.getTables()) {
498        LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName());
499        TableDescriptor oldTd;
500        try {
501          oldTd = tds.get(tableName);
502        } catch (IOException e) {
503          LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
504          failedTables.add(tableName);
505          continue;
506        }
507        if (oldTd == null) {
508          continue;
509        }
510        if (oldTd.getRegionServerGroup().isPresent()) {
511          // either we have already migrated it or that user has set the rs group using the new
512          // code which will set the group directly on table descriptor, skip.
513          LOG.debug("Skip migrating {} since it is already in group {}", tableName,
514            oldTd.getRegionServerGroup().get());
515          continue;
516        }
517        TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd)
518          .setRegionServerGroup(groupInfo.getName()).build();
519        // This is a bit tricky. Since we know that the region server group config in
520        // TableDescriptor will only be used at master side, it is fine to just update the table
521        // descriptor on file system and also the cache, without reopening all the regions. This
522        // will be much faster than the normal modifyTable. And when upgrading, we will update
523        // master first and then region server, so after all the region servers has been reopened,
524        // the new TableDescriptor will be loaded.
525        try {
526          tds.update(newTd);
527        } catch (IOException e) {
528          LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
529          failedTables.add(tableName);
530          continue;
531        }
532      }
533      LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables);
534      synchronized (RSGroupInfoManagerImpl.this) {
535        Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
536        RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName());
537        if (currentInfo != null) {
538          RSGroupInfo newInfo =
539            new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables);
540          Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap);
541          newGroupMap.put(groupInfo.getName(), newInfo);
542          try {
543            flushConfig(newGroupMap);
544          } catch (IOException e) {
545            LOG.warn("Failed to persist rs group {}", newInfo.getName(), e);
546          }
547        }
548      }
549    }
550  }
551
552  // Migrate the table rs group info from RSGroupInfo into the table descriptor
553  // Notice that we do not want to block the initialize so this will be done in background, and
554  // during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
555  private void migrate() {
556    Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) {
557
558      @Override
559      public void run() {
560        LOG.info("Start migrating table rs group config");
561        while (!masterServices.isStopped()) {
562          Collection<RSGroupInfo> groups = holder.groupName2Group.values();
563          boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty());
564          if (!hasTables) {
565            break;
566          }
567          migrate(groups);
568        }
569        LOG.info("Done migrating table rs group info");
570      }
571    };
572    migrateThread.setDaemon(true);
573    migrateThread.start();
574  }
575
576  /**
577   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
578   * startup of the manager.
579   */
580  private synchronized void refresh(boolean forceOnline) throws IOException {
581    List<RSGroupInfo> groupList = new ArrayList<>();
582
583    // Overwrite anything read from zk, group table is source of truth
584    // if online read from GROUP table
585    if (forceOnline || isOnline()) {
586      LOG.debug("Refreshing in Online mode.");
587      groupList.addAll(retrieveGroupListFromGroupTable());
588    } else {
589      LOG.debug("Refreshing in Offline mode.");
590      groupList.addAll(retrieveGroupListFromZookeeper());
591    }
592
593    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
594    // from region group table or zk
595    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList)));
596
597    // populate the data
598    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
599    for (RSGroupInfo group : groupList) {
600      newGroupMap.put(group.getName(), group);
601    }
602    resetRSGroupMap(newGroupMap);
603    updateCacheOfRSGroups(newGroupMap.keySet());
604  }
605
606  private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException {
607    List<Mutation> mutations = Lists.newArrayList();
608
609    // populate deletes
610    for (String groupName : prevRSGroups) {
611      if (!groupMap.containsKey(groupName)) {
612        Delete d = new Delete(Bytes.toBytes(groupName));
613        mutations.add(d);
614      }
615    }
616
617    // populate puts
618    for (RSGroupInfo gi : groupMap.values()) {
619      if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
620        RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
621        Put p = new Put(Bytes.toBytes(gi.getName()));
622        p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
623        mutations.add(p);
624      }
625    }
626
627    if (mutations.size() > 0) {
628      multiMutate(mutations);
629    }
630  }
631
632  private synchronized void flushConfig() throws IOException {
633    flushConfig(holder.groupName2Group);
634  }
635
636  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
637    // For offline mode persistence is still unavailable
638    // We're refreshing in-memory state but only for servers in default group
639    if (!isOnline()) {
640      if (newGroupMap == holder.groupName2Group) {
641        // When newGroupMap is this.rsGroupMap itself,
642        // do not need to check default group and other groups as followed
643        return;
644      }
645
646      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(holder.groupName2Group);
647      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
648      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
649      if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
650          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
651      /* compare tables in default group */) {
652        throw new IOException("Only servers in default group can be updated during offline mode");
653      }
654
655      // Restore newGroupMap by putting its default group back
656      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
657
658      // Refresh rsGroupMap
659      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
660      this.holder = new RSGroupInfoHolder(newGroupMap);
661
662      // Do not need to update tableMap
663      // because only the update on servers in default group is allowed above,
664      // or IOException will be thrown
665      return;
666    }
667
668    /* For online mode, persist to hbase:rsgroup and Zookeeper */
669    flushConfigTable(newGroupMap);
670
671    // Make changes visible after having been persisted to the source of truth
672    resetRSGroupMap(newGroupMap);
673    saveRSGroupMapToZK(newGroupMap);
674    updateCacheOfRSGroups(newGroupMap.keySet());
675  }
676
677  private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOException {
678    try {
679      String groupBasePath =
680          ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
681      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
682
683      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
684      for (String groupName : prevRSGroups) {
685        if (!newGroupMap.containsKey(groupName)) {
686          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
687          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
688        }
689      }
690
691      for (RSGroupInfo gi : newGroupMap.values()) {
692        if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
693          String znode = ZNodePaths.joinZNode(groupBasePath, gi.getName());
694          RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
695          LOG.debug("Updating znode: " + znode);
696          ZKUtil.createAndFailSilent(watcher, znode);
697          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
698          zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
699            ProtobufUtil.prependPBMagic(proto.toByteArray())));
700        }
701      }
702      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
703
704      ZKUtil.multiOrSequential(watcher, zkOps, false);
705    } catch (KeeperException e) {
706      LOG.error("Failed to write to rsGroupZNode", e);
707      masterServices.abort("Failed to write to rsGroupZNode", e);
708      throw new IOException("Failed to write to rsGroupZNode", e);
709    }
710  }
711
712  /**
713   * Make changes visible. Caller must be synchronized on 'this'.
714   */
715  private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) {
716    this.holder = new RSGroupInfoHolder(newRSGroupMap);
717  }
718
719  /**
720   * Update cache of rsgroups. Caller must be synchronized on 'this'.
721   * @param currentGroups Current list of Groups.
722   */
723  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
724    this.prevRSGroups.clear();
725    this.prevRSGroups.addAll(currentGroups);
726  }
727
728  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
729  private SortedSet<Address> getDefaultServers() {
730    return getDefaultServers(listRSGroups()/* get from rsGroupMap */);
731  }
732
733  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
734  private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList) {
735    // Build a list of servers in other groups than default group, from rsGroupMap
736    Set<Address> serversInOtherGroup = new HashSet<>();
737    for (RSGroupInfo group : rsGroupInfoList) {
738      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
739        serversInOtherGroup.addAll(group.getServers());
740      }
741    }
742
743    // Get all online servers from Zookeeper and find out servers in default group
744    SortedSet<Address> defaultServers = Sets.newTreeSet();
745    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
746      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
747      if (!serversInOtherGroup.contains(server)) { // not in other groups
748        defaultServers.add(server);
749      }
750    }
751    return defaultServers;
752  }
753
754  private class RSGroupStartupWorker extends Thread {
755    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
756    private volatile boolean online = false;
757
758    RSGroupStartupWorker() {
759      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
760      setDaemon(true);
761    }
762
763    @Override
764    public void run() {
765      if (waitForGroupTableOnline()) {
766        LOG.info("GroupBasedLoadBalancer is now online");
767      } else {
768        LOG.warn("Quit without making region group table online");
769      }
770    }
771
772    private boolean waitForGroupTableOnline() {
773      while (isMasterRunning(masterServices)) {
774        try {
775          TableStateManager tsm = masterServices.getTableStateManager();
776          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
777            createRSGroupTable();
778          }
779          // try reading from the table
780          FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY)));
781          LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME);
782          RSGroupInfoManagerImpl.this.refresh(true);
783          online = true;
784          // flush any inconsistencies between ZK and HTable
785          RSGroupInfoManagerImpl.this.flushConfig();
786          // migrate after we are online.
787          migrate();
788          return true;
789        } catch (Exception e) {
790          LOG.warn("Failed to perform check", e);
791          // 100ms is short so let's just ignore the interrupt
792          Threads.sleepWithoutInterrupt(100);
793        }
794      }
795      return false;
796    }
797
798    private void createRSGroupTable() throws IOException {
799      OptionalLong optProcId = masterServices.getProcedures().stream()
800        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
801        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
802        .findFirst();
803      long procId;
804      if (optProcId.isPresent()) {
805        procId = optProcId.getAsLong();
806      } else {
807        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
808      }
809      // wait for region to be online
810      int tries = 600;
811      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
812        masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
813        try {
814          Thread.sleep(100);
815        } catch (InterruptedException e) {
816          throw new IOException("Wait interrupted ", e);
817        }
818        tries--;
819      }
820      if (tries <= 0) {
821        throw new IOException("Failed to create group table in a given time.");
822      } else {
823        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
824        if (result != null && result.isFailed()) {
825          throw new IOException("Failed to create group table. " +
826              MasterProcedureUtil.unwrapRemoteIOException(result));
827        }
828      }
829    }
830
831    public boolean isOnline() {
832      return online;
833    }
834  }
835
836  private static boolean isMasterRunning(MasterServices masterServices) {
837    return !masterServices.isAborted() && !masterServices.isStopped();
838  }
839
840  private void multiMutate(List<Mutation> mutations) throws IOException {
841    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
842    for (Mutation mutation : mutations) {
843      if (mutation instanceof Put) {
844        builder
845            .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation));
846      } else if (mutation instanceof Delete) {
847        builder.addMutationRequest(
848          ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation));
849      } else {
850        throw new DoNotRetryIOException(
851            "multiMutate doesn't support " + mutation.getClass().getName());
852      }
853    }
854    MutateRowsRequest request = builder.build();
855    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
856    FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
857      MultiRowMutationService::newStub,
858      (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
859  }
860
861  private void checkGroupName(String groupName) throws ConstraintException {
862    if (!groupName.matches("[a-zA-Z0-9_]+")) {
863      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
864    }
865  }
866
867  @Override
868  public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException {
869    return holder.tableName2Group.get(tableName);
870  }
871
872
873  /**
874   * Check if the set of servers are belong to dead servers list or online servers list.
875   * @param servers servers to remove
876   */
877  private void checkForDeadOrOnlineServers(Set<Address> servers) throws IOException {
878    // This uglyness is because we only have Address, not ServerName.
879    Set<Address> onlineServers = new HashSet<>();
880    List<ServerName> drainingServers = masterServices.getServerManager().getDrainingServersList();
881    for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) {
882      // Only online but not decommissioned servers are really online
883      if (!drainingServers.contains(server)) {
884        onlineServers.add(server.getAddress());
885      }
886    }
887
888    Set<Address> deadServers = new HashSet<>();
889    for(ServerName server: masterServices.getServerManager().getDeadServers().copyServerNames()) {
890      deadServers.add(server.getAddress());
891    }
892
893    for (Address address: servers) {
894      if (onlineServers.contains(address)) {
895        throw new DoNotRetryIOException(
896            "Server " + address + " is an online server, not allowed to remove.");
897      }
898      if (deadServers.contains(address)) {
899        throw new DoNotRetryIOException(
900            "Server " + address + " is on the dead servers list,"
901                + " Maybe it will come back again, not allowed to remove.");
902      }
903    }
904  }
905
906  private void checkOnlineServersOnly(Set<Address> servers) throws IOException {
907    // This uglyness is because we only have Address, not ServerName.
908    // Online servers are keyed by ServerName.
909    Set<Address> onlineServers = new HashSet<>();
910    for(ServerName server: masterServices.getServerManager().getOnlineServers().keySet()) {
911      onlineServers.add(server.getAddress());
912    }
913    for (Address address: servers) {
914      if (!onlineServers.contains(address)) {
915        throw new DoNotRetryIOException("Server " + address +
916            " is not an online server in 'default' RSGroup.");
917      }
918    }
919  }
920
921  /**
922   * @return List of Regions associated with this <code>server</code>.
923   */
924  private List<RegionInfo> getRegions(final Address server) {
925    LinkedList<RegionInfo> regions = new LinkedList<>();
926    for (Map.Entry<RegionInfo, ServerName> el :
927        masterServices.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) {
928      if (el.getValue() == null) {
929        continue;
930      }
931
932      if (el.getValue().getAddress().equals(server)) {
933        addRegion(regions, el.getKey());
934      }
935    }
936    for (RegionStateNode state : masterServices.getAssignmentManager().getRegionsInTransition()) {
937      if (state.getRegionLocation() != null &&
938          state.getRegionLocation().getAddress().equals(server)) {
939        addRegion(regions, state.getRegionInfo());
940      }
941    }
942    return regions;
943  }
944
945  private void addRegion(final LinkedList<RegionInfo> regions, RegionInfo hri) {
946    // If meta, move it last otherwise other unassigns fail because meta is not
947    // online for them to update state in. This is dodgy. Needs to be made more
948    // robust. See TODO below.
949    if (hri.isMetaRegion()) {
950      regions.addLast(hri);
951    } else {
952      regions.addFirst(hri);
953    }
954  }
955
956  /**
957   * Move every region from servers which are currently located on these servers, but should not be
958   * located there.
959   * @param servers the servers that will move to new group
960   * @param targetGroupName the target group name
961   * @throws IOException if moving the server and tables fail
962   */
963  private void moveServerRegionsFromGroup(Set<Address> servers, String targetGroupName)
964      throws IOException {
965    moveRegionsBetweenGroups(servers, targetGroupName, rs -> getRegions(rs), info -> {
966      try {
967        String groupName = RSGroupUtil.getRSGroupInfo(masterServices, this, info.getTable())
968            .map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP);
969        return groupName.equals(targetGroupName);
970      } catch (IOException e) {
971        LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName);
972        return false;
973      }
974    }, rs -> rs.getHostname());
975  }
976
977  private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, String targetGroupName,
978      Function<T, List<RegionInfo>> getRegionsInfo, Function<RegionInfo, Boolean> validation,
979      Function<T, String> getOwnerName) throws IOException {
980    boolean hasRegionsToMove;
981    int retry = 0;
982    Set<T> allOwners = new HashSet<>(regionsOwners);
983    Set<String> failedRegions = new HashSet<>();
984    IOException toThrow = null;
985    do {
986      hasRegionsToMove = false;
987      for (Iterator<T> iter = allOwners.iterator(); iter.hasNext(); ) {
988        T owner = iter.next();
989        // Get regions that are associated with this server and filter regions by group tables.
990        for (RegionInfo region : getRegionsInfo.apply(owner)) {
991          if (!validation.apply(region)) {
992            LOG.info("Moving region {}, which do not belong to RSGroup {}",
993                region.getShortNameToLog(), targetGroupName);
994            try {
995              this.masterServices.getAssignmentManager().move(region);
996              failedRegions.remove(region.getRegionNameAsString());
997            } catch (IOException ioe) {
998              LOG.debug("Move region {} from group failed, will retry, current retry time is {}",
999                  region.getShortNameToLog(), retry, ioe);
1000              toThrow = ioe;
1001              failedRegions.add(region.getRegionNameAsString());
1002            }
1003            if (masterServices.getAssignmentManager().getRegionStates().
1004                getRegionState(region).isFailedOpen()) {
1005              continue;
1006            }
1007            hasRegionsToMove = true;
1008          }
1009        }
1010
1011        if (!hasRegionsToMove) {
1012          LOG.info("No more regions to move from {} to RSGroup", getOwnerName.apply(owner));
1013          iter.remove();
1014        }
1015      }
1016
1017      retry++;
1018      try {
1019        wait(1000);
1020      } catch (InterruptedException e) {
1021        LOG.warn("Sleep interrupted", e);
1022        Thread.currentThread().interrupt();
1023      }
1024    } while (hasRegionsToMove && retry <=
1025        masterServices.getConfiguration().getInt(FAILED_MOVE_MAX_RETRY, DEFAULT_MAX_RETRY_VALUE));
1026
1027    //has up to max retry time or there are no more regions to move
1028    if (hasRegionsToMove) {
1029      // print failed moved regions, for later process conveniently
1030      String msg = String
1031          .format("move regions for group %s failed, failed regions: %s", targetGroupName,
1032              failedRegions);
1033      LOG.error(msg);
1034      throw new DoNotRetryIOException(
1035          msg + ", just record the last failed region's cause, more details in server log",
1036          toThrow);
1037    }
1038  }
1039
1040  private boolean isTableInGroup(TableName tableName, String groupName,
1041      Set<TableName> tablesInGroupCache) throws IOException {
1042    if (tablesInGroupCache.contains(tableName)) {
1043      return true;
1044    }
1045    if (RSGroupUtil.getRSGroupInfo(masterServices, this, tableName)
1046        .map(RSGroupInfo::getName)
1047        .orElse(RSGroupInfo.DEFAULT_GROUP).equals(groupName)) {
1048      tablesInGroupCache.add(tableName);
1049      return true;
1050    }
1051    return false;
1052  }
1053
1054  private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
1055      throws IOException {
1056    Map<String, RegionState> rit = Maps.newTreeMap();
1057    Set<TableName> tablesInGroupCache = new HashSet<>();
1058    for (RegionStateNode regionNode :
1059        masterServices.getAssignmentManager().getRegionsInTransition()) {
1060      TableName tn = regionNode.getTable();
1061      if (isTableInGroup(tn, groupName, tablesInGroupCache)) {
1062        rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState());
1063      }
1064    }
1065    return rit;
1066  }
1067
1068  /**
1069   * This is an EXPENSIVE clone. Cloning though is the safest thing to do. Can't let out original
1070   * since it can change and at least the load balancer wants to iterate this exported list. Load
1071   * balancer should iterate over this list because cloned list will ignore disabled table and split
1072   * parent region cases. This method is invoked by {@link #balanceRSGroup}
1073   * @return A clone of current assignments for this group.
1074   */
1075  @VisibleForTesting
1076  Map<TableName, Map<ServerName, List<RegionInfo>>> getRSGroupAssignmentsByTable(
1077    TableStateManager tableStateManager, String groupName) throws IOException {
1078    Map<TableName, Map<ServerName, List<RegionInfo>>> result = Maps.newHashMap();
1079    Set<TableName> tablesInGroupCache = new HashSet<>();
1080    for (Map.Entry<RegionInfo, ServerName> entry : masterServices.getAssignmentManager()
1081      .getRegionStates().getRegionAssignments().entrySet()) {
1082      RegionInfo region = entry.getKey();
1083      TableName tn = region.getTable();
1084      ServerName server = entry.getValue();
1085      if (isTableInGroup(tn, groupName, tablesInGroupCache)) {
1086        if (tableStateManager
1087          .isTableState(tn, TableState.State.DISABLED, TableState.State.DISABLING)) {
1088          continue;
1089        }
1090        if (region.isSplitParent()) {
1091          continue;
1092        }
1093        result.computeIfAbsent(tn, k -> new HashMap<>())
1094          .computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1095      }
1096    }
1097    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
1098    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
1099      if (rsGroupInfo.containsServer(serverName.getAddress())) {
1100        for (Map<ServerName, List<RegionInfo>> map : result.values()) {
1101          map.computeIfAbsent(serverName, k -> Collections.emptyList());
1102        }
1103      }
1104    }
1105    return result;
1106  }
1107
1108  @Override
1109  public boolean balanceRSGroup(String groupName) throws IOException {
1110    ServerManager serverManager = masterServices.getServerManager();
1111    LoadBalancer balancer = masterServices.getLoadBalancer();
1112    getRSGroupInfo(groupName);
1113
1114    synchronized (balancer) {
1115      // If balance not true, don't run balancer.
1116      if (!masterServices.isBalancerOn()) {
1117        return false;
1118      }
1119      // Only allow one balance run at at time.
1120      Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
1121      if (groupRIT.size() > 0) {
1122        LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(),
1123            StringUtils.abbreviate(masterServices.getAssignmentManager().getRegionStates()
1124                    .getRegionsInTransition().toString(),
1125                256));
1126        return false;
1127      }
1128      if (serverManager.areDeadServersInProgress()) {
1129        LOG.debug("Not running balancer because processing dead regionserver(s): {}",
1130            serverManager.getDeadServers());
1131        return false;
1132      }
1133
1134      // We balance per group instead of per table
1135      Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable =
1136          getRSGroupAssignmentsByTable(masterServices.getTableStateManager(), groupName);
1137      List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable);
1138      boolean balancerRan = !plans.isEmpty();
1139      if (balancerRan) {
1140        LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());
1141        masterServices.executeRegionPlansWithThrottling(plans);
1142        LOG.info("RSGroup balance " + groupName + " completed");
1143      }
1144      return balancerRan;
1145    }
1146  }
1147
1148  private void moveTablesAndWait(Set<TableName> tables, String targetGroup) throws IOException {
1149    List<Long> procIds = new ArrayList<Long>();
1150    for (TableName tableName : tables) {
1151      TableDescriptor oldTd = masterServices.getTableDescriptors().get(tableName);
1152      if (oldTd == null) {
1153        continue;
1154      }
1155      TableDescriptor newTd =
1156          TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build();
1157      procIds.add(masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE,
1158          HConstants.NO_NONCE));
1159    }
1160    for (long procId : procIds) {
1161      Procedure<?> proc = masterServices.getMasterProcedureExecutor().getProcedure(procId);
1162      if (proc == null) {
1163        continue;
1164      }
1165      ProcedureSyncWait.waitForProcedureToCompleteIOE(masterServices.getMasterProcedureExecutor(),
1166          proc, Long.MAX_VALUE);
1167    }
1168  }
1169
1170  @Override
1171  public void setRSGroup(Set<TableName> tables, String groupName) throws IOException {
1172    getRSGroupInfo(groupName);
1173    moveTablesAndWait(tables, groupName);
1174  }
1175
1176  public void moveServers(Set<Address> servers, String targetGroupName) throws IOException {
1177    if (servers == null) {
1178      throw new ConstraintException("The list of servers to move cannot be null.");
1179    }
1180    if (servers.isEmpty()) {
1181      // For some reason this difference between null servers and isEmpty is important distinction.
1182      // TODO. Why? Stuff breaks if I equate them.
1183      return;
1184    }
1185    if (StringUtils.isEmpty(targetGroupName)) {
1186      throw new ConstraintException("RSGroup cannot be null.");
1187    }
1188    getRSGroupInfo(targetGroupName);
1189
1190    // Hold a lock on the manager instance while moving servers to prevent
1191    // another writer changing our state while we are working.
1192    synchronized (this) {
1193      // Presume first server's source group. Later ensure all servers are from this group.
1194      Address firstServer = servers.iterator().next();
1195      RSGroupInfo srcGrp = getRSGroupOfServer(firstServer);
1196      if (srcGrp == null) {
1197        // Be careful. This exception message is tested for in TestRSGroupsBase...
1198        throw new ConstraintException("Source RSGroup for server " + firstServer
1199            + " does not exist.");
1200      }
1201
1202      // Only move online servers (when moving from 'default') or servers from other
1203      // groups. This prevents bogus servers from entering groups
1204      if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) {
1205        if (srcGrp.getServers().size() <= servers.size()) {
1206          throw new ConstraintException(KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE);
1207        }
1208        checkOnlineServersOnly(servers);
1209      }
1210      // Ensure all servers are of same rsgroup.
1211      for (Address server: servers) {
1212        String tmpGroup = getRSGroupOfServer(server).getName();
1213        if (!tmpGroup.equals(srcGrp.getName())) {
1214          throw new ConstraintException("Move server request should only come from one source " +
1215              "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup);
1216        }
1217      }
1218      if (srcGrp.getServers().size() <= servers.size()) {
1219        // check if there are still tables reference this group
1220        for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) {
1221          Optional<String> optGroupName = td.getRegionServerGroup();
1222          if (optGroupName.isPresent() && optGroupName.get().equals(srcGrp.getName())) {
1223            throw new ConstraintException(
1224                "Cannot leave a RSGroup " + srcGrp.getName() + " that contains tables('" +
1225                    td.getTableName() + "' at least) without servers to host them.");
1226          }
1227        }
1228      }
1229
1230      // MovedServers may be < passed in 'servers'.
1231      Set<Address> movedServers = moveServers(servers, srcGrp.getName(),
1232          targetGroupName);
1233      moveServerRegionsFromGroup(movedServers, targetGroupName);
1234      LOG.info("Move servers done: {} => {}", srcGrp.getName(), targetGroupName);
1235    }
1236  }
1237
1238  @Override
1239  public String determineRSGroupInfoForTable(TableName tableName) {
1240    return script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString());
1241  }
1242
1243  @Override
1244  public synchronized void renameRSGroup(String oldName, String newName) throws IOException {
1245    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
1246      throw new ConstraintException(RSGroupInfo.DEFAULT_GROUP + " can't be rename");
1247    }
1248    checkGroupName(newName);
1249    //getRSGroupInfo validates old RSGroup existence.
1250    RSGroupInfo oldRSG = getRSGroupInfo(oldName);
1251    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
1252    if (rsGroupMap.containsKey(newName)) {
1253      throw new ConstraintException("Group already exists: " + newName);
1254    }
1255
1256    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
1257    newGroupMap.remove(oldRSG.getName());
1258    RSGroupInfo newRSG = new RSGroupInfo(newName, oldRSG.getServers());
1259    newGroupMap.put(newName, newRSG);
1260    flushConfig(newGroupMap);
1261    Set<TableName> updateTables =
1262      masterServices.getTableDescriptors().getAll().values()
1263                    .stream()
1264                    .filter(t -> oldName.equals(t.getRegionServerGroup().orElse(null)))
1265                    .map(TableDescriptor::getTableName)
1266                    .collect(Collectors.toSet());
1267    setRSGroup(updateTables, newName);
1268  }
1269
1270  @Override
1271  public synchronized void updateRSGroupConfig(String groupName, Map<String, String> configuration)
1272      throws IOException {
1273    if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
1274      // We do not persist anything of default group, therefore, it is not supported to update
1275      // default group's configuration which lost once master down.
1276      throw new ConstraintException("configuration of " + RSGroupInfo.DEFAULT_GROUP
1277        + " can't be stored persistently");
1278    }
1279    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
1280    rsGroupInfo.getConfiguration().forEach((k, v) -> rsGroupInfo.removeConfiguration(k));
1281    configuration.forEach((k, v) -> rsGroupInfo.setConfiguration(k, v));
1282    flushConfig();
1283  }
1284}