001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.OptionalLong;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import java.util.concurrent.Future;
036import java.util.function.Function;
037import java.util.stream.Collectors;
038import org.apache.commons.lang3.StringUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Coprocessor;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.NamespaceDescriptor;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableDescriptors;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.AsyncClusterConnection;
048import org.apache.hadoop.hbase.client.AsyncTable;
049import org.apache.hadoop.hbase.client.BalanceRequest;
050import org.apache.hadoop.hbase.client.BalanceResponse;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
052import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
053import org.apache.hadoop.hbase.client.Delete;
054import org.apache.hadoop.hbase.client.Get;
055import org.apache.hadoop.hbase.client.Mutation;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.RegionInfo;
058import org.apache.hadoop.hbase.client.Result;
059import org.apache.hadoop.hbase.client.ResultScanner;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
062import org.apache.hadoop.hbase.client.TableState;
063import org.apache.hadoop.hbase.constraint.ConstraintException;
064import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
065import org.apache.hadoop.hbase.exceptions.DeserializationException;
066import org.apache.hadoop.hbase.master.LoadBalancer;
067import org.apache.hadoop.hbase.master.MasterServices;
068import org.apache.hadoop.hbase.master.RegionPlan;
069import org.apache.hadoop.hbase.master.RegionState;
070import org.apache.hadoop.hbase.master.ServerListener;
071import org.apache.hadoop.hbase.master.ServerManager;
072import org.apache.hadoop.hbase.master.TableStateManager;
073import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
074import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
075import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
076import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
077import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.procedure2.Procedure;
080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
081import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
082import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
083import org.apache.hadoop.hbase.util.Bytes;
084import org.apache.hadoop.hbase.util.FutureUtils;
085import org.apache.hadoop.hbase.util.Pair;
086import org.apache.hadoop.hbase.util.Threads;
087import org.apache.hadoop.hbase.zookeeper.ZKUtil;
088import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
089import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
090import org.apache.hadoop.util.Shell;
091import org.apache.yetus.audience.InterfaceAudience;
092import org.apache.zookeeper.KeeperException;
093import org.slf4j.Logger;
094import org.slf4j.LoggerFactory;
095
096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
097import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
098import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
099import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
100
101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos;
107
108/**
109 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
110 * persistence store for the group information. It also makes use of zookeeper to store group
111 * information needed for bootstrapping during offline mode.
112 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
113 * RSGroupInfo Map at {@link RSGroupInfoHolder#groupName2Group}. These Maps are persisted to the
114 * hbase:rsgroup table (and cached in zk) on each modification.
115 * <p/>
116 * Mutations on state are synchronized but reads can continue without having to wait on an instance
117 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
118 * state are read-only, just-in-case (see flushConfig).
119 * <p/>
120 * Reads must not block else there is a danger we'll deadlock.
121 * <p/>
122 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
123 * on the results of the query modifying cache in zookeeper without another thread making
124 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
125 * access concurrently. Reads must be able to continue concurrently.
126 */
127@InterfaceAudience.Private
128final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

  // Assigned before user tables
  /** Name of the rsgroup table: <code>rsgroup</code> in the system namespace. */
  static final TableName RSGROUP_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

  /** Message fragment stating that the 'default' RSGroup must keep at least one server. */
  static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE =
    "should keep at least " + "one server in 'default' RSGroup.";

  /** Define the config key of retries threshold when movements failed */
  static final String FAILED_MOVE_MAX_RETRY = "hbase.rsgroup.move.max.retry";

  /** Define the default number of retries */
  static final int DEFAULT_MAX_RETRY_VALUE = 50;

  /** Name of the znode, joined onto the base znode, under which one child per group is kept. */
  private static final String RS_GROUP_ZNODE = "rsgroup";

  /** The single column family of the rsgroup table. */
  static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");

  /** Qualifier of the column holding the serialized RSGroupInfo protobuf of a group. */
  static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");

  /** Name given to the background thread that migrates table rs group config. */
  static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables";

  // NOTE(review): single zero byte; its use is not visible in this part of the file -- confirm
  // against the multi-row mutation code.
  private static final byte[] ROW_KEY = { 0 };

  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
  private static final TableDescriptor RSGROUP_TABLE_DESC;
  static {
    // One column family, region splitting disabled.
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
    try {
      // Attach the multi-row mutation endpoint at system priority.
      builder.setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
    } catch (IOException ex) {
      // A failure here aborts class initialization.
      throw new Error(ex);
    }
    RSGROUP_TABLE_DESC = builder.build();
  }
169
170  // There two Maps are immutable and wholesale replaced on each modification
171  // so are safe to access concurrently. See class comment.
172  private static final class RSGroupInfoHolder {
173    final ImmutableMap<String, RSGroupInfo> groupName2Group;
174    final ImmutableMap<TableName, RSGroupInfo> tableName2Group;
175
176    RSGroupInfoHolder() {
177      this(Collections.emptyMap());
178    }
179
180    RSGroupInfoHolder(Map<String, RSGroupInfo> rsGroupMap) {
181      ImmutableMap.Builder<String, RSGroupInfo> group2Name2GroupBuilder = ImmutableMap.builder();
182      ImmutableMap.Builder<TableName, RSGroupInfo> tableName2GroupBuilder = ImmutableMap.builder();
183      rsGroupMap.forEach((groupName, rsGroupInfo) -> {
184        group2Name2GroupBuilder.put(groupName, rsGroupInfo);
185        if (!groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
186          rsGroupInfo.getTables()
187            .forEach(tableName -> tableName2GroupBuilder.put(tableName, rsGroupInfo));
188        }
189      });
190      this.groupName2Group = group2Name2GroupBuilder.build();
191      this.tableName2Group = tableName2GroupBuilder.build();
192    }
193  }
194
  // Current snapshot of the group maps. The maps inside a holder are immutable; an update swaps
  // in a whole new holder through this volatile field.
  private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder();

  // Collaborators, all obtained from (or created for) the master in the constructor.
  private final MasterServices masterServices;
  private final AsyncClusterConnection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // contains list of groups that were last flushed to persistent store
  private Set<String> prevRSGroups = new HashSet<>();
