001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rsgroup; 019 020import java.io.ByteArrayInputStream; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.OptionalLong; 032import java.util.Set; 033import java.util.SortedSet; 034import java.util.TreeSet; 035import java.util.concurrent.Future; 036import java.util.function.Function; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.StringUtils; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Coprocessor; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.NamespaceDescriptor; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableDescriptors; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.AsyncClusterConnection; 048import 
org.apache.hadoop.hbase.client.AsyncTable; 049import org.apache.hadoop.hbase.client.BalanceRequest; 050import org.apache.hadoop.hbase.client.BalanceResponse; 051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 052import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; 053import org.apache.hadoop.hbase.client.Delete; 054import org.apache.hadoop.hbase.client.Get; 055import org.apache.hadoop.hbase.client.Mutation; 056import org.apache.hadoop.hbase.client.Put; 057import org.apache.hadoop.hbase.client.RegionInfo; 058import org.apache.hadoop.hbase.client.Result; 059import org.apache.hadoop.hbase.client.ResultScanner; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.client.TableState; 063import org.apache.hadoop.hbase.constraint.ConstraintException; 064import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 065import org.apache.hadoop.hbase.exceptions.DeserializationException; 066import org.apache.hadoop.hbase.master.LoadBalancer; 067import org.apache.hadoop.hbase.master.MasterServices; 068import org.apache.hadoop.hbase.master.RegionPlan; 069import org.apache.hadoop.hbase.master.RegionState; 070import org.apache.hadoop.hbase.master.ServerListener; 071import org.apache.hadoop.hbase.master.ServerManager; 072import org.apache.hadoop.hbase.master.TableStateManager; 073import org.apache.hadoop.hbase.master.assignment.RegionStateNode; 074import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; 075import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 076import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 077import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 078import org.apache.hadoop.hbase.net.Address; 079import org.apache.hadoop.hbase.procedure2.Procedure; 080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 081import 
org.apache.hadoop.hbase.protobuf.ProtobufMagic; 082import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 083import org.apache.hadoop.hbase.util.Bytes; 084import org.apache.hadoop.hbase.util.FutureUtils; 085import org.apache.hadoop.hbase.util.Pair; 086import org.apache.hadoop.hbase.util.Threads; 087import org.apache.hadoop.hbase.zookeeper.ZKUtil; 088import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 089import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 090import org.apache.hadoop.util.Shell; 091import org.apache.yetus.audience.InterfaceAudience; 092import org.apache.zookeeper.KeeperException; 093import org.slf4j.Logger; 094import org.slf4j.LoggerFactory; 095 096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 097import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 098import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 099import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 100 101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos; 107 108/** 109 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the 110 * persistence store for the group information. It also makes use of zookeeper to store group 111 * information needed for bootstrapping during offline mode. 112 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached 113 * RSGroupInfo Map at {@link RSGroupInfoHolder#groupName2Group}. 
These Maps are persisted to the 114 * hbase:rsgroup table (and cached in zk) on each modification. 115 * <p/> 116 * Mutations on state are synchronized but reads can continue without having to wait on an instance 117 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of 118 * state are read-only, just-in-case (see flushConfig). 119 * <p/> 120 * Reads must not block else there is a danger we'll deadlock. 121 * <p/> 122 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act 123 * on the results of the query modifying cache in zookeeper without another thread making 124 * intermediate modifications. These clients synchronize on the 'this' instance so no other has 125 * access concurrently. Reads must be able to continue concurrently. 126 */ 127@InterfaceAudience.Private 128final class RSGroupInfoManagerImpl implements RSGroupInfoManager { 129 private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class); 130 131 // Assigned before user tables 132 static final TableName RSGROUP_TABLE_NAME = 133 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup"); 134 135 static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE = 136 "should keep at least " + "one server in 'default' RSGroup."; 137 138 /** Define the config key of retries threshold when movements failed */ 139 static final String FAILED_MOVE_MAX_RETRY = "hbase.rsgroup.move.max.retry"; 140 141 /** Define the default number of retries */ 142 static final int DEFAULT_MAX_RETRY_VALUE = 50; 143 144 private static final String RS_GROUP_ZNODE = "rsgroup"; 145 146 static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m"); 147 148 static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i"); 149 150 static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables"; 151 152 private static final byte[] ROW_KEY = { 0 }; 153 154 /** Table descriptor for <code>hbase:rsgroup</code> catalog 
table */
  private static final TableDescriptor RSGROUP_TABLE_DESC;
  static {
    // Single column family, splitting disabled, and the MultiRowMutationEndpoint coprocessor
    // installed so that multiMutate can issue a MutateRows request against this table.
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
    try {
      builder.setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
    } catch (IOException ex) {
      throw new Error(ex);
    }
    RSGROUP_TABLE_DESC = builder.build();
  }

  // These two Maps are immutable and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
  private static final class RSGroupInfoHolder {
    /** Group name to group info, for every group handed to the constructor. */
    final ImmutableMap<String, RSGroupInfo> groupName2Group;
    /** Table name to owning group, built from every group except the default group. */
    final ImmutableMap<TableName, RSGroupInfo> tableName2Group;

    /** Creates an empty holder. */
    RSGroupInfoHolder() {
      this(Collections.emptyMap());
    }

    /**
     * Builds both lookup maps from the given group map.
     * @param rsGroupMap group name to group info; tables of the default group are not indexed
     */
    RSGroupInfoHolder(Map<String, RSGroupInfo> rsGroupMap) {
      ImmutableMap.Builder<String, RSGroupInfo> groupName2GroupBuilder = ImmutableMap.builder();
      ImmutableMap.Builder<TableName, RSGroupInfo> tableName2GroupBuilder = ImmutableMap.builder();
      for (Map.Entry<String, RSGroupInfo> entry : rsGroupMap.entrySet()) {
        String groupName = entry.getKey();
        RSGroupInfo rsGroupInfo = entry.getValue();
        groupName2GroupBuilder.put(groupName, rsGroupInfo);
        if (groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
          continue;
        }
        for (TableName tableName : rsGroupInfo.getTables()) {
          tableName2GroupBuilder.put(tableName, rsGroupInfo);
        }
      }
      this.groupName2Group = groupName2GroupBuilder.build();
      this.tableName2Group = tableName2GroupBuilder.build();
    }
  }

  /** Current snapshot of the rsgroup state; replaced as a whole, never modified in place. */
  private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder();

  private final MasterServices masterServices;
  private final AsyncClusterConnection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // contains list of groups that
