001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.OptionalLong;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import java.util.concurrent.Future;
036import java.util.function.Function;
037import java.util.stream.Collectors;
038import org.apache.commons.lang3.StringUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Coprocessor;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.NamespaceDescriptor;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableDescriptors;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.AsyncClusterConnection;
048import org.apache.hadoop.hbase.client.AsyncTable;
049import org.apache.hadoop.hbase.client.BalanceRequest;
050import org.apache.hadoop.hbase.client.BalanceResponse;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
052import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
053import org.apache.hadoop.hbase.client.Delete;
054import org.apache.hadoop.hbase.client.Get;
055import org.apache.hadoop.hbase.client.Mutation;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.RegionInfo;
058import org.apache.hadoop.hbase.client.Result;
059import org.apache.hadoop.hbase.client.ResultScanner;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
062import org.apache.hadoop.hbase.client.TableState;
063import org.apache.hadoop.hbase.constraint.ConstraintException;
064import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
065import org.apache.hadoop.hbase.exceptions.DeserializationException;
066import org.apache.hadoop.hbase.master.LoadBalancer;
067import org.apache.hadoop.hbase.master.MasterServices;
068import org.apache.hadoop.hbase.master.RegionPlan;
069import org.apache.hadoop.hbase.master.RegionState;
070import org.apache.hadoop.hbase.master.ServerListener;
071import org.apache.hadoop.hbase.master.ServerManager;
072import org.apache.hadoop.hbase.master.TableStateManager;
073import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
074import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
075import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
076import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
077import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.procedure2.Procedure;
080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
081import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
082import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
083import org.apache.hadoop.hbase.util.Bytes;
084import org.apache.hadoop.hbase.util.FutureUtils;
085import org.apache.hadoop.hbase.util.Pair;
086import org.apache.hadoop.hbase.util.Threads;
087import org.apache.hadoop.hbase.zookeeper.ZKUtil;
088import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
089import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
090import org.apache.hadoop.util.Shell;
091import org.apache.yetus.audience.InterfaceAudience;
092import org.apache.zookeeper.KeeperException;
093import org.slf4j.Logger;
094import org.slf4j.LoggerFactory;
095
096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
097import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
098import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
099import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
100
101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos;
107
108/**
109 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
110 * persistence store for the group information. It also makes use of zookeeper to store group
111 * information needed for bootstrapping during offline mode.
112 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
113 * RSGroupInfo Map at {@link RSGroupInfoHolder#groupName2Group}. These Maps are persisted to the
114 * hbase:rsgroup table (and cached in zk) on each modification.
115 * <p/>
 * Mutations on state are synchronized, but reads can proceed without waiting on the instance
 * monitor: mutations replace the Maps wholesale on each update (Copy-On-Write), and the local Maps
 * of state are read-only, just in case (see flushConfig).
119 * <p/>
120 * Reads must not block else there is a danger we'll deadlock.
121 * <p/>
122 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
123 * on the results of the query modifying cache in zookeeper without another thread making
124 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
125 * access concurrently. Reads must be able to continue concurrently.
126 */
127@InterfaceAudience.Private
128final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

  // Assigned before user tables
  /** Name of the {@code hbase:rsgroup} system table that persists the rsgroup information. */
  static final TableName RSGROUP_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

  /** Message fragment stating that the 'default' RSGroup must keep at least one server. */
  static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE =
    "should keep at least " + "one server in 'default' RSGroup.";

  /** Define the config key of retries threshold when movements failed */
  static final String FAILED_MOVE_MAX_RETRY = "hbase.rsgroup.move.max.retry";

  /** Define the default number of retries */
  static final int DEFAULT_MAX_RETRY_VALUE = 50;

  /** Name of the child znode, under the base znode, that holds one child znode per rsgroup. */
  private static final String RS_GROUP_ZNODE = "rsgroup";

  /** The single column family of the {@code hbase:rsgroup} table. */
  static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");

  /** Column qualifier holding the serialized RSGroupInfo protobuf of a group row. */
  static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");

  /** Name of the background thread that migrates table rsgroup info into table descriptors. */
  static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables";

  // NOTE(review): not referenced in the visible part of this file; presumably used by
  // multiMutate further down - confirm.
  private static final byte[] ROW_KEY = { 0 };
153
154  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
155  private static final TableDescriptor RSGROUP_TABLE_DESC;
156  static {
157    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
158      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
159      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
160    try {
161      builder.setCoprocessor(
162        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
163          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
164    } catch (IOException ex) {
165      throw new Error(ex);
166    }
167    RSGROUP_TABLE_DESC = builder.build();
168  }
169
170  // There two Maps are immutable and wholesale replaced on each modification
171  // so are safe to access concurrently. See class comment.
172  private static final class RSGroupInfoHolder {
173    final ImmutableMap<String, RSGroupInfo> groupName2Group;
174    final ImmutableMap<TableName, RSGroupInfo> tableName2Group;
175
176    RSGroupInfoHolder() {
177      this(Collections.emptyMap());
178    }
179
180    RSGroupInfoHolder(Map<String, RSGroupInfo> rsGroupMap) {
181      ImmutableMap.Builder<String, RSGroupInfo> group2Name2GroupBuilder = ImmutableMap.builder();
182      ImmutableMap.Builder<TableName, RSGroupInfo> tableName2GroupBuilder = ImmutableMap.builder();
183      rsGroupMap.forEach((groupName, rsGroupInfo) -> {
184        group2Name2GroupBuilder.put(groupName, rsGroupInfo);
185        if (!groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
186          rsGroupInfo.getTables()
187            .forEach(tableName -> tableName2GroupBuilder.put(tableName, rsGroupInfo));
188        }
189      });
190      this.groupName2Group = group2Name2GroupBuilder.build();
191      this.tableName2Group = tableName2GroupBuilder.build();
192    }
193  }
194
  // Current rsgroup snapshot. It is reassigned (its maps are never modified in place) while
  // holding the lock on this manager; volatile so lock-free readers see the latest snapshot.
  private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder();

  private final MasterServices masterServices;
  // Cluster connection, used e.g. to scan the hbase:rsgroup table.
  private final AsyncClusterConnection conn;
  // ZooKeeper watcher used to read and write the rsgroup znodes.
  private final ZKWatcher watcher;
  // Background worker launched by start(); isOnline() delegates to it.
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // contains list of groups that were last flushed to persistent store
  // (also refreshed from what refresh() reads); used to compute row/znode deletions.
  private Set<String> prevRSGroups = new HashSet<>();
203
204  // Package visibility for testing
205  static class RSGroupMappingScript {
206    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
207    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
208      "hbase.rsgroup.table.mapping.script.timeout";
209    private Shell.ShellCommandExecutor rsgroupMappingScript;
210
211    RSGroupMappingScript(Configuration conf) {
212      String script = conf.get(RS_GROUP_MAPPING_SCRIPT);
213      if (script == null || script.isEmpty()) {
214        return;
215      }
216
217      rsgroupMappingScript = new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null,
218        null, conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds
219      );
220    }
221
222    String getRSGroup(String namespace, String tablename) {
223      if (rsgroupMappingScript == null) {
224        return null;
225      }
226      String[] exec = rsgroupMappingScript.getExecString();
227      exec[1] = namespace;
228      exec[2] = tablename;
229      try {
230        rsgroupMappingScript.execute();
231      } catch (IOException e) {
232        // This exception may happen, like process doesn't have permission to run this script.
233        LOG.error("{}, placing {} back to default rsgroup", e.getMessage(),
234          TableName.valueOf(namespace, tablename));
235        return RSGroupInfo.DEFAULT_GROUP;
236      }
237      return rsgroupMappingScript.getOutput().trim();
238    }
239  }
240
  // Table-to-rsgroup mapping script, built from the master configuration in the constructor.
  private RSGroupMappingScript script;