203
204  // Package visibility for testing
205  static class RSGroupMappingScript {
206    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
207    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
208      "hbase.rsgroup.table.mapping.script.timeout";
209
210    private final String script;
211    private final long scriptTimeout;
212
213    RSGroupMappingScript(Configuration conf) {
214      script = conf.get(RS_GROUP_MAPPING_SCRIPT);
215      scriptTimeout = conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000); // 5 seconds
216    }
217
218    String getRSGroup(String namespace, String tablename) {
219      if (script == null || script.isEmpty()) {
220        return null;
221      }
222      Shell.ShellCommandExecutor rsgroupMappingScript =
223        new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null, null, scriptTimeout);
224
225      String[] exec = rsgroupMappingScript.getExecString();
226      exec[1] = namespace;
227      exec[2] = tablename;
228      try {
229        rsgroupMappingScript.execute();
230      } catch (IOException e) {
231        // This exception may happen, like process doesn't have permission to run this script.
232        LOG.error("{}, placing {} back to default rsgroup", e.getMessage(),
233          TableName.valueOf(namespace, tablename));
234        return RSGroupInfo.DEFAULT_GROUP;
235      }
236      return rsgroupMappingScript.getOutput().trim();
237    }
238  }
239
240  private RSGroupMappingScript script;
241
242  private RSGroupInfoManagerImpl(MasterServices masterServices) {
243    this.masterServices = masterServices;
244    this.watcher = masterServices.getZooKeeper();
245    this.conn = masterServices.getAsyncClusterConnection();
246    this.rsGroupStartupWorker = new RSGroupStartupWorker();
247    this.script = new RSGroupMappingScript(masterServices.getConfiguration());
248  }
249
250  private synchronized void updateDefaultServers() {
251    LOG.info("Updating default servers.");
252    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
253    RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP);
254    assert oldDefaultGroupInfo != null;
255    RSGroupInfo newDefaultGroupInfo =
256      new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers());
257    newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables());
258    newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo);
259    // do not need to persist, as we do not persist default group.
260    resetRSGroupMap(newGroupMap);
261    LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size());
262    if (LOG.isDebugEnabled()) {
263      LOG.debug("New default servers list: {}", newDefaultGroupInfo.getServers());
264    }
265  }
266
267  private synchronized void init() throws IOException {
268    refresh(false);
269    masterServices.getServerManager().registerListener(new ServerListener() {
270
271      @Override
272      public void serverAdded(ServerName serverName) {
273        updateDefaultServers();
274      }
275
276      @Override
277      public void serverRemoved(ServerName serverName) {
278        updateDefaultServers();
279      }
280    });
281  }
282
283  static RSGroupInfoManager getInstance(MasterServices masterServices) throws IOException {
284    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(masterServices);
285    instance.init();
286    return instance;
287  }
288
  /** Starts the rsgroup startup worker. */
  public void start() {
    // create system table of rsgroup
    rsGroupStartupWorker.start();
  }
293
294  @Override
295  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
296    checkGroupName(rsGroupInfo.getName());
297    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
298    if (
299      rsGroupMap.get(rsGroupInfo.getName()) != null
300        || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)
301    ) {
302      throw new ConstraintException("Group already exists: " + rsGroupInfo.getName());
303    }
304    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
305    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
306    flushConfig(newGroupMap);
307    LOG.info("Add group {} done.", rsGroupInfo.getName());
308  }
309
310  private RSGroupInfo getRSGroupInfo(final String groupName) throws ConstraintException {
311    RSGroupInfo rsGroupInfo = holder.groupName2Group.get(groupName);
312    if (rsGroupInfo == null) {
313      throw new ConstraintException("RSGroup " + groupName + " does not exist");
314    }
315    return rsGroupInfo;
316  }
317
318  /** Returns Set of online Servers named for their hostname and port (not ServerName). */
319  private Set<Address> getOnlineServers() {
320    return masterServices.getServerManager().getOnlineServers().keySet().stream()
321      .map(ServerName::getAddress).collect(Collectors.toSet());
322  }
323
324  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
325    String dstGroup) throws IOException {
326    RSGroupInfo src = getRSGroupInfo(srcGroup);
327    RSGroupInfo dst = getRSGroupInfo(dstGroup);
328    Set<Address> movedServers = new HashSet<>();
329    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
330    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
331    // rsgroup of dead servers that are to come back later).
332    Set<Address> onlineServers =
333      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null;
334    for (Address el : servers) {
335      src.removeServer(el);
336      if (onlineServers != null) {
337        if (!onlineServers.contains(el)) {
338          if (LOG.isDebugEnabled()) {
339            LOG.debug("Dropping " + el + " during move-to-default RSGroup because not online");
340          }
341          continue;
342        }
343      }
344      dst.addServer(el);
345      movedServers.add(el);
346    }
347    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
348    newGroupMap.put(src.getName(), src);
349    newGroupMap.put(dst.getName(), dst);
350    flushConfig(newGroupMap);
351    return movedServers;
352  }
353
354  @Override
355  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) {
356    for (RSGroupInfo info : holder.groupName2Group.values()) {
357      if (info.containsServer(serverHostPort)) {
358        return info;
359      }
360    }
361    return null;
362  }
363
  /**
   * Returns the cached group of the given name, or {@code null} when no such group is cached.
   * Reads the current holder without taking the instance monitor.
   */
  @Override
  public RSGroupInfo getRSGroup(String groupName) {
    return holder.groupName2Group.get(groupName);
  }
368
369  @Override
370  public synchronized void removeRSGroup(String groupName) throws IOException {
371    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
372    int serverCount = rsGroupInfo.getServers().size();
373    if (serverCount > 0) {
374      throw new ConstraintException("RSGroup " + groupName + " has " + serverCount
375        + " servers; you must remove these servers from the RSGroup before"
376        + " the RSGroup can be removed.");
377    }
378    for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) {
379      if (td.getRegionServerGroup().map(groupName::equals).orElse(false)) {
380        throw new ConstraintException("RSGroup " + groupName + " is already referenced by "
381          + td.getTableName() + "; you must remove all the tables from the RSGroup before "
382          + "the RSGroup can be removed.");
383      }
384    }
385    for (NamespaceDescriptor ns : masterServices.getClusterSchema().getNamespaces()) {
386      String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
387      if (nsGroup != null && nsGroup.equals(groupName)) {
388        throw new ConstraintException(
389          "RSGroup " + groupName + " is referenced by namespace: " + ns.getName());
390      }
391    }
392    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
393    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
394      throw new ConstraintException(
395        "Group " + groupName + " does not exist or is a reserved " + "group");
396    }
397    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
398    newGroupMap.remove(groupName);
399    flushConfig(newGroupMap);
400    LOG.info("Remove group {} done", groupName);
401  }
402
403  @Override
404  public List<RSGroupInfo> listRSGroups() {
405    return Lists.newArrayList(holder.groupName2Group.values());
406  }
407
  /** Returns whether the rsgroup startup worker reports itself as online. */
  @Override
  public boolean isOnline() {
    return rsGroupStartupWorker.isOnline();
  }