were last flushed to persistent store 202 private Set<String> prevRSGroups = new HashSet<>(); 203 204 // Package visibility for testing 205 static class RSGroupMappingScript { 206 static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script"; 207 static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT = 208 "hbase.rsgroup.table.mapping.script.timeout"; 209 210 private final String script; 211 private final long scriptTimeout; 212 213 RSGroupMappingScript(Configuration conf) { 214 script = conf.get(RS_GROUP_MAPPING_SCRIPT); 215 scriptTimeout = conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000); // 5 seconds 216 } 217 218 String getRSGroup(String namespace, String tablename) { 219 if (script == null || script.isEmpty()) { 220 return null; 221 } 222 Shell.ShellCommandExecutor rsgroupMappingScript = 223 new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null, null, scriptTimeout); 224 225 String[] exec = rsgroupMappingScript.getExecString(); 226 exec[1] = namespace; 227 exec[2] = tablename; 228 try { 229 rsgroupMappingScript.execute(); 230 } catch (IOException e) { 231 // This exception may happen, like process doesn't have permission to run this script. 
232 LOG.error("{}, placing {} back to default rsgroup", e.getMessage(), 233 TableName.valueOf(namespace, tablename)); 234 return RSGroupInfo.DEFAULT_GROUP; 235 } 236 return rsgroupMappingScript.getOutput().trim(); 237 } 238 } 239 240 private RSGroupMappingScript script; 241 242 private RSGroupInfoManagerImpl(MasterServices masterServices) { 243 this.masterServices = masterServices; 244 this.watcher = masterServices.getZooKeeper(); 245 this.conn = masterServices.getAsyncClusterConnection(); 246 this.rsGroupStartupWorker = new RSGroupStartupWorker(); 247 this.script = new RSGroupMappingScript(masterServices.getConfiguration()); 248 } 249 250 private synchronized void updateDefaultServers() { 251 LOG.info("Updating default servers."); 252 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group); 253 RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP); 254 assert oldDefaultGroupInfo != null; 255 RSGroupInfo newDefaultGroupInfo = 256 new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers()); 257 newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables()); 258 newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo); 259 // do not need to persist, as we do not persist default group. 
260 resetRSGroupMap(newGroupMap); 261 LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size()); 262 if (LOG.isDebugEnabled()) { 263 LOG.debug("New default servers list: {}", newDefaultGroupInfo.getServers()); 264 } 265 } 266 267 private synchronized void init() throws IOException { 268 refresh(false); 269 masterServices.getServerManager().registerListener(new ServerListener() { 270 271 @Override 272 public void serverAdded(ServerName serverName) { 273 updateDefaultServers(); 274 } 275 276 @Override 277 public void serverRemoved(ServerName serverName) { 278 updateDefaultServers(); 279 } 280 }); 281 } 282 283 static RSGroupInfoManager getInstance(MasterServices masterServices) throws IOException { 284 RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(masterServices); 285 instance.init(); 286 return instance; 287 } 288 289 public void start() { 290 // create system table of rsgroup 291 rsGroupStartupWorker.start(); 292 } 293 294 @Override 295 public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException { 296 checkGroupName(rsGroupInfo.getName()); 297 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 298 if ( 299 rsGroupMap.get(rsGroupInfo.getName()) != null 300 || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP) 301 ) { 302 throw new ConstraintException("Group already exists: " + rsGroupInfo.getName()); 303 } 304 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 305 newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo); 306 flushConfig(newGroupMap); 307 LOG.info("Add group {} done.", rsGroupInfo.getName()); 308 } 309 310 private RSGroupInfo getRSGroupInfo(final String groupName) throws ConstraintException { 311 RSGroupInfo rsGroupInfo = holder.groupName2Group.get(groupName); 312 if (rsGroupInfo == null) { 313 throw new ConstraintException("RSGroup " + groupName + " does not exist"); 314 } 315 return rsGroupInfo; 316 } 317 318 /** Returns Set of online Servers named for 
their hostname and port (not ServerName). */ 319 private Set<Address> getOnlineServers() { 320 return masterServices.getServerManager().getOnlineServers().keySet().stream() 321 .map(ServerName::getAddress).collect(Collectors.toSet()); 322 } 323 324 public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup, 325 String dstGroup) throws IOException { 326 RSGroupInfo src = getRSGroupInfo(srcGroup); 327 RSGroupInfo dst = getRSGroupInfo(dstGroup); 328 Set<Address> movedServers = new HashSet<>(); 329 // If destination is 'default' rsgroup, only add servers that are online. If not online, drop 330 // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a 331 // rsgroup of dead servers that are to come back later). 332 Set<Address> onlineServers = 333 dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null; 334 for (Address el : servers) { 335 src.removeServer(el); 336 if (onlineServers != null) { 337 if (!onlineServers.contains(el)) { 338 if (LOG.isDebugEnabled()) { 339 LOG.debug("Dropping " + el + " during move-to-default RSGroup because not online"); 340 } 341 continue; 342 } 343 } 344 dst.addServer(el); 345 movedServers.add(el); 346 } 347 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group); 348 newGroupMap.put(src.getName(), src); 349 newGroupMap.put(dst.getName(), dst); 350 flushConfig(newGroupMap); 351 return movedServers; 352 } 353 354 @Override 355 public RSGroupInfo getRSGroupOfServer(Address serverHostPort) { 356 for (RSGroupInfo info : holder.groupName2Group.values()) { 357 if (info.containsServer(serverHostPort)) { 358 return info; 359 } 360 } 361 return null; 362 } 363 364 @Override 365 public RSGroupInfo getRSGroup(String groupName) { 366 return holder.groupName2Group.get(groupName); 367 } 368 369 @Override 370 public synchronized void removeRSGroup(String groupName) throws IOException { 371 RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); 372 int 
serverCount = rsGroupInfo.getServers().size(); 373 if (serverCount > 0) { 374 throw new ConstraintException("RSGroup " + groupName + " has " + serverCount 375 + " servers; you must remove these servers from the RSGroup before" 376 + " the RSGroup can be removed."); 377 } 378 for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) { 379 if (td.getRegionServerGroup().map(groupName::equals).orElse(false)) { 380 throw new ConstraintException("RSGroup " + groupName + " is already referenced by " 381 + td.getTableName() + "; you must remove all the tables from the RSGroup before " 382 + "the RSGroup can be removed."); 383 } 384 } 385 for (NamespaceDescriptor ns : masterServices.getClusterSchema().getNamespaces()) { 386 String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); 387 if (nsGroup != null && nsGroup.equals(groupName)) { 388 throw new ConstraintException( 389 "RSGroup " + groupName + " is referenced by namespace: " + ns.getName()); 390 } 391 } 392 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 393 if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { 394 throw new ConstraintException( 395 "Group " + groupName + " does not exist or is a reserved " + "group"); 396 } 397 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 398 newGroupMap.remove(groupName); 399 flushConfig(newGroupMap); 400 LOG.info("Remove group {} done", groupName); 401 } 402 403 @Override 404 public List<RSGroupInfo> listRSGroups() { 405 return Lists.newArrayList(holder.groupName2Group.values()); 406 } 407 408 @Override 409 public boolean isOnline() { 410 return rsGroupStartupWorker.isOnline(); 411 } 412 413 @Override 414 public synchronized void removeServers(Set<Address> servers) throws IOException { 415 if (servers == null || servers.isEmpty()) { 416 throw new ConstraintException("The set of servers to remove cannot be null or empty."); 417 } 418 419 // check the set of servers 
420 checkForDeadOrOnlineServers(servers); 421 422 Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>(); 423 for (Address el : servers) { 424 RSGroupInfo rsGroupInfo = getRSGroupOfServer(el); 425 if (rsGroupInfo != null) { 426 RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName()); 427 if (newRsGroupInfo == null) { 428 rsGroupInfo.removeServer(el); 429 rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo); 430 } else { 431 newRsGroupInfo.removeServer(el); 432 rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo); 433 } 434 } else { 435 LOG.warn("Server " + el + " does not belong to any rsgroup."); 436 } 437 } 438 439 if (rsGroupInfos.size() > 0) { 440 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group); 441 newGroupMap.putAll(rsGroupInfos); 442 flushConfig(newGroupMap); 443 } 444 LOG.info("Remove decommissioned servers {} from RSGroup done", servers); 445 } 446 447 private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException { 448 List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList(); 449 AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME); 450 try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) { 451 for (Result result;;) { 452 result = scanner.next(); 453 if (result == null) { 454 break; 455 } 456 RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo 457 .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES)); 458 rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto)); 459 } 460 } 461 return rsGroupInfoList; 462 } 463 464 private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException { 465 String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE); 466 List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList(); 467 // Overwrite any info stored by table, this takes precedence 468 try { 469 if (ZKUtil.checkExists(watcher, groupBasePath) != -1) { 470 List<String> children = 
ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath); 471 if (children == null) { 472 return RSGroupInfoList; 473 } 474 for (String znode : children) { 475 byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode)); 476 if (data != null && data.length > 0) { 477 ProtobufUtil.expectPBMagicPrefix(data); 478 ByteArrayInputStream bis = 479 new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length); 480 RSGroupInfoList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); 481 } 482 } 483 LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size()); 484 } 485 } catch (KeeperException | DeserializationException | InterruptedException e) { 486 throw new IOException("Failed to read rsGroupZNode", e); 487 } 488 return RSGroupInfoList; 489 } 490 491 private void migrate(Collection<RSGroupInfo> groupList) { 492 TableDescriptors tds = masterServices.getTableDescriptors(); 493 ProcedureExecutor<MasterProcedureEnv> procExec = masterServices.getMasterProcedureExecutor(); 494 for (RSGroupInfo groupInfo : groupList) { 495 if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 496 continue; 497 } 498 SortedSet<TableName> failedTables = new TreeSet<>(); 499 List<MigrateRSGroupProcedure> procs = new ArrayList<>(); 500 for (TableName tableName : groupInfo.getTables()) { 501 LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName()); 502 TableDescriptor oldTd; 503 try { 504 oldTd = tds.get(tableName); 505 } catch (IOException e) { 506 LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e); 507 failedTables.add(tableName); 508 continue; 509 } 510 if (oldTd == null) { 511 continue; 512 } 513 if (oldTd.getRegionServerGroup().isPresent()) { 514 // either we have already migrated it or that user has set the rs group using the new 515 // code which will set the group directly on table descriptor, skip. 
516 LOG.debug("Skip migrating {} since it is already in group {}", tableName, 517 oldTd.getRegionServerGroup().get()); 518 continue; 519 } 520 // This is a bit tricky. Since we know that the region server group config in 521 // TableDescriptor will only be used at master side, it is fine to just update the table 522 // descriptor on file system and also the cache, without reopening all the regions. This 523 // will be much faster than the normal modifyTable. And when upgrading, we will update 524 // master first and then region server, so after all the region servers has been reopened, 525 // the new TableDescriptor will be loaded. 526 MigrateRSGroupProcedure proc = 527 new MigrateRSGroupProcedure(procExec.getEnvironment(), tableName); 528 procExec.submitProcedure(proc); 529 procs.add(proc); 530 } 531 for (MigrateRSGroupProcedure proc : procs) { 532 try { 533 ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000); 534 } catch (IOException e) { 535 LOG.warn("Failed to migrate rs group {} for table {}", groupInfo.getName(), 536 proc.getTableName()); 537 failedTables.add(proc.getTableName()); 538 } 539 } 540 LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables); 541 synchronized (RSGroupInfoManagerImpl.this) { 542 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 543 RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName()); 544 if (currentInfo != null) { 545 RSGroupInfo newInfo = 546 new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables); 547 Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap); 548 newGroupMap.put(groupInfo.getName(), newInfo); 549 try { 550 flushConfig(newGroupMap); 551 } catch (IOException e) { 552 LOG.warn("Failed to persist rs group {}", newInfo.getName(), e); 553 } 554 } 555 } 556 } 557 } 558 559 // Migrate the table rs group info from RSGroupInfo into the table descriptor 560 // Notice that we do not want to block the initialize so this will be done 
in background, and 561 // during the migrating, the rs group info maybe incomplete and cause region to be misplaced. 562 private void migrate() { 563 Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) { 564 565 @Override 566 public void run() { 567 LOG.info("Start migrating table rs group config"); 568 while (!masterServices.isStopped()) { 569 Collection<RSGroupInfo> groups = holder.groupName2Group.values(); 570 boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty()); 571 if (!hasTables) { 572 break; 573 } 574 migrate(groups); 575 } 576 LOG.info("Done migrating table rs group info"); 577 } 578 }; 579 migrateThread.setDaemon(true); 580 migrateThread.start(); 581 } 582 583 /** 584 * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on 585 * startup of the manager. 586 */ 587 private synchronized void refresh(boolean forceOnline) throws IOException { 588 List<RSGroupInfo> groupList = new ArrayList<>(); 589 590 // Overwrite anything read from zk, group table is source of truth 591 // if online read from GROUP table 592 if (forceOnline || isOnline()) { 593 LOG.debug("Refreshing in Online mode."); 594 groupList.addAll(retrieveGroupListFromGroupTable()); 595 } else { 596 LOG.debug("Refreshing in Offline mode."); 597 groupList.addAll(retrieveGroupListFromZookeeper()); 598 } 599 600 // This is added to the last of the list so it overwrites the 'default' rsgroup loaded 601 // from region group table or zk 602 groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList))); 603 604 // populate the data 605 HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(); 606 for (RSGroupInfo group : groupList) { 607 newGroupMap.put(group.getName(), group); 608 } 609 resetRSGroupMap(newGroupMap); 610 updateCacheOfRSGroups(newGroupMap.keySet()); 611 } 612 613 private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException { 614 List<Mutation> mutations = 
Lists.newArrayList(); 615 616 // populate deletes 617 for (String groupName : prevRSGroups) { 618 if (!groupMap.containsKey(groupName)) { 619 Delete d = new Delete(Bytes.toBytes(groupName)); 620 mutations.add(d); 621 } 622 } 623 624 // populate puts 625 for (RSGroupInfo gi : groupMap.values()) { 626 if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 627 RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi); 628 Put p = new Put(Bytes.toBytes(gi.getName())); 629 p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray()); 630 mutations.add(p); 631 } 632 } 633 634 if (mutations.size() > 0) { 635 multiMutate(mutations); 636 } 637 } 638 639 private synchronized void flushConfig() throws IOException { 640 flushConfig(holder.groupName2Group); 641 } 642 643 private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException { 644 // For offline mode persistence is still unavailable 645 // We're refreshing in-memory state but only for servers in default group 646 if (!isOnline()) { 647 if (newGroupMap == holder.groupName2Group) { 648 // When newGroupMap is this.rsGroupMap itself, 649 // do not need to check default group and other groups as followed 650 return; 651 } 652 653 LOG.debug("Offline mode, cannot persist to {}", RSGROUP_TABLE_NAME); 654 655 Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(holder.groupName2Group); 656 RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); 657 RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); 658 if ( 659 !oldGroupMap.equals(newGroupMap) 660 /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables() 661 .equals(newDefaultGroup.getTables()) 662 /* compare tables in default group */ 663 ) { 664 throw new IOException("Only servers in default group can be updated during offline mode"); 665 } 666 667 // Restore newGroupMap by putting its default group back 668 
newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup); 669 670 // Refresh rsGroupMap 671 // according to the inputted newGroupMap (an updated copy of rsGroupMap) 672 this.holder = new RSGroupInfoHolder(newGroupMap); 673 674 LOG.debug("New RSGroup map: {}", newGroupMap); 675 676 // Do not need to update tableMap 677 // because only the update on servers in default group is allowed above, 678 // or IOException will be thrown 679 return; 680 } 681 682 /* For online mode, persist to hbase:rsgroup and Zookeeper */ 683 LOG.debug("Online mode, persisting to {} and ZK", RSGROUP_TABLE_NAME); 684 flushConfigTable(newGroupMap); 685 686 // Make changes visible after having been persisted to the source of truth 687 resetRSGroupMap(newGroupMap); 688 saveRSGroupMapToZK(newGroupMap); 689 updateCacheOfRSGroups(newGroupMap.keySet()); 690 LOG.info("Flush config done, new RSGroup map: {}", newGroupMap); 691 } 692 693 private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOException { 694 LOG.debug("Saving RSGroup info to ZK"); 695 try { 696 String groupBasePath = 697 ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE); 698 ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC); 699 700 List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size()); 701 for (String groupName : prevRSGroups) { 702 if (!newGroupMap.containsKey(groupName)) { 703 String znode = ZNodePaths.joinZNode(groupBasePath, groupName); 704 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); 705 } 706 } 707 708 for (RSGroupInfo gi : newGroupMap.values()) { 709 if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 710 String znode = ZNodePaths.joinZNode(groupBasePath, gi.getName()); 711 RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi); 712 LOG.debug("Updating znode: " + znode); 713 ZKUtil.createAndFailSilent(watcher, znode); 714 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); 715 
zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode, 716 ProtobufUtil.prependPBMagic(proto.toByteArray()))); 717 } 718 } 719 LOG.debug("Writing ZK GroupInfo count: " + zkOps.size()); 720 721 ZKUtil.multiOrSequential(watcher, zkOps, false); 722 } catch (KeeperException e) { 723 LOG.error("Failed to write to rsGroupZNode", e); 724 masterServices.abort("Failed to write to rsGroupZNode", e); 725 throw new IOException("Failed to write to rsGroupZNode", e); 726 } 727 } 728 729 /** 730 * Make changes visible. Caller must be synchronized on 'this'. 731 */ 732 private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) { 733 this.holder = new RSGroupInfoHolder(newRSGroupMap); 734 } 735 736 /** 737 * Update cache of rsgroups. Caller must be synchronized on 'this'. 738 * @param currentGroups Current list of Groups. 739 */ 740 private void updateCacheOfRSGroups(final Set<String> currentGroups) { 741 this.prevRSGroups.clear(); 742 this.prevRSGroups.addAll(currentGroups); 743 } 744 745 // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. 746 private SortedSet<Address> getDefaultServers() { 747 return getDefaultServers(listRSGroups()/* get from rsGroupMap */); 748 } 749 750 // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. 
751 private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList) { 752 // Build a list of servers in other groups than default group, from rsGroupMap 753 Set<Address> serversInOtherGroup = new HashSet<>(); 754 for (RSGroupInfo group : rsGroupInfoList) { 755 if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group 756 serversInOtherGroup.addAll(group.getServers()); 757 } 758 } 759 760 // Get all online servers from Zookeeper and find out servers in default group 761 SortedSet<Address> defaultServers = Sets.newTreeSet(); 762 for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) { 763 Address server = Address.fromParts(serverName.getHostname(), serverName.getPort()); 764 if (!serversInOtherGroup.contains(server)) { // not in other groups 765 defaultServers.add(server); 766 } 767 } 768 return defaultServers; 769 } 770 771 private class RSGroupStartupWorker extends Thread { 772 private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class); 773 private volatile boolean online = false; 774 775 RSGroupStartupWorker() { 776 super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName()); 777 setDaemon(true); 778 } 779 780 @Override 781 public void run() { 782 if (waitForGroupTableOnline()) { 783 LOG.info("GroupBasedLoadBalancer is now online"); 784 } else { 785 LOG.warn("Quit without making region group table online"); 786 } 787 } 788 789 private boolean waitForGroupTableOnline() { 790 while (isMasterRunning(masterServices)) { 791 try { 792 TableStateManager tsm = masterServices.getTableStateManager(); 793 if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) { 794 createRSGroupTable(); 795 } 796 // try reading from the table 797 FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY))); 798 LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME); 799 RSGroupInfoManagerImpl.this.refresh(true); 800 online = true; 801 // 
flush any inconsistencies between ZK and HTable 802 RSGroupInfoManagerImpl.this.flushConfig(); 803 // migrate after we are online. 804 migrate(); 805 return true; 806 } catch (Exception e) { 807 LOG.warn("Failed to perform check", e); 808 // 100ms is short so let's just ignore the interrupt 809 Threads.sleepWithoutInterrupt(100); 810 } 811 } 812 return false; 813 } 814 815 private void createRSGroupTable() throws IOException { 816 OptionalLong optProcId = masterServices.getProcedures().stream() 817 .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p) 818 .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId) 819 .findFirst(); 820 long procId; 821 if (optProcId.isPresent()) { 822 procId = optProcId.getAsLong(); 823 } else { 824 LOG.debug("Creating group table {}", RSGROUP_TABLE_NAME); 825 procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC); 826 } 827 // wait for region to be online 828 int tries = 600; 829 while ( 830 !(masterServices.getMasterProcedureExecutor().isFinished(procId)) 831 && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0 832 ) { 833 try { 834 Thread.sleep(100); 835 } catch (InterruptedException e) { 836 throw new IOException("Wait interrupted ", e); 837 } 838 tries--; 839 } 840 if (tries <= 0) { 841 throw new IOException("Failed to create group table in a given time."); 842 } else { 843 Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId); 844 if (result != null && result.isFailed()) { 845 throw new IOException( 846 "Failed to create group table. 
" + MasterProcedureUtil.unwrapRemoteIOException(result)); 847 } 848 } 849 } 850 851 public boolean isOnline() { 852 return online; 853 } 854 } 855 856 private static boolean isMasterRunning(MasterServices masterServices) { 857 return !masterServices.isAborted() && !masterServices.isStopped(); 858 } 859 860 private void multiMutate(List<Mutation> mutations) throws IOException { 861 MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder(); 862 for (Mutation mutation : mutations) { 863 if (mutation instanceof Put) { 864 builder 865 .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation)); 866 } else if (mutation instanceof Delete) { 867 builder 868 .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation)); 869 } else { 870 throw new DoNotRetryIOException( 871 "multiMutate doesn't support " + mutation.getClass().getName()); 872 } 873 } 874 MutateRowsRequest request = builder.build(); 875 AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME); 876 LOG.debug("Multimutating {} with {} mutations", RSGROUP_TABLE_NAME, mutations.size()); 877 FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService( 878 MultiRowMutationService::newStub, 879 (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY)); 880 LOG.info("Multimutating {} with {} mutations done", RSGROUP_TABLE_NAME, mutations.size()); 881 } 882 883 private void checkGroupName(String groupName) throws ConstraintException { 884 if (!groupName.matches("[a-zA-Z0-9_]+")) { 885 throw new ConstraintException("RSGroup name should only contain alphanumeric characters"); 886 } 887 } 888 889 @Override 890 public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException { 891 return holder.tableName2Group.get(tableName); 892 } 893 894 /** 895 * Check if the set of servers are belong to dead servers list or online servers list. 
896 * @param servers servers to remove 897 */ 898 private void checkForDeadOrOnlineServers(Set<Address> servers) throws IOException { 899 // This ugliness is because we only have Address, not ServerName. 900 Set<Address> onlineServers = new HashSet<>(); 901 List<ServerName> drainingServers = masterServices.getServerManager().getDrainingServersList(); 902 for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) { 903 // Only online but not decommissioned servers are really online 904 if (!drainingServers.contains(server)) { 905 onlineServers.add(server.getAddress()); 906 } 907 } 908 909 Set<Address> deadServers = new HashSet<>(); 910 for (ServerName server : masterServices.getServerManager().getDeadServers().copyServerNames()) { 911 deadServers.add(server.getAddress()); 912 } 913 914 for (Address address : servers) { 915 if (onlineServers.contains(address)) { 916 throw new DoNotRetryIOException( 917 "Server " + address + " is an online server, not allowed to remove."); 918 } 919 if (deadServers.contains(address)) { 920 throw new DoNotRetryIOException("Server " + address + " is on the dead servers list," 921 + " Maybe it will come back again, not allowed to remove."); 922 } 923 } 924 } 925 926 private void checkOnlineServersOnly(Set<Address> servers) throws IOException { 927 // This uglyness is because we only have Address, not ServerName. 928 // Online servers are keyed by ServerName. 929 Set<Address> onlineServers = new HashSet<>(); 930 for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) { 931 onlineServers.add(server.getAddress()); 932 } 933 for (Address address : servers) { 934 if (!onlineServers.contains(address)) { 935 throw new DoNotRetryIOException( 936 "Server " + address + " is not an online server in 'default' RSGroup."); 937 } 938 } 939 } 940 941 /** Returns List of Regions associated with this <code>server</code>. 
*/ 942 private List<RegionInfo> getRegions(final Address server) { 943 LinkedList<RegionInfo> regions = new LinkedList<>(); 944 for (Map.Entry<RegionInfo, ServerName> el : masterServices.getAssignmentManager() 945 .getRegionStates().getRegionAssignments().entrySet()) { 946 if (el.getValue() == null) { 947 continue; 948 } 949 950 if (el.getValue().getAddress().equals(server)) { 951 addRegion(regions, el.getKey()); 952 } 953 } 954 for (RegionStateNode state : masterServices.getAssignmentManager().getRegionsInTransition()) { 955 if ( 956 state.getRegionLocation() != null && state.getRegionLocation().getAddress().equals(server) 957 ) { 958 addRegion(regions, state.getRegionInfo()); 959 } 960 } 961 return regions; 962 } 963 964 private void addRegion(final LinkedList<RegionInfo> regions, RegionInfo hri) { 965 // If meta, move it last otherwise other unassigns fail because meta is not 966 // online for them to update state in. This is dodgy. Needs to be made more 967 // robust. See TODO below. 968 if (hri.isMetaRegion()) { 969 regions.addLast(hri); 970 } else { 971 regions.addFirst(hri); 972 } 973 } 974 975 /** 976 * Move every region from servers which are currently located on these servers, but should not be 977 * located there. 
978 * @param movedServers the servers that are moved to new group 979 * @param srcGrpServers all servers in the source group, excluding the movedServers 980 * @param targetGroupName the target group 981 * @param sourceGroupName the source group 982 * @throws IOException if moving the server and tables fail 983 */ 984 private void moveServerRegionsFromGroup(Set<Address> movedServers, Set<Address> srcGrpServers, 985 String targetGroupName, String sourceGroupName) throws IOException { 986 moveRegionsBetweenGroups(movedServers, srcGrpServers, targetGroupName, sourceGroupName, 987 rs -> getRegions(rs), info -> { 988 try { 989 String groupName = RSGroupUtil.getRSGroupInfo(masterServices, this, info.getTable()) 990 .map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP); 991 return groupName.equals(targetGroupName); 992 } catch (IOException e) { 993 LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName); 994 return false; 995 } 996 }); 997 } 998 999 private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, Set<Address> newRegionsOwners, 1000 String targetGroupName, String sourceGroupName, Function<T, List<RegionInfo>> getRegionsInfo, 1001 Function<RegionInfo, Boolean> validation) throws IOException { 1002 // Get server names corresponding to given Addresses 1003 List<ServerName> movedServerNames = new ArrayList<>(regionsOwners.size()); 1004 List<ServerName> srcGrpServerNames = new ArrayList<>(newRegionsOwners.size()); 1005 for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) { 1006 // In case region move failed in previous attempt, regionsOwners and newRegionsOwners 1007 // can have the same servers. 
So for all servers below both conditions to be checked 1008 if (newRegionsOwners.contains(serverName.getAddress())) { 1009 srcGrpServerNames.add(serverName); 1010 } 1011 if (regionsOwners.contains(serverName.getAddress())) { 1012 movedServerNames.add(serverName); 1013 } 1014 } 1015 List<Pair<RegionInfo, Future<byte[]>>> assignmentFutures = new ArrayList<>(); 1016 int retry = 0; 1017 Set<String> failedRegions = new HashSet<>(); 1018 IOException toThrow = null; 1019 do { 1020 assignmentFutures.clear(); 1021 failedRegions.clear(); 1022 for (ServerName owner : movedServerNames) { 1023 // Get regions that are associated with this server and filter regions by group tables. 1024 for (RegionInfo region : getRegionsInfo.apply((T) owner.getAddress())) { 1025 if (!validation.apply(region)) { 1026 LOG.info("Moving region {}, which does not belong to RSGroup {}", 1027 region.getShortNameToLog(), targetGroupName); 1028 // Move region back to source RSGroup servers 1029 ServerName dest = 1030 masterServices.getLoadBalancer().randomAssignment(region, srcGrpServerNames); 1031 if (dest == null) { 1032 failedRegions.add(region.getRegionNameAsString()); 1033 continue; 1034 } 1035 RegionPlan rp = new RegionPlan(region, owner, dest); 1036 try { 1037 Future<byte[]> future = masterServices.getAssignmentManager().moveAsync(rp); 1038 assignmentFutures.add(Pair.newPair(region, future)); 1039 } catch (IOException ioe) { 1040 failedRegions.add(region.getRegionNameAsString()); 1041 LOG.debug("Move region {} failed, will retry, current retry time is {}", 1042 region.getShortNameToLog(), retry, ioe); 1043 toThrow = ioe; 1044 } 1045 } 1046 } 1047 } 1048 waitForRegionMovement(assignmentFutures, failedRegions, sourceGroupName, retry); 1049 if (failedRegions.isEmpty()) { 1050 LOG.info("All regions from {} are moved back to {}", movedServerNames, sourceGroupName); 1051 return; 1052 } else { 1053 try { 1054 wait(1000); 1055 } catch (InterruptedException e) { 1056 LOG.warn("Sleep interrupted", e); 1057 
Thread.currentThread().interrupt(); 1058 } 1059 retry++; 1060 } 1061 } while ( 1062 !failedRegions.isEmpty() && retry <= masterServices.getConfiguration() 1063 .getInt(FAILED_MOVE_MAX_RETRY, DEFAULT_MAX_RETRY_VALUE) 1064 ); 1065 1066 // has up to max retry time or there are no more regions to move 1067 if (!failedRegions.isEmpty()) { 1068 // print failed moved regions, for later process conveniently 1069 String msg = String.format("move regions for group %s failed, failed regions: %s", 1070 sourceGroupName, failedRegions); 1071 LOG.error(msg); 1072 throw new DoNotRetryIOException( 1073 msg + ", just record the last failed region's cause, more details in server log", toThrow); 1074 } 1075 } 1076 1077 /** 1078 * Wait for all the region move to complete. Keep waiting for other region movement completion 1079 * even if some region movement fails. 1080 */ 1081 private void waitForRegionMovement(List<Pair<RegionInfo, Future<byte[]>>> regionMoveFutures, 1082 Set<String> failedRegions, String sourceGroupName, int retryCount) { 1083 LOG.info("Moving {} region(s) to group {}, current retry={}", regionMoveFutures.size(), 1084 sourceGroupName, retryCount); 1085 for (Pair<RegionInfo, Future<byte[]>> pair : regionMoveFutures) { 1086 try { 1087 pair.getSecond().get(); 1088 if ( 1089 masterServices.getAssignmentManager().getRegionStates().getRegionState(pair.getFirst()) 1090 .isFailedOpen() 1091 ) { 1092 failedRegions.add(pair.getFirst().getRegionNameAsString()); 1093 } 1094 } catch (InterruptedException e) { 1095 // Dont return form there lets wait for other regions to complete movement. 
1096 failedRegions.add(pair.getFirst().getRegionNameAsString()); 1097 LOG.warn("Sleep interrupted", e); 1098 } catch (Exception e) { 1099 failedRegions.add(pair.getFirst().getRegionNameAsString()); 1100 LOG.error("Move region {} to group {} failed, will retry on next attempt", 1101 pair.getFirst().getShortNameToLog(), sourceGroupName, e); 1102 } 1103 } 1104 } 1105 1106 private boolean isTableInGroup(TableName tableName, String groupName, 1107 Set<TableName> tablesInGroupCache) throws IOException { 1108 if (tablesInGroupCache.contains(tableName)) { 1109 return true; 1110 } 1111 if ( 1112 RSGroupUtil.getRSGroupInfo(masterServices, this, tableName).map(RSGroupInfo::getName) 1113 .orElse(RSGroupInfo.DEFAULT_GROUP).equals(groupName) 1114 ) { 1115 tablesInGroupCache.add(tableName); 1116 return true; 1117 } 1118 return false; 1119 } 1120 1121 private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName) 1122 throws IOException { 1123 Map<String, RegionState> rit = Maps.newTreeMap(); 1124 Set<TableName> tablesInGroupCache = new HashSet<>(); 1125 for (RegionStateNode regionNode : masterServices.getAssignmentManager() 1126 .getRegionsInTransition()) { 1127 TableName tn = regionNode.getTable(); 1128 if (isTableInGroup(tn, groupName, tablesInGroupCache)) { 1129 rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState()); 1130 } 1131 } 1132 return rit; 1133 } 1134 1135 /** 1136 * This is an EXPENSIVE clone. Cloning though is the safest thing to do. Can't let out original 1137 * since it can change and at least the load balancer wants to iterate this exported list. Load 1138 * balancer should iterate over this list because cloned list will ignore disabled table and split 1139 * parent region cases. This method is invoked by {@link #balanceRSGroup} 1140 * @return A clone of current assignments for this group. 
1141 */ 1142 Map<TableName, Map<ServerName, List<RegionInfo>>> getRSGroupAssignmentsByTable( 1143 TableStateManager tableStateManager, String groupName) throws IOException { 1144 Map<TableName, Map<ServerName, List<RegionInfo>>> result = Maps.newHashMap(); 1145 Set<TableName> tablesInGroupCache = new HashSet<>(); 1146 for (Map.Entry<RegionInfo, ServerName> entry : masterServices.getAssignmentManager() 1147 .getRegionStates().getRegionAssignments().entrySet()) { 1148 RegionInfo region = entry.getKey(); 1149 TableName tn = region.getTable(); 1150 ServerName server = entry.getValue(); 1151 if (isTableInGroup(tn, groupName, tablesInGroupCache)) { 1152 if ( 1153 tableStateManager.isTableState(tn, TableState.State.DISABLED, TableState.State.DISABLING) 1154 ) { 1155 continue; 1156 } 1157 if (region.isSplitParent()) { 1158 continue; 1159 } 1160 result.computeIfAbsent(tn, k -> new HashMap<>()) 1161 .computeIfAbsent(server, k -> new ArrayList<>()).add(region); 1162 } 1163 } 1164 RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); 1165 for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) { 1166 if (rsGroupInfo.containsServer(serverName.getAddress())) { 1167 for (Map<ServerName, List<RegionInfo>> map : result.values()) { 1168 map.computeIfAbsent(serverName, k -> Collections.emptyList()); 1169 } 1170 } 1171 } 1172 return result; 1173 } 1174 1175 @Override 1176 public BalanceResponse balanceRSGroup(String groupName, BalanceRequest request) 1177 throws IOException { 1178 ServerManager serverManager = masterServices.getServerManager(); 1179 LoadBalancer balancer = masterServices.getLoadBalancer(); 1180 getRSGroupInfo(groupName); 1181 1182 BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder(); 1183 1184 synchronized (balancer) { 1185 // If balance not true, don't run balancer. 
1186 if (!masterServices.isBalancerOn() && !request.isDryRun()) { 1187 return responseBuilder.build(); 1188 } 1189 1190 // Only allow one balance run at at time. 1191 Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName); 1192 if (groupRIT.size() > 0 && !request.isIgnoreRegionsInTransition()) { 1193 LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(), 1194 StringUtils.abbreviate(masterServices.getAssignmentManager().getRegionStates() 1195 .getRegionsInTransition().toString(), 256)); 1196 return responseBuilder.build(); 1197 } 1198 1199 if (serverManager.areDeadServersInProgress()) { 1200 LOG.debug("Not running balancer because processing dead regionserver(s): {}", 1201 serverManager.getDeadServers()); 1202 return responseBuilder.build(); 1203 } 1204 1205 // We balance per group instead of per table 1206 Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable = 1207 getRSGroupAssignmentsByTable(masterServices.getTableStateManager(), groupName); 1208 List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable); 1209 boolean balancerRan = !plans.isEmpty(); 1210 1211 responseBuilder.setBalancerRan(balancerRan).setMovesCalculated(plans.size()); 1212 1213 if (balancerRan && !request.isDryRun()) { 1214 LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size()); 1215 List<RegionPlan> executed = masterServices.executeRegionPlansWithThrottling(plans); 1216 responseBuilder.setMovesExecuted(executed.size()); 1217 LOG.info("RSGroup balance " + groupName + " completed"); 1218 } 1219 1220 return responseBuilder.build(); 1221 } 1222 } 1223 1224 private void moveTablesAndWait(Set<TableName> tables, String targetGroup) throws IOException { 1225 LOG.debug("Moving {} tables to target group {}", tables.size(), targetGroup); 1226 List<Long> procIds = new ArrayList<Long>(); 1227 for (TableName tableName : tables) { 1228 TableDescriptor oldTd = 
masterServices.getTableDescriptors().get(tableName); 1229 if (oldTd == null) { 1230 continue; 1231 } 1232 TableDescriptor newTd = 1233 TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build(); 1234 procIds.add( 1235 masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE)); 1236 } 1237 for (long procId : procIds) { 1238 Procedure<?> proc = masterServices.getMasterProcedureExecutor().getProcedure(procId); 1239 if (proc == null) { 1240 continue; 1241 } 1242 ProcedureSyncWait.waitForProcedureToCompleteIOE(masterServices.getMasterProcedureExecutor(), 1243 proc, Long.MAX_VALUE); 1244 } 1245 LOG.info("Move tables done: moved {} tables to {}", tables.size(), targetGroup); 1246 if (LOG.isDebugEnabled()) { 1247 LOG.debug("Tables moved to {}: {}", targetGroup, tables); 1248 } 1249 } 1250 1251 @Override 1252 public void setRSGroup(Set<TableName> tables, String groupName) throws IOException { 1253 getRSGroupInfo(groupName); 1254 moveTablesAndWait(tables, groupName); 1255 } 1256 1257 public void moveServers(Set<Address> servers, String targetGroupName) throws IOException { 1258 if (servers == null) { 1259 throw new ConstraintException("The list of servers to move cannot be null."); 1260 } 1261 if (servers.isEmpty()) { 1262 // For some reason this difference between null servers and isEmpty is important distinction. 1263 // TODO. Why? Stuff breaks if I equate them. 1264 return; 1265 } 1266 if (StringUtils.isEmpty(targetGroupName)) { 1267 throw new ConstraintException("RSGroup cannot be null."); 1268 } 1269 1270 // Hold a lock on the manager instance while moving servers to prevent 1271 // another writer changing our state while we are working. 1272 synchronized (this) { 1273 // Presume first server's source group. Later ensure all servers are from this group. 1274 Address firstServer = servers.iterator().next(); 1275 RSGroupInfo srcGrp = getRSGroupOfServer(firstServer); 1276 if (srcGrp == null) { 1277 // Be careful. 
This exception message is tested for in TestRSGroupAdmin2... 1278 throw new ConstraintException( 1279 "Server " + firstServer + " is either offline or it does not exist."); 1280 } 1281 1282 // Only move online servers (when moving from 'default') or servers from other 1283 // groups. This prevents bogus servers from entering groups 1284 if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) { 1285 if (srcGrp.getServers().size() <= servers.size()) { 1286 throw new ConstraintException(KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE); 1287 } 1288 checkOnlineServersOnly(servers); 1289 } 1290 // Ensure all servers are of same rsgroup. 1291 for (Address server : servers) { 1292 String tmpGroup = getRSGroupOfServer(server).getName(); 1293 if (!tmpGroup.equals(srcGrp.getName())) { 1294 throw new ConstraintException("Move server request should only come from one source " 1295 + "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); 1296 } 1297 } 1298 if (srcGrp.getServers().size() <= servers.size()) { 1299 // check if there are still tables reference this group 1300 for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) { 1301 Optional<String> optGroupName = td.getRegionServerGroup(); 1302 if (optGroupName.isPresent() && optGroupName.get().equals(srcGrp.getName())) { 1303 throw new ConstraintException( 1304 "Cannot leave a RSGroup " + srcGrp.getName() + " that contains tables('" 1305 + td.getTableName() + "' at least) without servers to host them."); 1306 } 1307 } 1308 } 1309 1310 // MovedServers may be < passed in 'servers'. 
1311 Set<Address> movedServers = moveServers(servers, srcGrp.getName(), targetGroupName); 1312 moveServerRegionsFromGroup(movedServers, srcGrp.getServers(), targetGroupName, 1313 srcGrp.getName()); 1314 LOG.info("Move servers done: moved {} servers from {} to {}", movedServers.size(), 1315 srcGrp.getName(), targetGroupName); 1316 if (LOG.isDebugEnabled()) { 1317 LOG.debug("Servers moved from {} to {}: {}", srcGrp.getName(), targetGroupName, 1318 movedServers); 1319 } 1320 } 1321 } 1322 1323 @Override 1324 public String determineRSGroupInfoForTable(TableName tableName) { 1325 return script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()); 1326 } 1327 1328 @Override 1329 public synchronized void renameRSGroup(String oldName, String newName) throws IOException { 1330 if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) { 1331 throw new ConstraintException(RSGroupInfo.DEFAULT_GROUP + " can't be rename"); 1332 } 1333 checkGroupName(newName); 1334 // getRSGroupInfo validates old RSGroup existence. 
1335 RSGroupInfo oldRSG = getRSGroupInfo(oldName); 1336 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 1337 if (rsGroupMap.containsKey(newName)) { 1338 throw new ConstraintException("Group already exists: " + newName); 1339 } 1340 1341 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 1342 newGroupMap.remove(oldRSG.getName()); 1343 RSGroupInfo newRSG = new RSGroupInfo(newName, oldRSG.getServers()); 1344 newGroupMap.put(newName, newRSG); 1345 flushConfig(newGroupMap); 1346 Set<TableName> updateTables = masterServices.getTableDescriptors().getAll().values().stream() 1347 .filter(t -> oldName.equals(t.getRegionServerGroup().orElse(null))) 1348 .map(TableDescriptor::getTableName).collect(Collectors.toSet()); 1349 setRSGroup(updateTables, newName); 1350 LOG.info("Rename RSGroup done: {} => {}", oldName, newName); 1351 } 1352 1353 @Override 1354 public synchronized void updateRSGroupConfig(String groupName, Map<String, String> configuration) 1355 throws IOException { 1356 if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) { 1357 // We do not persist anything of default group, therefore, it is not supported to update 1358 // default group's configuration which lost once master down. 1359 throw new ConstraintException( 1360 "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently"); 1361 } 1362 RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); 1363 rsGroupInfo.getConfiguration().forEach((k, v) -> rsGroupInfo.removeConfiguration(k)); 1364 configuration.forEach((k, v) -> rsGroupInfo.setConfiguration(k, v)); 1365 flushConfig(); 1366 } 1367}