242
243  private RSGroupInfoManagerImpl(MasterServices masterServices) {
244    this.masterServices = masterServices;
245    this.watcher = masterServices.getZooKeeper();
246    this.conn = masterServices.getAsyncClusterConnection();
247    this.rsGroupStartupWorker = new RSGroupStartupWorker();
248    this.script = new RSGroupMappingScript(masterServices.getConfiguration());
249  }
250
251  private synchronized void updateDefaultServers() {
252    LOG.info("Updating default servers.");
253    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
254    RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP);
255    assert oldDefaultGroupInfo != null;
256    RSGroupInfo newDefaultGroupInfo =
257      new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers());
258    newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables());
259    newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo);
260    // do not need to persist, as we do not persist default group.
261    resetRSGroupMap(newGroupMap);
262    LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size());
263    if (LOG.isDebugEnabled()) {
264      LOG.debug("New default servers list: {}", newDefaultGroupInfo.getServers());
265    }
266  }
267
268  private synchronized void init() throws IOException {
269    refresh(false);
270    masterServices.getServerManager().registerListener(new ServerListener() {
271
272      @Override
273      public void serverAdded(ServerName serverName) {
274        updateDefaultServers();
275      }
276
277      @Override
278      public void serverRemoved(ServerName serverName) {
279        updateDefaultServers();
280      }
281    });
282  }
283
284  static RSGroupInfoManager getInstance(MasterServices masterServices) throws IOException {
285    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(masterServices);
286    instance.init();
287    return instance;
288  }
289
290  public void start() {
291    // create system table of rsgroup
292    rsGroupStartupWorker.start();
293  }
294
295  @Override
296  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
297    checkGroupName(rsGroupInfo.getName());
298    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
299    if (
300      rsGroupMap.get(rsGroupInfo.getName()) != null
301        || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)
302    ) {
303      throw new ConstraintException("Group already exists: " + rsGroupInfo.getName());
304    }
305    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
306    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
307    flushConfig(newGroupMap);
308    LOG.info("Add group {} done.", rsGroupInfo.getName());
309  }
310
311  private RSGroupInfo getRSGroupInfo(final String groupName) throws ConstraintException {
312    RSGroupInfo rsGroupInfo = holder.groupName2Group.get(groupName);
313    if (rsGroupInfo == null) {
314      throw new ConstraintException("RSGroup " + groupName + " does not exist");
315    }
316    return rsGroupInfo;
317  }
318
319  /**
320   * @return Set of online Servers named for their hostname and port (not ServerName).
321   */
322  private Set<Address> getOnlineServers() {
323    return masterServices.getServerManager().getOnlineServers().keySet().stream()
324      .map(ServerName::getAddress).collect(Collectors.toSet());
325  }
326
327  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
328    String dstGroup) throws IOException {
329    RSGroupInfo src = getRSGroupInfo(srcGroup);
330    RSGroupInfo dst = getRSGroupInfo(dstGroup);
331    Set<Address> movedServers = new HashSet<>();
332    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
333    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
334    // rsgroup of dead servers that are to come back later).
335    Set<Address> onlineServers =
336      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null;
337    for (Address el : servers) {
338      src.removeServer(el);
339      if (onlineServers != null) {
340        if (!onlineServers.contains(el)) {
341          if (LOG.isDebugEnabled()) {
342            LOG.debug("Dropping " + el + " during move-to-default RSGroup because not online");
343          }
344          continue;
345        }
346      }
347      dst.addServer(el);
348      movedServers.add(el);
349    }
350    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
351    newGroupMap.put(src.getName(), src);
352    newGroupMap.put(dst.getName(), dst);
353    flushConfig(newGroupMap);
354    return movedServers;
355  }
356
357  @Override
358  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) {
359    for (RSGroupInfo info : holder.groupName2Group.values()) {
360      if (info.containsServer(serverHostPort)) {
361        return info;
362      }
363    }
364    return null;
365  }
366
367  @Override
368  public RSGroupInfo getRSGroup(String groupName) {
369    return holder.groupName2Group.get(groupName);
370  }
371
372  @Override
373  public synchronized void removeRSGroup(String groupName) throws IOException {
374    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
375    int serverCount = rsGroupInfo.getServers().size();
376    if (serverCount > 0) {
377      throw new ConstraintException("RSGroup " + groupName + " has " + serverCount
378        + " servers; you must remove these servers from the RSGroup before"
379        + " the RSGroup can be removed.");
380    }
381    for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) {
382      if (td.getRegionServerGroup().map(groupName::equals).orElse(false)) {
383        throw new ConstraintException("RSGroup " + groupName + " is already referenced by "
384          + td.getTableName() + "; you must remove all the tables from the RSGroup before "
385          + "the RSGroup can be removed.");
386      }
387    }
388    for (NamespaceDescriptor ns : masterServices.getClusterSchema().getNamespaces()) {
389      String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
390      if (nsGroup != null && nsGroup.equals(groupName)) {
391        throw new ConstraintException(
392          "RSGroup " + groupName + " is referenced by namespace: " + ns.getName());
393      }
394    }
395    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
396    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
397      throw new ConstraintException(
398        "Group " + groupName + " does not exist or is a reserved " + "group");
399    }
400    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
401    newGroupMap.remove(groupName);
402    flushConfig(newGroupMap);
403    LOG.info("Remove group {} done", groupName);
404  }
405
406  @Override
407  public List<RSGroupInfo> listRSGroups() {
408    return Lists.newArrayList(holder.groupName2Group.values());
409  }
410
411  @Override
412  public boolean isOnline() {
413    return rsGroupStartupWorker.isOnline();
414  }
415
416  @Override
417  public synchronized void removeServers(Set<Address> servers) throws IOException {
418    if (servers == null || servers.isEmpty()) {
419      throw new ConstraintException("The set of servers to remove cannot be null or empty.");
420    }
421
422    // check the set of servers
423    checkForDeadOrOnlineServers(servers);
424
425    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
426    for (Address el : servers) {
427      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
428      if (rsGroupInfo != null) {
429        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
430        if (newRsGroupInfo == null) {
431          rsGroupInfo.removeServer(el);
432          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
433        } else {
434          newRsGroupInfo.removeServer(el);
435          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
436        }
437      } else {
438        LOG.warn("Server " + el + " does not belong to any rsgroup.");
439      }
440    }
441
442    if (rsGroupInfos.size() > 0) {
443      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
444      newGroupMap.putAll(rsGroupInfos);
445      flushConfig(newGroupMap);
446    }
447    LOG.info("Remove decommissioned servers {} from RSGroup done", servers);
448  }
449
450  private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
451    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
452    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
453    try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) {
454      for (Result result;;) {
455        result = scanner.next();
456        if (result == null) {
457          break;
458        }
459        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
460          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
461        rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto));
462      }
463    }
464    return rsGroupInfoList;
465  }
466
467  private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
468    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
469    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
470    // Overwrite any info stored by table, this takes precedence
471    try {
472      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
473        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
474        if (children == null) {
475          return RSGroupInfoList;
476        }
477        for (String znode : children) {
478          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
479          if (data != null && data.length > 0) {
480            ProtobufUtil.expectPBMagicPrefix(data);
481            ByteArrayInputStream bis =
482              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
483            RSGroupInfoList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
484          }
485        }
486        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
487      }
488    } catch (KeeperException | DeserializationException | InterruptedException e) {
489      throw new IOException("Failed to read rsGroupZNode", e);
490    }
491    return RSGroupInfoList;
492  }
493
494  private void migrate(Collection<RSGroupInfo> groupList) {
495    TableDescriptors tds = masterServices.getTableDescriptors();
496    ProcedureExecutor<MasterProcedureEnv> procExec = masterServices.getMasterProcedureExecutor();
497    for (RSGroupInfo groupInfo : groupList) {
498      if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
499        continue;
500      }
501      SortedSet<TableName> failedTables = new TreeSet<>();
502      List<MigrateRSGroupProcedure> procs = new ArrayList<>();
503      for (TableName tableName : groupInfo.getTables()) {
504        LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName());
505        TableDescriptor oldTd;
506        try {
507          oldTd = tds.get(tableName);
508        } catch (IOException e) {
509          LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
510          failedTables.add(tableName);
511          continue;
512        }
513        if (oldTd == null) {
514          continue;
515        }
516        if (oldTd.getRegionServerGroup().isPresent()) {
517          // either we have already migrated it or that user has set the rs group using the new
518          // code which will set the group directly on table descriptor, skip.
519          LOG.debug("Skip migrating {} since it is already in group {}", tableName,
520            oldTd.getRegionServerGroup().get());
521          continue;
522        }
523        // This is a bit tricky. Since we know that the region server group config in
524        // TableDescriptor will only be used at master side, it is fine to just update the table
525        // descriptor on file system and also the cache, without reopening all the regions. This
526        // will be much faster than the normal modifyTable. And when upgrading, we will update
527        // master first and then region server, so after all the region servers has been reopened,
528        // the new TableDescriptor will be loaded.
529        MigrateRSGroupProcedure proc =
530          new MigrateRSGroupProcedure(procExec.getEnvironment(), tableName);
531        procExec.submitProcedure(proc);
532        procs.add(proc);
533      }
534      for (MigrateRSGroupProcedure proc : procs) {
535        try {
536          ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
537        } catch (IOException e) {
538          LOG.warn("Failed to migrate rs group {} for table {}", groupInfo.getName(),
539            proc.getTableName());
540          failedTables.add(proc.getTableName());
541        }
542      }
543      LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables);
544      synchronized (RSGroupInfoManagerImpl.this) {
545        Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
546        RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName());
547        if (currentInfo != null) {
548          RSGroupInfo newInfo =
549            new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables);
550          Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap);
551          newGroupMap.put(groupInfo.getName(), newInfo);
552          try {
553            flushConfig(newGroupMap);
554          } catch (IOException e) {
555            LOG.warn("Failed to persist rs group {}", newInfo.getName(), e);
556          }
557        }
558      }
559    }
560  }
561
562  // Migrate the table rs group info from RSGroupInfo into the table descriptor
563  // Notice that we do not want to block the initialize so this will be done in background, and
564  // during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
565  private void migrate() {
566    Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) {
567
568      @Override
569      public void run() {
570        LOG.info("Start migrating table rs group config");
571        while (!masterServices.isStopped()) {
572          Collection<RSGroupInfo> groups = holder.groupName2Group.values();
573          boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty());
574          if (!hasTables) {
575            break;
576          }
577          migrate(groups);
578        }
579        LOG.info("Done migrating table rs group info");
580      }
581    };
582    migrateThread.setDaemon(true);
583    migrateThread.start();
584  }
585
586  /**
587   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
588   * startup of the manager.
589   */
590  private synchronized void refresh(boolean forceOnline) throws IOException {
591    List<RSGroupInfo> groupList = new ArrayList<>();
592
593    // Overwrite anything read from zk, group table is source of truth
594    // if online read from GROUP table
595    if (forceOnline || isOnline()) {
596      LOG.debug("Refreshing in Online mode.");
597      groupList.addAll(retrieveGroupListFromGroupTable());
598    } else {
599      LOG.debug("Refreshing in Offline mode.");
600      groupList.addAll(retrieveGroupListFromZookeeper());
601    }
602
603    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
604    // from region group table or zk
605    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList)));
606
607    // populate the data
608    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
609    for (RSGroupInfo group : groupList) {
610      newGroupMap.put(group.getName(), group);
611    }
612    resetRSGroupMap(newGroupMap);
613    updateCacheOfRSGroups(newGroupMap.keySet());
614  }
615
616  private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException {
617    List<Mutation> mutations = Lists.newArrayList();
618
619    // populate deletes
620    for (String groupName : prevRSGroups) {
621      if (!groupMap.containsKey(groupName)) {
622        Delete d = new Delete(Bytes.toBytes(groupName));
623        mutations.add(d);
624      }
625    }
626
627    // populate puts
628    for (RSGroupInfo gi : groupMap.values()) {
629      if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
630        RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
631        Put p = new Put(Bytes.toBytes(gi.getName()));
632        p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
633        mutations.add(p);
634      }
635    }
636
637    if (mutations.size() > 0) {
638      multiMutate(mutations);
639    }
640  }
641
642  private synchronized void flushConfig() throws IOException {
643    flushConfig(holder.groupName2Group);
644  }
645
646  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
647    // For offline mode persistence is still unavailable
648    // We're refreshing in-memory state but only for servers in default group
649    if (!isOnline()) {
650      if (newGroupMap == holder.groupName2Group) {
651        // When newGroupMap is this.rsGroupMap itself,
652        // do not need to check default group and other groups as followed
653        return;
654      }
655
656      LOG.debug("Offline mode, cannot persist to {}", RSGROUP_TABLE_NAME);
657
658      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(holder.groupName2Group);
659      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
660      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
661      if (
662        !oldGroupMap.equals(newGroupMap)
663          /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables()
664            .equals(newDefaultGroup.getTables())
665        /* compare tables in default group */
666      ) {
667        throw new IOException("Only servers in default group can be updated during offline mode");
668      }
669
670      // Restore newGroupMap by putting its default group back
671      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
672
673      // Refresh rsGroupMap
674      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
675      this.holder = new RSGroupInfoHolder(newGroupMap);
676
677      LOG.debug("New RSGroup map: {}", newGroupMap);
678
679      // Do not need to update tableMap
680      // because only the update on servers in default group is allowed above,
681      // or IOException will be thrown
682      return;
683    }
684
685    /* For online mode, persist to hbase:rsgroup and Zookeeper */
686    LOG.debug("Online mode, persisting to {} and ZK", RSGROUP_TABLE_NAME);
687    flushConfigTable(newGroupMap);
688
689    // Make changes visible after having been persisted to the source of truth
690    resetRSGroupMap(newGroupMap);
691    saveRSGroupMapToZK(newGroupMap);
692    updateCacheOfRSGroups(newGroupMap.keySet());
693    LOG.info("Flush config done, new RSGroup map: {}", newGroupMap);
694  }
695
696  private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOException {
697    LOG.debug("Saving RSGroup info to ZK");
698    try {
699      String groupBasePath =
700        ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
701      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
702
703      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
704      for (String groupName : prevRSGroups) {
705        if (!newGroupMap.containsKey(groupName)) {
706          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
707          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
708        }
709      }
710
711      for (RSGroupInfo gi : newGroupMap.values()) {
712        if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
713          String znode = ZNodePaths.joinZNode(groupBasePath, gi.getName());
714          RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
715          LOG.debug("Updating znode: " + znode);
716          ZKUtil.createAndFailSilent(watcher, znode);
717          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
718          zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
719            ProtobufUtil.prependPBMagic(proto.toByteArray())));
720        }
721      }
722      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
723
724      ZKUtil.multiOrSequential(watcher, zkOps, false);
725    } catch (KeeperException e) {
726      LOG.error("Failed to write to rsGroupZNode", e);
727      masterServices.abort("Failed to write to rsGroupZNode", e);
728      throw new IOException("Failed to write to rsGroupZNode", e);
729    }
730  }
731
732  /**
733   * Make changes visible. Caller must be synchronized on 'this'.
734   */
735  private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) {
736    this.holder = new RSGroupInfoHolder(newRSGroupMap);
737  }
738
739  /**
740   * Update cache of rsgroups. Caller must be synchronized on 'this'.
741   * @param currentGroups Current list of Groups.
742   */
743  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
744    this.prevRSGroups.clear();
745    this.prevRSGroups.addAll(currentGroups);
746  }
747
748  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
749  private SortedSet<Address> getDefaultServers() {
750    return getDefaultServers(listRSGroups()/* get from rsGroupMap */);
751  }
752
753  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
754  private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList) {
755    // Build a list of servers in other groups than default group, from rsGroupMap
756    Set<Address> serversInOtherGroup = new HashSet<>();
757    for (RSGroupInfo group : rsGroupInfoList) {
758      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
759        serversInOtherGroup.addAll(group.getServers());
760      }
761    }
762
763    // Get all online servers from Zookeeper and find out servers in default group
764    SortedSet<Address> defaultServers = Sets.newTreeSet();
765    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
766      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
767      if (!serversInOtherGroup.contains(server)) { // not in other groups
768        defaultServers.add(server);
769      }
770    }
771    return defaultServers;
772  }
773
  /**
   * Daemon thread that keeps retrying until the RSGroup system table is usable.
   * <p>
   * Each attempt does the following, in order:
   * <ol>
   * <li>creates the table if the TableStateManager reports it absent;</li>
   * <li>reads {@code ROW_KEY} from it;</li>
   * <li>refreshes the cached group information;</li>
   * <li>marks this worker online;</li>
   * <li>flushes the config;</li>
   * <li>runs {@code migrate()}.</li>
   * </ol>
   * The worker gives up once the master is stopped or aborted.
   */
  private class RSGroupStartupWorker extends Thread {
    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
    // Set once the table has been read and the cache refreshed; volatile so that callers of
    // isOnline() on other threads observe the update.
    private volatile boolean online = false;

    /** Name the thread after this class and the master's server name, and make it a daemon. */
    RSGroupStartupWorker() {
      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
      setDaemon(true);
    }

    /** Wait for the RSGroup table to become usable and log whether that succeeded. */
    @Override
    public void run() {
      if (waitForGroupTableOnline()) {
        LOG.info("GroupBasedLoadBalancer is now online");
      } else {
        LOG.warn("Quit without making region group table online");
      }
    }

    /**
     * Repeat the startup sequence roughly every 100ms until it succeeds or the master stops.
     * <p>
     * Any exception from any step is logged and the whole sequence is retried. This includes
     * flushConfig() and migrate(), which run after {@code online} has already been set to true.
     * @return true once the whole sequence succeeded; false if the master stopped or aborted first
     */
    private boolean waitForGroupTableOnline() {
      while (isMasterRunning(masterServices)) {
        try {
          TableStateManager tsm = masterServices.getTableStateManager();
          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
            createRSGroupTable();
          }
          // try reading from the table
          FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY)));
          LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME);
          RSGroupInfoManagerImpl.this.refresh(true);
          online = true;
          // flush any inconsistencies between ZK and HTable
          RSGroupInfoManagerImpl.this.flushConfig();
          // migrate after we are online.
          migrate();
          return true;
        } catch (Exception e) {
          LOG.warn("Failed to perform check", e);
          // 100ms is short so let's just ignore the interrupt
          Threads.sleepWithoutInterrupt(100);
        }
      }
      return false;
    }

    /**
     * Ensure a CreateTableProcedure for the RSGroup table exists and wait for it to finish.
     * <p>
     * A procedure for the table that the master already knows about is reused; otherwise a new
     * system-table creation is submitted. Completion is polled every 100ms for at most 600 tries
     * (about 60 seconds).
     * <p>
     * NOTE(review): two edge cases to confirm.
     * <ul>
     * <li>If the procedure executor stops running before the procedure finishes, the loop exits
     * early and this returns without error unless a failed result is available.</li>
     * <li>If the procedure finishes during the final sleep, {@code tries} is already 0, so a
     * timeout is still reported.</li>
     * </ul>
     * @throws IOException if interrupted while waiting (the interrupt status is not restored), on
     *                     timeout, or if the procedure's result reports failure
     */
    private void createRSGroupTable() throws IOException {
      // Reuse an in-flight/known CreateTableProcedure for the RSGroup table, if any.
      OptionalLong optProcId = masterServices.getProcedures().stream()
        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
        .findFirst();
      long procId;
      if (optProcId.isPresent()) {
        procId = optProcId.getAsLong();
      } else {
        LOG.debug("Creating group table {}", RSGROUP_TABLE_NAME);
        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
      }
      // wait for region to be online
      int tries = 600;
      while (
        !(masterServices.getMasterProcedureExecutor().isFinished(procId))
          && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0
      ) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          throw new IOException("Wait interrupted ", e);
        }
        tries--;
      }
      if (tries <= 0) {
        throw new IOException("Failed to create group table in a given time.");
      } else {
        // getResult may be null (handled below); only an explicit failure is reported.
        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
        if (result != null && result.isFailed()) {
          throw new IOException(
            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
        }
      }
    }

    /** @return true once the RSGroup table has been read and the cached group info refreshed */
    public boolean isOnline() {
      return online;
    }
  }