412
413  @Override
414  public synchronized void removeServers(Set<Address> servers) throws IOException {
415    if (servers == null || servers.isEmpty()) {
416      throw new ConstraintException("The set of servers to remove cannot be null or empty.");
417    }
418
419    // check the set of servers
420    checkForDeadOrOnlineServers(servers);
421
422    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
423    for (Address el : servers) {
424      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
425      if (rsGroupInfo != null) {
426        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
427        if (newRsGroupInfo == null) {
428          rsGroupInfo.removeServer(el);
429          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
430        } else {
431          newRsGroupInfo.removeServer(el);
432          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
433        }
434      } else {
435        LOG.warn("Server " + el + " does not belong to any rsgroup.");
436      }
437    }
438
439    if (rsGroupInfos.size() > 0) {
440      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
441      newGroupMap.putAll(rsGroupInfos);
442      flushConfig(newGroupMap);
443    }
444    LOG.info("Remove decommissioned servers {} from RSGroup done", servers);
445  }
446
447  private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
448    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
449    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
450    try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) {
451      for (Result result;;) {
452        result = scanner.next();
453        if (result == null) {
454          break;
455        }
456        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
457          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
458        rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto));
459      }
460    }
461    return rsGroupInfoList;
462  }
463
464  private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
465    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
466    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
467    // Overwrite any info stored by table, this takes precedence
468    try {
469      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
470        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
471        if (children == null) {
472          return RSGroupInfoList;
473        }
474        for (String znode : children) {
475          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
476          if (data != null && data.length > 0) {
477            ProtobufUtil.expectPBMagicPrefix(data);
478            ByteArrayInputStream bis =
479              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
480            RSGroupInfoList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
481          }
482        }
483        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
484      }
485    } catch (KeeperException | DeserializationException | InterruptedException e) {
486      throw new IOException("Failed to read rsGroupZNode", e);
487    }
488    return RSGroupInfoList;
489  }
490
491  private void migrate(Collection<RSGroupInfo> groupList) {
492    TableDescriptors tds = masterServices.getTableDescriptors();
493    ProcedureExecutor<MasterProcedureEnv> procExec = masterServices.getMasterProcedureExecutor();
494    for (RSGroupInfo groupInfo : groupList) {
495      if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
496        continue;
497      }
498      SortedSet<TableName> failedTables = new TreeSet<>();
499      List<MigrateRSGroupProcedure> procs = new ArrayList<>();
500      for (TableName tableName : groupInfo.getTables()) {
501        LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName());
502        TableDescriptor oldTd;
503        try {
504          oldTd = tds.get(tableName);
505        } catch (IOException e) {
506          LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
507          failedTables.add(tableName);
508          continue;
509        }
510        if (oldTd == null) {
511          continue;
512        }
513        if (oldTd.getRegionServerGroup().isPresent()) {
514          // either we have already migrated it or that user has set the rs group using the new
515          // code which will set the group directly on table descriptor, skip.
516          LOG.debug("Skip migrating {} since it is already in group {}", tableName,
517            oldTd.getRegionServerGroup().get());
518          continue;
519        }
520        // This is a bit tricky. Since we know that the region server group config in
521        // TableDescriptor will only be used at master side, it is fine to just update the table
522        // descriptor on file system and also the cache, without reopening all the regions. This
523        // will be much faster than the normal modifyTable. And when upgrading, we will update
524        // master first and then region server, so after all the region servers has been reopened,
525        // the new TableDescriptor will be loaded.
526        MigrateRSGroupProcedure proc =
527          new MigrateRSGroupProcedure(procExec.getEnvironment(), tableName);
528        procExec.submitProcedure(proc);
529        procs.add(proc);
530      }
531      for (MigrateRSGroupProcedure proc : procs) {
532        try {
533          ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
534        } catch (IOException e) {
535          LOG.warn("Failed to migrate rs group {} for table {}", groupInfo.getName(),
536            proc.getTableName());
537          failedTables.add(proc.getTableName());
538        }
539      }
540      LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables);
541      synchronized (RSGroupInfoManagerImpl.this) {
542        Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
543        RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName());
544        if (currentInfo != null) {
545          RSGroupInfo newInfo =
546            new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables);
547          Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap);
548          newGroupMap.put(groupInfo.getName(), newInfo);
549          try {
550            flushConfig(newGroupMap);
551          } catch (IOException e) {
552            LOG.warn("Failed to persist rs group {}", newInfo.getName(), e);
553          }
554        }
555      }
556    }
557  }
558
559  // Migrate the table rs group info from RSGroupInfo into the table descriptor
560  // Notice that we do not want to block the initialize so this will be done in background, and
561  // during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
562  private void migrate() {
563    Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) {
564
565      @Override
566      public void run() {
567        LOG.info("Start migrating table rs group config");
568        while (!masterServices.isStopped()) {
569          Collection<RSGroupInfo> groups = holder.groupName2Group.values();
570          boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty());
571          if (!hasTables) {
572            break;
573          }
574          migrate(groups);
575        }
576        LOG.info("Done migrating table rs group info");
577      }
578    };
579    migrateThread.setDaemon(true);
580    migrateThread.start();
581  }
582
583  /**
584   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
585   * startup of the manager.
586   */
587  private synchronized void refresh(boolean forceOnline) throws IOException {
588    List<RSGroupInfo> groupList = new ArrayList<>();
589
590    // Overwrite anything read from zk, group table is source of truth
591    // if online read from GROUP table
592    if (forceOnline || isOnline()) {
593      LOG.debug("Refreshing in Online mode.");
594      groupList.addAll(retrieveGroupListFromGroupTable());
595    } else {
596      LOG.debug("Refreshing in Offline mode.");
597      groupList.addAll(retrieveGroupListFromZookeeper());
598    }
599
600    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
601    // from region group table or zk
602    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList)));
603
604    // populate the data
605    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
606    for (RSGroupInfo group : groupList) {
607      newGroupMap.put(group.getName(), group);
608    }
609    resetRSGroupMap(newGroupMap);
610    updateCacheOfRSGroups(newGroupMap.keySet());
611  }
612
613  private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException {
614    List<Mutation> mutations = Lists.newArrayList();
615
616    // populate deletes
617    for (String groupName : prevRSGroups) {
618      if (!groupMap.containsKey(groupName)) {
619        Delete d = new Delete(Bytes.toBytes(groupName));
620        mutations.add(d);
621      }
622    }
623
624    // populate puts
625    for (RSGroupInfo gi : groupMap.values()) {
626      if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
627        RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
628        Put p = new Put(Bytes.toBytes(gi.getName()));
629        p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
630        mutations.add(p);
631      }
632    }
633
634    if (mutations.size() > 0) {
635      multiMutate(mutations);
636    }
637  }
638
  /**
   * Flushes the currently cached group map. While not online this returns without doing
   * anything, because the map passed on is the cached map itself.
   * @throws IOException if persisting fails
   */
  private synchronized void flushConfig() throws IOException {
    flushConfig(holder.groupName2Group);
  }
642
  /**
   * Persists the given group map and makes it the cached one.
   * <p/>
   * While not online nothing is persisted; only a change of the servers of the default group is
   * accepted and applied to the in-memory state. Once online, the map is written to the rsgroup
   * table, published in memory, saved to zookeeper, and the set of last flushed group names is
   * updated, in that order.
   * @param newGroupMap the new group map; unless it is the cached map itself it must be mutable,
   *                    because the default group entry is temporarily removed in offline mode
   * @throws IOException if, in offline mode, anything other than the servers of the default group
   *                     differs from the cached state, or if persisting fails in online mode
   */
  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
    // For offline mode persistence is still unavailable
    // We're refreshing in-memory state but only for servers in default group
    if (!isOnline()) {
      if (newGroupMap == holder.groupName2Group) {
        // newGroupMap is the currently cached map itself, so there is nothing to compare or
        // replace.
        return;
      }

      LOG.debug("Offline mode, cannot persist to {}", RSGROUP_TABLE_NAME);

      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(holder.groupName2Group);
      // Take the default group out of both maps so it can be compared separately. This mutates
      // the caller's map; the entry is put back further down.
      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      if (
        !oldGroupMap.equals(newGroupMap)
          /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables()
            .equals(newDefaultGroup.getTables())
        /* compare tables in default group */
      ) {
        throw new IOException("Only servers in default group can be updated during offline mode");
      }

      // Restore newGroupMap by putting its default group back
      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);

      // Publish a new holder built from the passed-in newGroupMap (an updated copy of the cached
      // map)
      this.holder = new RSGroupInfoHolder(newGroupMap);

      LOG.debug("New RSGroup map: {}", newGroupMap);

      // Do not need to update tableMap
      // because only the update on servers in default group is allowed above,
      // or IOException will be thrown
      return;
    }

    /* For online mode, persist to hbase:rsgroup and Zookeeper */
    LOG.debug("Online mode, persisting to {} and ZK", RSGROUP_TABLE_NAME);
    flushConfigTable(newGroupMap);

    // Make changes visible after having been persisted to the source of truth
    resetRSGroupMap(newGroupMap);
    saveRSGroupMapToZK(newGroupMap);
    updateCacheOfRSGroups(newGroupMap.keySet());
    LOG.info("Flush config done, new RSGroup map: {}", newGroupMap);
  }
692
693  private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOException {
694    LOG.debug("Saving RSGroup info to ZK");
695    try {
696      String groupBasePath =
697        ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
698      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
699
700      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
701      for (String groupName : prevRSGroups) {
702        if (!newGroupMap.containsKey(groupName)) {
703          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
704          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
705        }
706      }
707
708      for (RSGroupInfo gi : newGroupMap.values()) {
709        if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
710          String znode = ZNodePaths.joinZNode(groupBasePath, gi.getName());
711          RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
712          LOG.debug("Updating znode: " + znode);
713          ZKUtil.createAndFailSilent(watcher, znode);
714          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
715          zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
716            ProtobufUtil.prependPBMagic(proto.toByteArray())));
717        }
718      }
719      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
720
721      ZKUtil.multiOrSequential(watcher, zkOps, false);
722    } catch (KeeperException e) {
723      LOG.error("Failed to write to rsGroupZNode", e);
724      masterServices.abort("Failed to write to rsGroupZNode", e);
725      throw new IOException("Failed to write to rsGroupZNode", e);
726    }
727  }
728
729  /**
730   * Make changes visible. Caller must be synchronized on 'this'.
731   */
732  private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) {
733    this.holder = new RSGroupInfoHolder(newRSGroupMap);
734  }
735
736  /**
737   * Update cache of rsgroups. Caller must be synchronized on 'this'.
738   * @param currentGroups Current list of Groups.
739   */
740  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
741    this.prevRSGroups.clear();
742    this.prevRSGroups.addAll(currentGroups);
743  }
744
745  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
746  private SortedSet<Address> getDefaultServers() {
747    return getDefaultServers(listRSGroups()/* get from rsGroupMap */);
748  }
749
750  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
751  private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList) {
752    // Build a list of servers in other groups than default group, from rsGroupMap
753    Set<Address> serversInOtherGroup = new HashSet<>();
754    for (RSGroupInfo group : rsGroupInfoList) {
755      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
756        serversInOtherGroup.addAll(group.getServers());
757      }
758    }
759
760    // Get all online servers from Zookeeper and find out servers in default group
761    SortedSet<Address> defaultServers = Sets.newTreeSet();
762    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
763      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
764      if (!serversInOtherGroup.contains(server)) { // not in other groups
765        defaultServers.add(server);
766      }
767    }
768    return defaultServers;
769  }
770
  /**
   * Daemon thread that keeps trying, while the master is running, to bring the rsgroup table into
   * a readable state. Once a read succeeds it refreshes the manager's cached information, flips
   * the {@code online} flag, flushes the configuration and runs {@code migrate()}.
   */
  private class RSGroupStartupWorker extends Thread {
    // Instance-level logger dedicated to this worker; shadows the enclosing class's LOG.
    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
    // Set by the worker thread and read through isOnline(), hence volatile.
    private volatile boolean online = false;

    /** Names the thread after this class and the master's server name, and marks it daemon. */
    RSGroupStartupWorker() {
      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
      setDaemon(true);
    }

    /** Runs the wait loop once and logs whether it ended in success or gave up. */
    @Override
    public void run() {
      if (waitForGroupTableOnline()) {
        LOG.info("GroupBasedLoadBalancer is now online");
      } else {
        LOG.warn("Quit without making region group table online");
      }
    }

    /**
     * Loops until the whole start-up sequence succeeds or the master stops running. Any exception
     * thrown by a step is logged and the entire sequence is retried after a 100ms pause.
     * @return true once the sequence completed, false if the master was aborted or stopped first
     */
    private boolean waitForGroupTableOnline() {
      while (isMasterRunning(masterServices)) {
        try {
          TableStateManager tsm = masterServices.getTableStateManager();
          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
            createRSGroupTable();
          }
          // try reading from the table
          FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY)));
          LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME);
          RSGroupInfoManagerImpl.this.refresh(true);
          // Set before the flush and migration below; it is not reset if either of them throws.
          online = true;
          // flush any inconsistencies between ZK and HTable
          RSGroupInfoManagerImpl.this.flushConfig();
          // migrate after we are online.
          migrate();
          return true;
        } catch (Exception e) {
          LOG.warn("Failed to perform check", e);
          // 100ms is short so let's just ignore the interrupt
          Threads.sleepWithoutInterrupt(100);
        }
      }
      return false;
    }

    /**
     * Makes sure a create-table procedure for the rsgroup table exists and waits for it to finish.
     * An already submitted {@code CreateTableProcedure} for the table is reused; otherwise a new
     * system table creation is submitted.
     * @throws IOException if the wait is interrupted, if all 600 polls (100ms apart) are used up,
     *                     or if the finished procedure reports failure
     */
    private void createRSGroupTable() throws IOException {
      // Look for a create-table procedure for the rsgroup table among the master's procedures.
      OptionalLong optProcId = masterServices.getProcedures().stream()
        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
        .findFirst();
      long procId;
      if (optProcId.isPresent()) {
        procId = optProcId.getAsLong();
      } else {
        LOG.debug("Creating group table {}", RSGROUP_TABLE_NAME);
        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
      }
      // wait for region to be online
      // Polls until the procedure finishes, the executor stops running, or the tries run out.
      int tries = 600;
      while (
        !(masterServices.getMasterProcedureExecutor().isFinished(procId))
          && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0
      ) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          throw new IOException("Wait interrupted ", e);
        }
        tries--;
      }
      if (tries <= 0) {
        throw new IOException("Failed to create group table in a given time.");
      } else {
        // A null result is treated the same as success: nothing is thrown.
        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
        if (result != null && result.isFailed()) {
          throw new IOException(
            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
        }
      }
    }

    /** Returns true once the cached information has been refreshed from the rsgroup table. */
    public boolean isOnline() {
      return online;
    }
  }