858
859  private static boolean isMasterRunning(MasterServices masterServices) {
860    return !masterServices.isAborted() && !masterServices.isStopped();
861  }
862
863  private void multiMutate(List<Mutation> mutations) throws IOException {
864    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
865    for (Mutation mutation : mutations) {
866      if (mutation instanceof Put) {
867        builder
868          .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation));
869      } else if (mutation instanceof Delete) {
870        builder
871          .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation));
872      } else {
873        throw new DoNotRetryIOException(
874          "multiMutate doesn't support " + mutation.getClass().getName());
875      }
876    }
877    MutateRowsRequest request = builder.build();
878    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
879    LOG.debug("Multimutating {} with {} mutations", RSGROUP_TABLE_NAME, mutations.size());
880    FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
881      MultiRowMutationService::newStub,
882      (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
883    LOG.info("Multimutating {} with {} mutations done", RSGROUP_TABLE_NAME, mutations.size());
884  }
885
886  private void checkGroupName(String groupName) throws ConstraintException {
887    if (!groupName.matches("[a-zA-Z0-9_]+")) {
888      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
889    }
890  }
891
892  @Override
893  public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException {
894    return holder.tableName2Group.get(tableName);
895  }
896
897  /**
898   * Check if the set of servers are belong to dead servers list or online servers list.
899   * @param servers servers to remove
900   */
901  private void checkForDeadOrOnlineServers(Set<Address> servers) throws IOException {
902    // This ugliness is because we only have Address, not ServerName.
903    Set<Address> onlineServers = new HashSet<>();
904    List<ServerName> drainingServers = masterServices.getServerManager().getDrainingServersList();
905    for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) {
906      // Only online but not decommissioned servers are really online
907      if (!drainingServers.contains(server)) {
908        onlineServers.add(server.getAddress());
909      }
910    }
911
912    Set<Address> deadServers = new HashSet<>();
913    for (ServerName server : masterServices.getServerManager().getDeadServers().copyServerNames()) {
914      deadServers.add(server.getAddress());
915    }
916
917    for (Address address : servers) {
918      if (onlineServers.contains(address)) {
919        throw new DoNotRetryIOException(
920          "Server " + address + " is an online server, not allowed to remove.");
921      }
922      if (deadServers.contains(address)) {
923        throw new DoNotRetryIOException("Server " + address + " is on the dead servers list,"
924          + " Maybe it will come back again, not allowed to remove.");
925      }
926    }
927  }
928
929  private void checkOnlineServersOnly(Set<Address> servers) throws IOException {
930    // This uglyness is because we only have Address, not ServerName.
931    // Online servers are keyed by ServerName.
932    Set<Address> onlineServers = new HashSet<>();
933    for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) {
934      onlineServers.add(server.getAddress());
935    }
936    for (Address address : servers) {
937      if (!onlineServers.contains(address)) {
938        throw new DoNotRetryIOException(
939          "Server " + address + " is not an online server in 'default' RSGroup.");
940      }
941    }
942  }
943
944  /**
945   * @return List of Regions associated with this <code>server</code>.
946   */
947  private List<RegionInfo> getRegions(final Address server) {
948    LinkedList<RegionInfo> regions = new LinkedList<>();
949    for (Map.Entry<RegionInfo, ServerName> el : masterServices.getAssignmentManager()
950      .getRegionStates().getRegionAssignments().entrySet()) {
951      if (el.getValue() == null) {
952        continue;
953      }
954
955      if (el.getValue().getAddress().equals(server)) {
956        addRegion(regions, el.getKey());
957      }
958    }
959    for (RegionStateNode state : masterServices.getAssignmentManager().getRegionsInTransition()) {
960      if (
961        state.getRegionLocation() != null && state.getRegionLocation().getAddress().equals(server)
962      ) {
963        addRegion(regions, state.getRegionInfo());
964      }
965    }
966    return regions;
967  }
968
969  private void addRegion(final LinkedList<RegionInfo> regions, RegionInfo hri) {
970    // If meta, move it last otherwise other unassigns fail because meta is not
971    // online for them to update state in. This is dodgy. Needs to be made more
972    // robust. See TODO below.
973    if (hri.isMetaRegion()) {
974      regions.addLast(hri);
975    } else {
976      regions.addFirst(hri);
977    }
978  }
979
980  /**
981   * Move every region from servers which are currently located on these servers, but should not be
982   * located there.
983   * @param movedServers    the servers that are moved to new group
984   * @param srcGrpServers   all servers in the source group, excluding the movedServers
985   * @param targetGroupName the target group
986   * @param sourceGroupName the source group
987   * @throws IOException if moving the server and tables fail
988   */
989  private void moveServerRegionsFromGroup(Set<Address> movedServers, Set<Address> srcGrpServers,
990    String targetGroupName, String sourceGroupName) throws IOException {
991    moveRegionsBetweenGroups(movedServers, srcGrpServers, targetGroupName, sourceGroupName,
992      rs -> getRegions(rs), info -> {
993        try {
994          String groupName = RSGroupUtil.getRSGroupInfo(masterServices, this, info.getTable())
995            .map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP);
996          return groupName.equals(targetGroupName);
997        } catch (IOException e) {
998          LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName);
999          return false;
1000        }
1001      });
1002  }
1003
  /**
   * Move regions off the online servers in {@code regionsOwners} that fail {@code validation},
   * placing each on a server chosen by the load balancer from the online servers in
   * {@code newRegionsOwners}. Failed regions are retried.
   * <p>
   * Each round does the following:
   * <ol>
   * <li>submits a move for every region that fails {@code validation};</li>
   * <li>waits for all moves via {@code waitForRegionMovement};</li>
   * <li>stops if nothing failed, otherwise waits up to 1s and retries.</li>
   * </ol>
   * Because {@code retry} is compared with {@code <=}, up to {@code FAILED_MOVE_MAX_RETRY} + 1
   * rounds are made.
   * <p>
   * {@code wait(1000)} is {@link Object#wait(long)} on this manager. The caller must hold this
   * object's monitor, otherwise it throws IllegalMonitorStateException; {@code moveServers} does
   * so via {@code synchronized (this)}. The monitor is released during the wait, so other
   * synchronized methods of this manager can run between rounds. After an interrupt the flag is
   * restored, so later {@code wait(1000)} calls throw again immediately and the remaining rounds
   * run without delay.
   * @param regionsOwners    owners whose regions are examined; the cast {@code (T) Address} below
   *                         assumes T is {@link Address} (the only caller in view passes a
   *                         {@code Set<Address>})
   * @param newRegionsOwners addresses of the servers that should receive the moved regions
   * @param targetGroupName  group the owners are joining (used for logging)
   * @param sourceGroupName  group the regions are moved back to (used for logging)
   * @param getRegionsInfo   returns the regions currently associated with an owner
   * @param validation       returns true for regions that may stay where they are
   * @throws DoNotRetryIOException if regions still fail after the last round; its cause is the
   *                               last IOException from {@code moveAsync}, which may be null when
   *                               failures came only from missing destinations or failed waits
   */
  private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, Set<Address> newRegionsOwners,
    String targetGroupName, String sourceGroupName, Function<T, List<RegionInfo>> getRegionsInfo,
    Function<RegionInfo, Boolean> validation) throws IOException {
    // Get server names corresponding to given Addresses
    List<ServerName> movedServerNames = new ArrayList<>(regionsOwners.size());
    List<ServerName> srcGrpServerNames = new ArrayList<>(newRegionsOwners.size());
    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
      // In case region move failed in previous attempt, regionsOwners and newRegionsOwners
      // can have the same servers. So for all servers below both conditions to be checked
      if (newRegionsOwners.contains(serverName.getAddress())) {
        srcGrpServerNames.add(serverName);
      }
      if (regionsOwners.contains(serverName.getAddress())) {
        movedServerNames.add(serverName);
      }
    }
    List<Pair<RegionInfo, Future<byte[]>>> assignmentFutures = new ArrayList<>();
    int retry = 0;
    Set<String> failedRegions = new HashSet<>();
    // Last moveAsync failure, attached as the cause if we finally give up.
    IOException toThrow = null;
    do {
      assignmentFutures.clear();
      failedRegions.clear();
      for (ServerName owner : movedServerNames) {
        // Get regions that are associated with this server and filter regions by group tables.
        for (RegionInfo region : getRegionsInfo.apply((T) owner.getAddress())) {
          if (!validation.apply(region)) {
            LOG.info("Moving region {}, which does not belong to RSGroup {}",
              region.getShortNameToLog(), targetGroupName);
            // Move region back to source RSGroup servers
            ServerName dest =
              masterServices.getLoadBalancer().randomAssignment(region, srcGrpServerNames);
            if (dest == null) {
              // No destination available this round; count as failed and retry later.
              failedRegions.add(region.getRegionNameAsString());
              continue;
            }
            RegionPlan rp = new RegionPlan(region, owner, dest);
            try {
              Future<byte[]> future = masterServices.getAssignmentManager().moveAsync(rp);
              assignmentFutures.add(Pair.newPair(region, future));
            } catch (IOException ioe) {
              failedRegions.add(region.getRegionNameAsString());
              LOG.debug("Move region {} failed, will retry, current retry time is {}",
                region.getShortNameToLog(), retry, ioe);
              toThrow = ioe;
            }
          }
        }
      }
      // Adds regions whose move future failed or that ended FAILED_OPEN to failedRegions.
      waitForRegionMovement(assignmentFutures, failedRegions, sourceGroupName, retry);
      if (failedRegions.isEmpty()) {
        LOG.info("All regions from {} are moved back to {}", movedServerNames, sourceGroupName);
        return;
      } else {
        try {
          // Object.wait: releases this manager's monitor for up to 1s between rounds.
          wait(1000);
        } catch (InterruptedException e) {
          LOG.warn("Sleep interrupted", e);
          Thread.currentThread().interrupt();
        }
        retry++;
      }
    } while (
      !failedRegions.isEmpty() && retry <= masterServices.getConfiguration()
        .getInt(FAILED_MOVE_MAX_RETRY, DEFAULT_MAX_RETRY_VALUE)
    );

    // has up to max retry time or there are no more regions to move
    if (!failedRegions.isEmpty()) {
      // print failed moved regions, for later process conveniently
      String msg = String.format("move regions for group %s failed, failed regions: %s",
        sourceGroupName, failedRegions);
      LOG.error(msg);
      throw new DoNotRetryIOException(
        msg + ", just record the last failed region's cause, more details in server log", toThrow);
    }
  }
1081
1082  /**
1083   * Wait for all the region move to complete. Keep waiting for other region movement completion
1084   * even if some region movement fails.
1085   */
1086  private void waitForRegionMovement(List<Pair<RegionInfo, Future<byte[]>>> regionMoveFutures,
1087    Set<String> failedRegions, String sourceGroupName, int retryCount) {
1088    LOG.info("Moving {} region(s) to group {}, current retry={}", regionMoveFutures.size(),
1089      sourceGroupName, retryCount);
1090    for (Pair<RegionInfo, Future<byte[]>> pair : regionMoveFutures) {
1091      try {
1092        pair.getSecond().get();
1093        if (
1094          masterServices.getAssignmentManager().getRegionStates().getRegionState(pair.getFirst())
1095            .isFailedOpen()
1096        ) {
1097          failedRegions.add(pair.getFirst().getRegionNameAsString());
1098        }
1099      } catch (InterruptedException e) {
1100        // Dont return form there lets wait for other regions to complete movement.
1101        failedRegions.add(pair.getFirst().getRegionNameAsString());
1102        LOG.warn("Sleep interrupted", e);
1103      } catch (Exception e) {
1104        failedRegions.add(pair.getFirst().getRegionNameAsString());
1105        LOG.error("Move region {} to group {} failed, will retry on next attempt",
1106          pair.getFirst().getShortNameToLog(), sourceGroupName, e);
1107      }
1108    }
1109  }
1110
1111  private boolean isTableInGroup(TableName tableName, String groupName,
1112    Set<TableName> tablesInGroupCache) throws IOException {
1113    if (tablesInGroupCache.contains(tableName)) {
1114      return true;
1115    }
1116    if (
1117      RSGroupUtil.getRSGroupInfo(masterServices, this, tableName).map(RSGroupInfo::getName)
1118        .orElse(RSGroupInfo.DEFAULT_GROUP).equals(groupName)
1119    ) {
1120      tablesInGroupCache.add(tableName);
1121      return true;
1122    }
1123    return false;
1124  }
1125
1126  private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
1127    throws IOException {
1128    Map<String, RegionState> rit = Maps.newTreeMap();
1129    Set<TableName> tablesInGroupCache = new HashSet<>();
1130    for (RegionStateNode regionNode : masterServices.getAssignmentManager()
1131      .getRegionsInTransition()) {
1132      TableName tn = regionNode.getTable();
1133      if (isTableInGroup(tn, groupName, tablesInGroupCache)) {
1134        rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState());
1135      }
1136    }
1137    return rit;
1138  }
1139
1140  /**
1141   * This is an EXPENSIVE clone. Cloning though is the safest thing to do. Can't let out original
1142   * since it can change and at least the load balancer wants to iterate this exported list. Load
1143   * balancer should iterate over this list because cloned list will ignore disabled table and split
1144   * parent region cases. This method is invoked by {@link #balanceRSGroup}
1145   * @return A clone of current assignments for this group.
1146   */
1147  Map<TableName, Map<ServerName, List<RegionInfo>>> getRSGroupAssignmentsByTable(
1148    TableStateManager tableStateManager, String groupName) throws IOException {
1149    Map<TableName, Map<ServerName, List<RegionInfo>>> result = Maps.newHashMap();
1150    Set<TableName> tablesInGroupCache = new HashSet<>();
1151    for (Map.Entry<RegionInfo, ServerName> entry : masterServices.getAssignmentManager()
1152      .getRegionStates().getRegionAssignments().entrySet()) {
1153      RegionInfo region = entry.getKey();
1154      TableName tn = region.getTable();
1155      ServerName server = entry.getValue();
1156      if (isTableInGroup(tn, groupName, tablesInGroupCache)) {
1157        if (
1158          tableStateManager.isTableState(tn, TableState.State.DISABLED, TableState.State.DISABLING)
1159        ) {
1160          continue;
1161        }
1162        if (region.isSplitParent()) {
1163          continue;
1164        }
1165        result.computeIfAbsent(tn, k -> new HashMap<>())
1166          .computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1167      }
1168    }
1169    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
1170    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
1171      if (rsGroupInfo.containsServer(serverName.getAddress())) {
1172        for (Map<ServerName, List<RegionInfo>> map : result.values()) {
1173          map.computeIfAbsent(serverName, k -> Collections.emptyList());
1174        }
1175      }
1176    }
1177    return result;
1178  }
1179
1180  @Override
1181  public BalanceResponse balanceRSGroup(String groupName, BalanceRequest request)
1182    throws IOException {
1183    ServerManager serverManager = masterServices.getServerManager();
1184    LoadBalancer balancer = masterServices.getLoadBalancer();
1185    getRSGroupInfo(groupName);
1186
1187    BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder();
1188
1189    synchronized (balancer) {
1190      // If balance not true, don't run balancer.
1191      if (!masterServices.isBalancerOn() && !request.isDryRun()) {
1192        return responseBuilder.build();
1193      }
1194
1195      // Only allow one balance run at at time.
1196      Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
1197      if (groupRIT.size() > 0 && !request.isIgnoreRegionsInTransition()) {
1198        LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(),
1199          StringUtils.abbreviate(masterServices.getAssignmentManager().getRegionStates()
1200            .getRegionsInTransition().toString(), 256));
1201        return responseBuilder.build();
1202      }
1203
1204      if (serverManager.areDeadServersInProgress()) {
1205        LOG.debug("Not running balancer because processing dead regionserver(s): {}",
1206          serverManager.getDeadServers());
1207        return responseBuilder.build();
1208      }
1209
1210      // We balance per group instead of per table
1211      Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable =
1212        getRSGroupAssignmentsByTable(masterServices.getTableStateManager(), groupName);
1213      List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable);
1214      boolean balancerRan = !plans.isEmpty();
1215
1216      responseBuilder.setBalancerRan(balancerRan).setMovesCalculated(plans.size());
1217
1218      if (balancerRan && !request.isDryRun()) {
1219        LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());
1220        List<RegionPlan> executed = masterServices.executeRegionPlansWithThrottling(plans);
1221        responseBuilder.setMovesExecuted(executed.size());
1222        LOG.info("RSGroup balance " + groupName + " completed");
1223      }
1224
1225      return responseBuilder.build();
1226    }
1227  }
1228
1229  private void moveTablesAndWait(Set<TableName> tables, String targetGroup) throws IOException {
1230    LOG.debug("Moving {} tables to target group {}", tables.size(), targetGroup);
1231    List<Long> procIds = new ArrayList<Long>();
1232    for (TableName tableName : tables) {
1233      TableDescriptor oldTd = masterServices.getTableDescriptors().get(tableName);
1234      if (oldTd == null) {
1235        continue;
1236      }
1237      TableDescriptor newTd =
1238        TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build();
1239      procIds.add(
1240        masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE));
1241    }
1242    for (long procId : procIds) {
1243      Procedure<?> proc = masterServices.getMasterProcedureExecutor().getProcedure(procId);
1244      if (proc == null) {
1245        continue;
1246      }
1247      ProcedureSyncWait.waitForProcedureToCompleteIOE(masterServices.getMasterProcedureExecutor(),
1248        proc, Long.MAX_VALUE);
1249    }
1250    LOG.info("Move tables done: moved {} tables to {}", tables.size(), targetGroup);
1251    if (LOG.isDebugEnabled()) {
1252      LOG.debug("Tables moved to {}: {}", targetGroup, tables);
1253    }
1254  }
1255
1256  @Override
1257  public void setRSGroup(Set<TableName> tables, String groupName) throws IOException {
1258    getRSGroupInfo(groupName);
1259    moveTablesAndWait(tables, groupName);
1260  }
1261
1262  public void moveServers(Set<Address> servers, String targetGroupName) throws IOException {
1263    if (servers == null) {
1264      throw new ConstraintException("The list of servers to move cannot be null.");
1265    }
1266    if (servers.isEmpty()) {
1267      // For some reason this difference between null servers and isEmpty is important distinction.
1268      // TODO. Why? Stuff breaks if I equate them.
1269      return;
1270    }
1271    if (StringUtils.isEmpty(targetGroupName)) {
1272      throw new ConstraintException("RSGroup cannot be null.");
1273    }
1274
1275    // Hold a lock on the manager instance while moving servers to prevent
1276    // another writer changing our state while we are working.
1277    synchronized (this) {
1278      // Presume first server's source group. Later ensure all servers are from this group.
1279      Address firstServer = servers.iterator().next();
1280      RSGroupInfo srcGrp = getRSGroupOfServer(firstServer);
1281      if (srcGrp == null) {
1282        // Be careful. This exception message is tested for in TestRSGroupAdmin2...
1283        throw new ConstraintException(
1284          "Server " + firstServer + " is either offline or it does not exist.");
1285      }
1286
1287      // Only move online servers (when moving from 'default') or servers from other
1288      // groups. This prevents bogus servers from entering groups
1289      if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) {
1290        if (srcGrp.getServers().size() <= servers.size()) {
1291          throw new ConstraintException(KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE);
1292        }
1293        checkOnlineServersOnly(servers);
1294      }
1295      // Ensure all servers are of same rsgroup.
1296      for (Address server : servers) {
1297        String tmpGroup = getRSGroupOfServer(server).getName();
1298        if (!tmpGroup.equals(srcGrp.getName())) {
1299          throw new ConstraintException("Move server request should only come from one source "
1300            + "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup);
1301        }
1302      }
1303      if (srcGrp.getServers().size() <= servers.size()) {
1304        // check if there are still tables reference this group
1305        for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) {
1306          Optional<String> optGroupName = td.getRegionServerGroup();
1307          if (optGroupName.isPresent() && optGroupName.get().equals(srcGrp.getName())) {
1308            throw new ConstraintException(
1309              "Cannot leave a RSGroup " + srcGrp.getName() + " that contains tables('"
1310                + td.getTableName() + "' at least) without servers to host them.");
1311          }
1312        }
1313      }
1314
1315      // MovedServers may be < passed in 'servers'.
1316      Set<Address> movedServers = moveServers(servers, srcGrp.getName(), targetGroupName);
1317      moveServerRegionsFromGroup(movedServers, srcGrp.getServers(), targetGroupName,
1318        srcGrp.getName());
1319      LOG.info("Move servers done: moved {} servers from {} to {}", movedServers.size(),
1320        srcGrp.getName(), targetGroupName);
1321      if (LOG.isDebugEnabled()) {
1322        LOG.debug("Servers moved from {} to {}: {}", srcGrp.getName(), targetGroupName,
1323          movedServers);
1324      }
1325    }
1326  }
1327
1328  @Override
1329  public String determineRSGroupInfoForTable(TableName tableName) {
1330    return script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString());
1331  }
1332
1333  @Override
1334  public synchronized void renameRSGroup(String oldName, String newName) throws IOException {
1335    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
1336      throw new ConstraintException(RSGroupInfo.DEFAULT_GROUP + " can't be rename");
1337    }
1338    checkGroupName(newName);
1339    // getRSGroupInfo validates old RSGroup existence.
1340    RSGroupInfo oldRSG = getRSGroupInfo(oldName);
1341    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
1342    if (rsGroupMap.containsKey(newName)) {
1343      throw new ConstraintException("Group already exists: " + newName);
1344    }
1345
1346    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
1347    newGroupMap.remove(oldRSG.getName());
1348    RSGroupInfo newRSG = new RSGroupInfo(newName, oldRSG.getServers());
1349    newGroupMap.put(newName, newRSG);
1350    flushConfig(newGroupMap);
1351    Set<TableName> updateTables = masterServices.getTableDescriptors().getAll().values().stream()
1352      .filter(t -> oldName.equals(t.getRegionServerGroup().orElse(null)))
1353      .map(TableDescriptor::getTableName).collect(Collectors.toSet());
1354    setRSGroup(updateTables, newName);
1355    LOG.info("Rename RSGroup done: {} => {}", oldName, newName);
1356  }
1357
1358  @Override
1359  public synchronized void updateRSGroupConfig(String groupName, Map<String, String> configuration)
1360    throws IOException {
1361    if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
1362      // We do not persist anything of default group, therefore, it is not supported to update
1363      // default group's configuration which lost once master down.
1364      throw new ConstraintException(
1365        "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently");
1366    }
1367    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
1368    rsGroupInfo.getConfiguration().forEach((k, v) -> rsGroupInfo.removeConfiguration(k));
1369    configuration.forEach((k, v) -> rsGroupInfo.setConfiguration(k, v));
1370    flushConfig();
1371  }
1372}