855
856  private static boolean isMasterRunning(MasterServices masterServices) {
857    return !masterServices.isAborted() && !masterServices.isStopped();
858  }
859
860  private void multiMutate(List<Mutation> mutations) throws IOException {
861    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
862    for (Mutation mutation : mutations) {
863      if (mutation instanceof Put) {
864        builder
865          .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation));
866      } else if (mutation instanceof Delete) {
867        builder
868          .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation));
869      } else {
870        throw new DoNotRetryIOException(
871          "multiMutate doesn't support " + mutation.getClass().getName());
872      }
873    }
874    MutateRowsRequest request = builder.build();
875    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
876    LOG.debug("Multimutating {} with {} mutations", RSGROUP_TABLE_NAME, mutations.size());
877    FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
878      MultiRowMutationService::newStub,
879      (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
880    LOG.info("Multimutating {} with {} mutations done", RSGROUP_TABLE_NAME, mutations.size());
881  }
882
883  private void checkGroupName(String groupName) throws ConstraintException {
884    if (!groupName.matches("[a-zA-Z0-9_]+")) {
885      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
886    }
887  }
888
889  @Override
890  public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException {
891    return holder.tableName2Group.get(tableName);
892  }
893
894  /**
895   * Check if the set of servers are belong to dead servers list or online servers list.
896   * @param servers servers to remove
897   */
898  private void checkForDeadOrOnlineServers(Set<Address> servers) throws IOException {
899    // This ugliness is because we only have Address, not ServerName.
900    Set<Address> onlineServers = new HashSet<>();
901    List<ServerName> drainingServers = masterServices.getServerManager().getDrainingServersList();
902    for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) {
903      // Only online but not decommissioned servers are really online
904      if (!drainingServers.contains(server)) {
905        onlineServers.add(server.getAddress());
906      }
907    }
908
909    Set<Address> deadServers = new HashSet<>();
910    for (ServerName server : masterServices.getServerManager().getDeadServers().copyServerNames()) {
911      deadServers.add(server.getAddress());
912    }
913
914    for (Address address : servers) {
915      if (onlineServers.contains(address)) {
916        throw new DoNotRetryIOException(
917          "Server " + address + " is an online server, not allowed to remove.");
918      }
919      if (deadServers.contains(address)) {
920        throw new DoNotRetryIOException("Server " + address + " is on the dead servers list,"
921          + " Maybe it will come back again, not allowed to remove.");
922      }
923    }
924  }
925
926  private void checkOnlineServersOnly(Set<Address> servers) throws IOException {
927    // This uglyness is because we only have Address, not ServerName.
928    // Online servers are keyed by ServerName.
929    Set<Address> onlineServers = new HashSet<>();
930    for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) {
931      onlineServers.add(server.getAddress());
932    }
933    for (Address address : servers) {
934      if (!onlineServers.contains(address)) {
935        throw new DoNotRetryIOException(
936          "Server " + address + " is not an online server in 'default' RSGroup.");
937      }
938    }
939  }
940
941  /** Returns List of Regions associated with this <code>server</code>. */
942  private List<RegionInfo> getRegions(final Address server) {
943    LinkedList<RegionInfo> regions = new LinkedList<>();
944    for (Map.Entry<RegionInfo, ServerName> el : masterServices.getAssignmentManager()
945      .getRegionStates().getRegionAssignments().entrySet()) {
946      if (el.getValue() == null) {
947        continue;
948      }
949
950      if (el.getValue().getAddress().equals(server)) {
951        addRegion(regions, el.getKey());
952      }
953    }
954    for (RegionStateNode state : masterServices.getAssignmentManager().getRegionsInTransition()) {
955      if (
956        state.getRegionLocation() != null && state.getRegionLocation().getAddress().equals(server)
957      ) {
958        addRegion(regions, state.getRegionInfo());
959      }
960    }
961    return regions;
962  }
963
964  private void addRegion(final LinkedList<RegionInfo> regions, RegionInfo hri) {
965    // If meta, move it last otherwise other unassigns fail because meta is not
966    // online for them to update state in. This is dodgy. Needs to be made more
967    // robust. See TODO below.
968    if (hri.isMetaRegion()) {
969      regions.addLast(hri);
970    } else {
971      regions.addFirst(hri);
972    }
973  }
974
975  /**
976   * Move every region from servers which are currently located on these servers, but should not be
977   * located there.
978   * @param movedServers    the servers that are moved to new group
979   * @param srcGrpServers   all servers in the source group, excluding the movedServers
980   * @param targetGroupName the target group
981   * @param sourceGroupName the source group
982   * @throws IOException if moving the server and tables fail
983   */
984  private void moveServerRegionsFromGroup(Set<Address> movedServers, Set<Address> srcGrpServers,
985    String targetGroupName, String sourceGroupName) throws IOException {
986    moveRegionsBetweenGroups(movedServers, srcGrpServers, targetGroupName, sourceGroupName,
987      rs -> getRegions(rs), info -> {
988        try {
989          String groupName = RSGroupUtil.getRSGroupInfo(masterServices, this, info.getTable())
990            .map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP);
991          return groupName.equals(targetGroupName);
992        } catch (IOException e) {
993          LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName);
994          return false;
995        }
996      });
997  }
998
  /**
   * Moves regions that fail {@code validation} off the online servers in {@code regionsOwners}
   * and onto online servers in {@code newRegionsOwners}, retrying failed moves.
   * <p>
   * Each attempt submits asynchronous moves, waits for them via {@code waitForRegionMovement}, and
   * returns as soon as an attempt ends with no failed region. Between attempts the method calls
   * {@code wait(1000)} on this manager, so the calling thread must own this object's monitor.
   * @param regionsOwners    owners whose regions are examined; matched against online server
   *                         addresses
   * @param newRegionsOwners addresses of the servers that may receive the moved regions
   * @param targetGroupName  target group name, used in log messages only
   * @param sourceGroupName  source group name, used in log and error messages
   * @param getRegionsInfo   yields the regions associated with an owner
   * @param validation       returns true for regions that may stay where they are
   * @throws IOException a {@link DoNotRetryIOException} if regions still fail once the retries
   *                     configured under {@code FAILED_MOVE_MAX_RETRY} are used up
   */
  private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, Set<Address> newRegionsOwners,
    String targetGroupName, String sourceGroupName, Function<T, List<RegionInfo>> getRegionsInfo,
    Function<RegionInfo, Boolean> validation) throws IOException {
    // Get server names corresponding to given Addresses
    List<ServerName> movedServerNames = new ArrayList<>(regionsOwners.size());
    List<ServerName> srcGrpServerNames = new ArrayList<>(newRegionsOwners.size());
    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
      // In case a region move failed in a previous attempt, regionsOwners and newRegionsOwners
      // can contain the same servers, so both conditions are checked for every server.
      if (newRegionsOwners.contains(serverName.getAddress())) {
        srcGrpServerNames.add(serverName);
      }
      if (regionsOwners.contains(serverName.getAddress())) {
        movedServerNames.add(serverName);
      }
    }
    List<Pair<RegionInfo, Future<byte[]>>> assignmentFutures = new ArrayList<>();
    int retry = 0;
    Set<String> failedRegions = new HashSet<>();
    // Last IOException seen while submitting a move; stays null if no submission ever threw.
    IOException toThrow = null;
    do {
      // Every attempt starts from a clean slate and re-reads the regions of each owner.
      assignmentFutures.clear();
      failedRegions.clear();
      for (ServerName owner : movedServerNames) {
        // Get regions that are associated with this server and filter regions by group tables.
        // The unchecked cast relies on T being Address at the call sites.
        for (RegionInfo region : getRegionsInfo.apply((T) owner.getAddress())) {
          if (!validation.apply(region)) {
            LOG.info("Moving region {}, which does not belong to RSGroup {}",
              region.getShortNameToLog(), targetGroupName);
            // Move region back to source RSGroup servers
            ServerName dest =
              masterServices.getLoadBalancer().randomAssignment(region, srcGrpServerNames);
            if (dest == null) {
              // The balancer offered no destination; count the region as failed for this attempt.
              failedRegions.add(region.getRegionNameAsString());
              continue;
            }
            RegionPlan rp = new RegionPlan(region, owner, dest);
            try {
              Future<byte[]> future = masterServices.getAssignmentManager().moveAsync(rp);
              assignmentFutures.add(Pair.newPair(region, future));
            } catch (IOException ioe) {
              failedRegions.add(region.getRegionNameAsString());
              LOG.debug("Move region {} failed, will retry, current retry time is {}",
                region.getShortNameToLog(), retry, ioe);
              toThrow = ioe;
            }
          }
        }
      }
      // Blocks on the submitted moves and adds those that did not succeed to failedRegions.
      waitForRegionMovement(assignmentFutures, failedRegions, sourceGroupName, retry);
      if (failedRegions.isEmpty()) {
        LOG.info("All regions from {} are moved back to {}", movedServerNames, sourceGroupName);
        return;
      } else {
        try {
          // Object.wait on this manager: requires the caller to hold this object's monitor.
          wait(1000);
        } catch (InterruptedException e) {
          LOG.warn("Sleep interrupted", e);
          Thread.currentThread().interrupt();
        }
        retry++;
      }
    } while (
      !failedRegions.isEmpty() && retry <= masterServices.getConfiguration()
        .getInt(FAILED_MOVE_MAX_RETRY, DEFAULT_MAX_RETRY_VALUE)
    );

    // Reached only after the retry budget is used up; failedRegions holds the last attempt's
    // failures.
    if (!failedRegions.isEmpty()) {
      // Log the regions that failed to move so they can be handled later.
      String msg = String.format("move regions for group %s failed, failed regions: %s",
        sourceGroupName, failedRegions);
      LOG.error(msg);
      throw new DoNotRetryIOException(
        msg + ", just record the last failed region's cause, more details in server log", toThrow);
    }
  }
1076
1077  /**
1078   * Wait for all the region move to complete. Keep waiting for other region movement completion
1079   * even if some region movement fails.
1080   */
1081  private void waitForRegionMovement(List<Pair<RegionInfo, Future<byte[]>>> regionMoveFutures,
1082    Set<String> failedRegions, String sourceGroupName, int retryCount) {
1083    LOG.info("Moving {} region(s) to group {}, current retry={}", regionMoveFutures.size(),
1084      sourceGroupName, retryCount);
1085    for (Pair<RegionInfo, Future<byte[]>> pair : regionMoveFutures) {
1086      try {
1087        pair.getSecond().get();
1088        if (
1089          masterServices.getAssignmentManager().getRegionStates().getRegionState(pair.getFirst())
1090            .isFailedOpen()
1091        ) {
1092          failedRegions.add(pair.getFirst().getRegionNameAsString());
1093        }
1094      } catch (InterruptedException e) {
1095        // Dont return form there lets wait for other regions to complete movement.
1096        failedRegions.add(pair.getFirst().getRegionNameAsString());
1097        LOG.warn("Sleep interrupted", e);
1098      } catch (Exception e) {
1099        failedRegions.add(pair.getFirst().getRegionNameAsString());
1100        LOG.error("Move region {} to group {} failed, will retry on next attempt",
1101          pair.getFirst().getShortNameToLog(), sourceGroupName, e);
1102      }
1103    }
1104  }
1105
1106  private boolean isTableInGroup(TableName tableName, String groupName,
1107    Set<TableName> tablesInGroupCache) throws IOException {
1108    if (tablesInGroupCache.contains(tableName)) {
1109      return true;
1110    }
1111    if (
1112      RSGroupUtil.getRSGroupInfo(masterServices, this, tableName).map(RSGroupInfo::getName)
1113        .orElse(RSGroupInfo.DEFAULT_GROUP).equals(groupName)
1114    ) {
1115      tablesInGroupCache.add(tableName);
1116      return true;
1117    }
1118    return false;
1119  }
1120
1121  private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
1122    throws IOException {
1123    Map<String, RegionState> rit = Maps.newTreeMap();
1124    Set<TableName> tablesInGroupCache = new HashSet<>();
1125    for (RegionStateNode regionNode : masterServices.getAssignmentManager()
1126      .getRegionsInTransition()) {
1127      TableName tn = regionNode.getTable();
1128      if (isTableInGroup(tn, groupName, tablesInGroupCache)) {
1129        rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState());
1130      }
1131    }
1132    return rit;
1133  }
1134
1135  /**
1136   * This is an EXPENSIVE clone. Cloning though is the safest thing to do. Can't let out original
1137   * since it can change and at least the load balancer wants to iterate this exported list. Load
1138   * balancer should iterate over this list because cloned list will ignore disabled table and split
1139   * parent region cases. This method is invoked by {@link #balanceRSGroup}
1140   * @return A clone of current assignments for this group.
1141   */
1142  Map<TableName, Map<ServerName, List<RegionInfo>>> getRSGroupAssignmentsByTable(
1143    TableStateManager tableStateManager, String groupName) throws IOException {
1144    Map<TableName, Map<ServerName, List<RegionInfo>>> result = Maps.newHashMap();
1145    Set<TableName> tablesInGroupCache = new HashSet<>();
1146    for (Map.Entry<RegionInfo, ServerName> entry : masterServices.getAssignmentManager()
1147      .getRegionStates().getRegionAssignments().entrySet()) {
1148      RegionInfo region = entry.getKey();
1149      TableName tn = region.getTable();
1150      ServerName server = entry.getValue();
1151      if (isTableInGroup(tn, groupName, tablesInGroupCache)) {
1152        if (
1153          tableStateManager.isTableState(tn, TableState.State.DISABLED, TableState.State.DISABLING)
1154        ) {
1155          continue;
1156        }
1157        if (region.isSplitParent()) {
1158          continue;
1159        }
1160        result.computeIfAbsent(tn, k -> new HashMap<>())
1161          .computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1162      }
1163    }
1164    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
1165    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
1166      if (rsGroupInfo.containsServer(serverName.getAddress())) {
1167        for (Map<ServerName, List<RegionInfo>> map : result.values()) {
1168          map.computeIfAbsent(serverName, k -> Collections.emptyList());
1169        }
1170      }
1171    }
1172    return result;
1173  }
1174
1175  @Override
1176  public BalanceResponse balanceRSGroup(String groupName, BalanceRequest request)
1177    throws IOException {
1178    ServerManager serverManager = masterServices.getServerManager();
1179    LoadBalancer balancer = masterServices.getLoadBalancer();
1180    getRSGroupInfo(groupName);
1181
1182    BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder();
1183
1184    synchronized (balancer) {
1185      // If balance not true, don't run balancer.
1186      if (!masterServices.isBalancerOn() && !request.isDryRun()) {
1187        return responseBuilder.build();
1188      }
1189
1190      // Only allow one balance run at at time.
1191      Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
1192      if (groupRIT.size() > 0 && !request.isIgnoreRegionsInTransition()) {
1193        LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(),
1194          StringUtils.abbreviate(masterServices.getAssignmentManager().getRegionStates()
1195            .getRegionsInTransition().toString(), 256));
1196        return responseBuilder.build();
1197      }
1198
1199      if (serverManager.areDeadServersInProgress()) {
1200        LOG.debug("Not running balancer because processing dead regionserver(s): {}",
1201          serverManager.getDeadServers());
1202        return responseBuilder.build();
1203      }
1204
1205      // We balance per group instead of per table
1206      Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable =
1207        getRSGroupAssignmentsByTable(masterServices.getTableStateManager(), groupName);
1208      List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable);
1209      boolean balancerRan = !plans.isEmpty();
1210
1211      responseBuilder.setBalancerRan(balancerRan).setMovesCalculated(plans.size());
1212
1213      if (balancerRan && !request.isDryRun()) {
1214        LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());
1215        List<RegionPlan> executed = masterServices.executeRegionPlansWithThrottling(plans);
1216        responseBuilder.setMovesExecuted(executed.size());
1217        LOG.info("RSGroup balance " + groupName + " completed");
1218      }
1219
1220      return responseBuilder.build();
1221    }
1222  }
1223
1224  private void moveTablesAndWait(Set<TableName> tables, String targetGroup) throws IOException {
1225    LOG.debug("Moving {} tables to target group {}", tables.size(), targetGroup);
1226    List<Long> procIds = new ArrayList<Long>();
1227    for (TableName tableName : tables) {
1228      TableDescriptor oldTd = masterServices.getTableDescriptors().get(tableName);
1229      if (oldTd == null) {
1230        continue;
1231      }
1232      TableDescriptor newTd =
1233        TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build();
1234      procIds.add(
1235        masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE));
1236    }
1237    for (long procId : procIds) {
1238      Procedure<?> proc = masterServices.getMasterProcedureExecutor().getProcedure(procId);
1239      if (proc == null) {
1240        continue;
1241      }
1242      ProcedureSyncWait.waitForProcedureToCompleteIOE(masterServices.getMasterProcedureExecutor(),
1243        proc, Long.MAX_VALUE);
1244    }
1245    LOG.info("Move tables done: moved {} tables to {}", tables.size(), targetGroup);
1246    if (LOG.isDebugEnabled()) {
1247      LOG.debug("Tables moved to {}: {}", targetGroup, tables);
1248    }
1249  }
1250
1251  @Override
1252  public void setRSGroup(Set<TableName> tables, String groupName) throws IOException {
1253    getRSGroupInfo(groupName);
1254    moveTablesAndWait(tables, groupName);
1255  }
1256
1257  public void moveServers(Set<Address> servers, String targetGroupName) throws IOException {
1258    if (servers == null) {
1259      throw new ConstraintException("The list of servers to move cannot be null.");
1260    }
1261    if (servers.isEmpty()) {
1262      // For some reason this difference between null servers and isEmpty is important distinction.
1263      // TODO. Why? Stuff breaks if I equate them.
1264      return;
1265    }
1266    if (StringUtils.isEmpty(targetGroupName)) {
1267      throw new ConstraintException("RSGroup cannot be null.");
1268    }
1269
1270    // Hold a lock on the manager instance while moving servers to prevent
1271    // another writer changing our state while we are working.
1272    synchronized (this) {
1273      // Presume first server's source group. Later ensure all servers are from this group.
1274      Address firstServer = servers.iterator().next();
1275      RSGroupInfo srcGrp = getRSGroupOfServer(firstServer);
1276      if (srcGrp == null) {
1277        // Be careful. This exception message is tested for in TestRSGroupAdmin2...
1278        throw new ConstraintException(
1279          "Server " + firstServer + " is either offline or it does not exist.");
1280      }
1281
1282      // Only move online servers (when moving from 'default') or servers from other
1283      // groups. This prevents bogus servers from entering groups
1284      if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) {
1285        if (srcGrp.getServers().size() <= servers.size()) {
1286          throw new ConstraintException(KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE);
1287        }
1288        checkOnlineServersOnly(servers);
1289      }
1290      // Ensure all servers are of same rsgroup.
1291      for (Address server : servers) {
1292        String tmpGroup = getRSGroupOfServer(server).getName();
1293        if (!tmpGroup.equals(srcGrp.getName())) {
1294          throw new ConstraintException("Move server request should only come from one source "
1295            + "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup);
1296        }
1297      }
1298      if (srcGrp.getServers().size() <= servers.size()) {
1299        // check if there are still tables reference this group
1300        for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) {
1301          Optional<String> optGroupName = td.getRegionServerGroup();
1302          if (optGroupName.isPresent() && optGroupName.get().equals(srcGrp.getName())) {
1303            throw new ConstraintException(
1304              "Cannot leave a RSGroup " + srcGrp.getName() + " that contains tables('"
1305                + td.getTableName() + "' at least) without servers to host them.");
1306          }
1307        }
1308      }
1309
1310      // MovedServers may be < passed in 'servers'.
1311      Set<Address> movedServers = moveServers(servers, srcGrp.getName(), targetGroupName);
1312      moveServerRegionsFromGroup(movedServers, srcGrp.getServers(), targetGroupName,
1313        srcGrp.getName());
1314      LOG.info("Move servers done: moved {} servers from {} to {}", movedServers.size(),
1315        srcGrp.getName(), targetGroupName);
1316      if (LOG.isDebugEnabled()) {
1317        LOG.debug("Servers moved from {} to {}: {}", srcGrp.getName(), targetGroupName,
1318          movedServers);
1319      }
1320    }
1321  }
1322
1323  @Override
1324  public String determineRSGroupInfoForTable(TableName tableName) {
1325    return script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString());
1326  }
1327
1328  @Override
1329  public synchronized void renameRSGroup(String oldName, String newName) throws IOException {
1330    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
1331      throw new ConstraintException(RSGroupInfo.DEFAULT_GROUP + " can't be rename");
1332    }
1333    checkGroupName(newName);
1334    // getRSGroupInfo validates old RSGroup existence.
1335    RSGroupInfo oldRSG = getRSGroupInfo(oldName);
1336    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
1337    if (rsGroupMap.containsKey(newName)) {
1338      throw new ConstraintException("Group already exists: " + newName);
1339    }
1340
1341    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
1342    newGroupMap.remove(oldRSG.getName());
1343    RSGroupInfo newRSG = new RSGroupInfo(newName, oldRSG.getServers());
1344    newGroupMap.put(newName, newRSG);
1345    flushConfig(newGroupMap);
1346    Set<TableName> updateTables = masterServices.getTableDescriptors().getAll().values().stream()
1347      .filter(t -> oldName.equals(t.getRegionServerGroup().orElse(null)))
1348      .map(TableDescriptor::getTableName).collect(Collectors.toSet());
1349    setRSGroup(updateTables, newName);
1350    LOG.info("Rename RSGroup done: {} => {}", oldName, newName);
1351  }
1352
1353  @Override
1354  public synchronized void updateRSGroupConfig(String groupName, Map<String, String> configuration)
1355    throws IOException {
1356    if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
1357      // We do not persist anything of default group, therefore, it is not supported to update
1358      // default group's configuration which lost once master down.
1359      throw new ConstraintException(
1360        "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently");
1361    }
1362    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
1363    rsGroupInfo.getConfiguration().forEach((k, v) -> rsGroupInfo.removeConfiguration(k));
1364    configuration.forEach((k, v) -> rsGroupInfo.setConfiguration(k, v));
1365    flushConfig();
1366  }
1367}