001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rsgroup; 019 020import java.io.ByteArrayInputStream; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.OptionalLong; 032import java.util.Set; 033import java.util.SortedSet; 034import java.util.TreeSet; 035import java.util.concurrent.Future; 036import java.util.function.Function; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.StringUtils; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Coprocessor; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.NamespaceDescriptor; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableDescriptors; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.AsyncClusterConnection; 048import 
org.apache.hadoop.hbase.client.AsyncTable; 049import org.apache.hadoop.hbase.client.BalanceRequest; 050import org.apache.hadoop.hbase.client.BalanceResponse; 051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 052import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; 053import org.apache.hadoop.hbase.client.Delete; 054import org.apache.hadoop.hbase.client.Get; 055import org.apache.hadoop.hbase.client.Mutation; 056import org.apache.hadoop.hbase.client.Put; 057import org.apache.hadoop.hbase.client.RegionInfo; 058import org.apache.hadoop.hbase.client.Result; 059import org.apache.hadoop.hbase.client.ResultScanner; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.client.TableState; 063import org.apache.hadoop.hbase.constraint.ConstraintException; 064import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 065import org.apache.hadoop.hbase.exceptions.DeserializationException; 066import org.apache.hadoop.hbase.master.LoadBalancer; 067import org.apache.hadoop.hbase.master.MasterServices; 068import org.apache.hadoop.hbase.master.RegionPlan; 069import org.apache.hadoop.hbase.master.RegionState; 070import org.apache.hadoop.hbase.master.ServerListener; 071import org.apache.hadoop.hbase.master.ServerManager; 072import org.apache.hadoop.hbase.master.TableStateManager; 073import org.apache.hadoop.hbase.master.assignment.RegionStateNode; 074import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; 075import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 076import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 077import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 078import org.apache.hadoop.hbase.net.Address; 079import org.apache.hadoop.hbase.procedure2.Procedure; 080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 081import 
org.apache.hadoop.hbase.protobuf.ProtobufMagic; 082import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 083import org.apache.hadoop.hbase.util.Bytes; 084import org.apache.hadoop.hbase.util.FutureUtils; 085import org.apache.hadoop.hbase.util.Pair; 086import org.apache.hadoop.hbase.util.Threads; 087import org.apache.hadoop.hbase.zookeeper.ZKUtil; 088import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 089import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 090import org.apache.hadoop.util.Shell; 091import org.apache.yetus.audience.InterfaceAudience; 092import org.apache.zookeeper.KeeperException; 093import org.slf4j.Logger; 094import org.slf4j.LoggerFactory; 095 096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 097import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 098import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 099import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 100 101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos; 107 108/** 109 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the 110 * persistence store for the group information. It also makes use of zookeeper to store group 111 * information needed for bootstrapping during offline mode. 112 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached 113 * RSGroupInfo Map at {@link RSGroupInfoHolder#groupName2Group}. 
These Maps are persisted to the 114 * hbase:rsgroup table (and cached in zk) on each modification. 115 * <p/> 116 * Mutations on state are synchronized but reads can continue without having to wait on an instance 117 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of 118 * state are read-only, just-in-case (see flushConfig). 119 * <p/> 120 * Reads must not block else there is a danger we'll deadlock. 121 * <p/> 122 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act 123 * on the results of the query modifying cache in zookeeper without another thread making 124 * intermediate modifications. These clients synchronize on the 'this' instance so no other has 125 * access concurrently. Reads must be able to continue concurrently. 126 */ 127@InterfaceAudience.Private 128final class RSGroupInfoManagerImpl implements RSGroupInfoManager { 129 private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class); 130 131 // Assigned before user tables 132 static final TableName RSGROUP_TABLE_NAME = 133 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup"); 134 135 static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE = 136 "should keep at least " + "one server in 'default' RSGroup."; 137 138 /** Define the config key of retries threshold when movements failed */ 139 static final String FAILED_MOVE_MAX_RETRY = "hbase.rsgroup.move.max.retry"; 140 141 /** Define the default number of retries */ 142 static final int DEFAULT_MAX_RETRY_VALUE = 50; 143 144 private static final String RS_GROUP_ZNODE = "rsgroup"; 145 146 static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m"); 147 148 static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i"); 149 150 static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables"; 151 152 private static final byte[] ROW_KEY = { 0 }; 153 154 /** Table descriptor for <code>hbase:rsgroup</code> catalog 
table */
private static final TableDescriptor RSGROUP_TABLE_DESC = createRSGroupTableDescriptor();

/**
 * Builds the descriptor of the rsgroup system table: one column family ({@code META_FAMILY_BYTES}),
 * region splitting disabled, and {@link MultiRowMutationEndpoint} installed at system priority so
 * that several group rows can be updated atomically. Runs during class initialization, so a
 * failure while registering the coprocessor is rethrown as an {@link Error}.
 */
private static TableDescriptor createRSGroupTableDescriptor() {
  TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
    .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
  try {
    tableBuilder.setCoprocessor(CoprocessorDescriptorBuilder
      .newBuilder(MultiRowMutationEndpoint.class.getName())
      .setPriority(Coprocessor.PRIORITY_SYSTEM)
      .build());
  } catch (IOException ex) {
    throw new Error(ex);
  }
  return tableBuilder.build();
}

// These two maps are immutable and are replaced wholesale on every modification, so they are
// safe to read concurrently without locking. See class comment.
private static final class RSGroupInfoHolder {
  /** RSGroup name to its info; includes the 'default' group. */
  final ImmutableMap<String, RSGroupInfo> groupName2Group;
  /** Table name to the group listing it; tables listed by the 'default' group are not indexed. */
  final ImmutableMap<TableName, RSGroupInfo> tableName2Group;

  /** Creates a holder with no groups at all. */
  RSGroupInfoHolder() {
    this(Collections.emptyMap());
  }

  /**
   * Indexes {@code rsGroupMap} by group name and, for every non-default group, by each table the
   * group lists.
   * @param rsGroupMap group name to group info; copied into immutable maps
   */
  RSGroupInfoHolder(Map<String, RSGroupInfo> rsGroupMap) {
    ImmutableMap.Builder<String, RSGroupInfo> byGroupName = ImmutableMap.builder();
    ImmutableMap.Builder<TableName, RSGroupInfo> byTableName = ImmutableMap.builder();
    for (Map.Entry<String, RSGroupInfo> entry : rsGroupMap.entrySet()) {
      String name = entry.getKey();
      RSGroupInfo info = entry.getValue();
      byGroupName.put(name, info);
      if (RSGroupInfo.DEFAULT_GROUP.equals(name)) {
        continue;
      }
      for (TableName table : info.getTables()) {
        byTableName.put(table, info);
      }
    }
    this.groupName2Group = byGroupName.build();
    this.tableName2Group = byTableName.build();
  }
}

// Current snapshot of all group state; never mutated, only replaced by a new holder.
private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder();

private final MasterServices masterServices;
private final AsyncClusterConnection conn;
private final ZKWatcher watcher;
private final RSGroupStartupWorker rsGroupStartupWorker;
// contains list of groups that
were last flushed to persistent store 202 private Set<String> prevRSGroups = new HashSet<>(); 203 204 // Package visibility for testing 205 static class RSGroupMappingScript { 206 static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script"; 207 static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT = 208 "hbase.rsgroup.table.mapping.script.timeout"; 209 private Shell.ShellCommandExecutor rsgroupMappingScript; 210 211 RSGroupMappingScript(Configuration conf) { 212 String script = conf.get(RS_GROUP_MAPPING_SCRIPT); 213 if (script == null || script.isEmpty()) { 214 return; 215 } 216 217 rsgroupMappingScript = new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null, 218 null, conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds 219 ); 220 } 221 222 String getRSGroup(String namespace, String tablename) { 223 if (rsgroupMappingScript == null) { 224 return null; 225 } 226 String[] exec = rsgroupMappingScript.getExecString(); 227 exec[1] = namespace; 228 exec[2] = tablename; 229 try { 230 rsgroupMappingScript.execute(); 231 } catch (IOException e) { 232 // This exception may happen, like process doesn't have permission to run this script. 
233 LOG.error("{}, placing {} back to default rsgroup", e.getMessage(), 234 TableName.valueOf(namespace, tablename)); 235 return RSGroupInfo.DEFAULT_GROUP; 236 } 237 return rsgroupMappingScript.getOutput().trim(); 238 } 239 } 240 241 private RSGroupMappingScript script; 242 243 private RSGroupInfoManagerImpl(MasterServices masterServices) { 244 this.masterServices = masterServices; 245 this.watcher = masterServices.getZooKeeper(); 246 this.conn = masterServices.getAsyncClusterConnection(); 247 this.rsGroupStartupWorker = new RSGroupStartupWorker(); 248 this.script = new RSGroupMappingScript(masterServices.getConfiguration()); 249 } 250 251 private synchronized void updateDefaultServers() { 252 LOG.info("Updating default servers."); 253 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group); 254 RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP); 255 assert oldDefaultGroupInfo != null; 256 RSGroupInfo newDefaultGroupInfo = 257 new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers()); 258 newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables()); 259 newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo); 260 // do not need to persist, as we do not persist default group. 
261 resetRSGroupMap(newGroupMap); 262 LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size()); 263 if (LOG.isDebugEnabled()) { 264 LOG.debug("New default servers list: {}", newDefaultGroupInfo.getServers()); 265 } 266 } 267 268 private synchronized void init() throws IOException { 269 refresh(false); 270 masterServices.getServerManager().registerListener(new ServerListener() { 271 272 @Override 273 public void serverAdded(ServerName serverName) { 274 updateDefaultServers(); 275 } 276 277 @Override 278 public void serverRemoved(ServerName serverName) { 279 updateDefaultServers(); 280 } 281 }); 282 } 283 284 static RSGroupInfoManager getInstance(MasterServices masterServices) throws IOException { 285 RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(masterServices); 286 instance.init(); 287 return instance; 288 } 289 290 public void start() { 291 // create system table of rsgroup 292 rsGroupStartupWorker.start(); 293 } 294 295 @Override 296 public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException { 297 checkGroupName(rsGroupInfo.getName()); 298 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 299 if ( 300 rsGroupMap.get(rsGroupInfo.getName()) != null 301 || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP) 302 ) { 303 throw new ConstraintException("Group already exists: " + rsGroupInfo.getName()); 304 } 305 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 306 newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo); 307 flushConfig(newGroupMap); 308 LOG.info("Add group {} done.", rsGroupInfo.getName()); 309 } 310 311 private RSGroupInfo getRSGroupInfo(final String groupName) throws ConstraintException { 312 RSGroupInfo rsGroupInfo = holder.groupName2Group.get(groupName); 313 if (rsGroupInfo == null) { 314 throw new ConstraintException("RSGroup " + groupName + " does not exist"); 315 } 316 return rsGroupInfo; 317 } 318 319 /** 320 * @return Set of online Servers named 
for their hostname and port (not ServerName). 321 */ 322 private Set<Address> getOnlineServers() { 323 return masterServices.getServerManager().getOnlineServers().keySet().stream() 324 .map(ServerName::getAddress).collect(Collectors.toSet()); 325 } 326 327 public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup, 328 String dstGroup) throws IOException { 329 RSGroupInfo src = getRSGroupInfo(srcGroup); 330 RSGroupInfo dst = getRSGroupInfo(dstGroup); 331 Set<Address> movedServers = new HashSet<>(); 332 // If destination is 'default' rsgroup, only add servers that are online. If not online, drop 333 // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a 334 // rsgroup of dead servers that are to come back later). 335 Set<Address> onlineServers = 336 dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null; 337 for (Address el : servers) { 338 src.removeServer(el); 339 if (onlineServers != null) { 340 if (!onlineServers.contains(el)) { 341 if (LOG.isDebugEnabled()) { 342 LOG.debug("Dropping " + el + " during move-to-default RSGroup because not online"); 343 } 344 continue; 345 } 346 } 347 dst.addServer(el); 348 movedServers.add(el); 349 } 350 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group); 351 newGroupMap.put(src.getName(), src); 352 newGroupMap.put(dst.getName(), dst); 353 flushConfig(newGroupMap); 354 return movedServers; 355 } 356 357 @Override 358 public RSGroupInfo getRSGroupOfServer(Address serverHostPort) { 359 for (RSGroupInfo info : holder.groupName2Group.values()) { 360 if (info.containsServer(serverHostPort)) { 361 return info; 362 } 363 } 364 return null; 365 } 366 367 @Override 368 public RSGroupInfo getRSGroup(String groupName) { 369 return holder.groupName2Group.get(groupName); 370 } 371 372 @Override 373 public synchronized void removeRSGroup(String groupName) throws IOException { 374 RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); 
375 int serverCount = rsGroupInfo.getServers().size(); 376 if (serverCount > 0) { 377 throw new ConstraintException("RSGroup " + groupName + " has " + serverCount 378 + " servers; you must remove these servers from the RSGroup before" 379 + " the RSGroup can be removed."); 380 } 381 for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) { 382 if (td.getRegionServerGroup().map(groupName::equals).orElse(false)) { 383 throw new ConstraintException("RSGroup " + groupName + " is already referenced by " 384 + td.getTableName() + "; you must remove all the tables from the RSGroup before " 385 + "the RSGroup can be removed."); 386 } 387 } 388 for (NamespaceDescriptor ns : masterServices.getClusterSchema().getNamespaces()) { 389 String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); 390 if (nsGroup != null && nsGroup.equals(groupName)) { 391 throw new ConstraintException( 392 "RSGroup " + groupName + " is referenced by namespace: " + ns.getName()); 393 } 394 } 395 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 396 if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { 397 throw new ConstraintException( 398 "Group " + groupName + " does not exist or is a reserved " + "group"); 399 } 400 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 401 newGroupMap.remove(groupName); 402 flushConfig(newGroupMap); 403 LOG.info("Remove group {} done", groupName); 404 } 405 406 @Override 407 public List<RSGroupInfo> listRSGroups() { 408 return Lists.newArrayList(holder.groupName2Group.values()); 409 } 410 411 @Override 412 public boolean isOnline() { 413 return rsGroupStartupWorker.isOnline(); 414 } 415 416 @Override 417 public synchronized void removeServers(Set<Address> servers) throws IOException { 418 if (servers == null || servers.isEmpty()) { 419 throw new ConstraintException("The set of servers to remove cannot be null or empty."); 420 } 421 422 // check the set of 
servers 423 checkForDeadOrOnlineServers(servers); 424 425 Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>(); 426 for (Address el : servers) { 427 RSGroupInfo rsGroupInfo = getRSGroupOfServer(el); 428 if (rsGroupInfo != null) { 429 RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName()); 430 if (newRsGroupInfo == null) { 431 rsGroupInfo.removeServer(el); 432 rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo); 433 } else { 434 newRsGroupInfo.removeServer(el); 435 rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo); 436 } 437 } else { 438 LOG.warn("Server " + el + " does not belong to any rsgroup."); 439 } 440 } 441 442 if (rsGroupInfos.size() > 0) { 443 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group); 444 newGroupMap.putAll(rsGroupInfos); 445 flushConfig(newGroupMap); 446 } 447 LOG.info("Remove decommissioned servers {} from RSGroup done", servers); 448 } 449 450 private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException { 451 List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList(); 452 AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME); 453 try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) { 454 for (Result result;;) { 455 result = scanner.next(); 456 if (result == null) { 457 break; 458 } 459 RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo 460 .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES)); 461 rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto)); 462 } 463 } 464 return rsGroupInfoList; 465 } 466 467 private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException { 468 String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE); 469 List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList(); 470 // Overwrite any info stored by table, this takes precedence 471 try { 472 if (ZKUtil.checkExists(watcher, groupBasePath) != -1) { 473 List<String> 
children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath); 474 if (children == null) { 475 return RSGroupInfoList; 476 } 477 for (String znode : children) { 478 byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode)); 479 if (data != null && data.length > 0) { 480 ProtobufUtil.expectPBMagicPrefix(data); 481 ByteArrayInputStream bis = 482 new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length); 483 RSGroupInfoList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); 484 } 485 } 486 LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size()); 487 } 488 } catch (KeeperException | DeserializationException | InterruptedException e) { 489 throw new IOException("Failed to read rsGroupZNode", e); 490 } 491 return RSGroupInfoList; 492 } 493 494 private void migrate(Collection<RSGroupInfo> groupList) { 495 TableDescriptors tds = masterServices.getTableDescriptors(); 496 ProcedureExecutor<MasterProcedureEnv> procExec = masterServices.getMasterProcedureExecutor(); 497 for (RSGroupInfo groupInfo : groupList) { 498 if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 499 continue; 500 } 501 SortedSet<TableName> failedTables = new TreeSet<>(); 502 List<MigrateRSGroupProcedure> procs = new ArrayList<>(); 503 for (TableName tableName : groupInfo.getTables()) { 504 LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName()); 505 TableDescriptor oldTd; 506 try { 507 oldTd = tds.get(tableName); 508 } catch (IOException e) { 509 LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e); 510 failedTables.add(tableName); 511 continue; 512 } 513 if (oldTd == null) { 514 continue; 515 } 516 if (oldTd.getRegionServerGroup().isPresent()) { 517 // either we have already migrated it or that user has set the rs group using the new 518 // code which will set the group directly on table descriptor, skip. 
519 LOG.debug("Skip migrating {} since it is already in group {}", tableName, 520 oldTd.getRegionServerGroup().get()); 521 continue; 522 } 523 // This is a bit tricky. Since we know that the region server group config in 524 // TableDescriptor will only be used at master side, it is fine to just update the table 525 // descriptor on file system and also the cache, without reopening all the regions. This 526 // will be much faster than the normal modifyTable. And when upgrading, we will update 527 // master first and then region server, so after all the region servers has been reopened, 528 // the new TableDescriptor will be loaded. 529 MigrateRSGroupProcedure proc = 530 new MigrateRSGroupProcedure(procExec.getEnvironment(), tableName); 531 procExec.submitProcedure(proc); 532 procs.add(proc); 533 } 534 for (MigrateRSGroupProcedure proc : procs) { 535 try { 536 ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000); 537 } catch (IOException e) { 538 LOG.warn("Failed to migrate rs group {} for table {}", groupInfo.getName(), 539 proc.getTableName()); 540 failedTables.add(proc.getTableName()); 541 } 542 } 543 LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables); 544 synchronized (RSGroupInfoManagerImpl.this) { 545 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 546 RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName()); 547 if (currentInfo != null) { 548 RSGroupInfo newInfo = 549 new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables); 550 Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap); 551 newGroupMap.put(groupInfo.getName(), newInfo); 552 try { 553 flushConfig(newGroupMap); 554 } catch (IOException e) { 555 LOG.warn("Failed to persist rs group {}", newInfo.getName(), e); 556 } 557 } 558 } 559 } 560 } 561 562 // Migrate the table rs group info from RSGroupInfo into the table descriptor 563 // Notice that we do not want to block the initialize so this will be done 
in background, and 564 // during the migrating, the rs group info maybe incomplete and cause region to be misplaced. 565 private void migrate() { 566 Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) { 567 568 @Override 569 public void run() { 570 LOG.info("Start migrating table rs group config"); 571 while (!masterServices.isStopped()) { 572 Collection<RSGroupInfo> groups = holder.groupName2Group.values(); 573 boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty()); 574 if (!hasTables) { 575 break; 576 } 577 migrate(groups); 578 } 579 LOG.info("Done migrating table rs group info"); 580 } 581 }; 582 migrateThread.setDaemon(true); 583 migrateThread.start(); 584 } 585 586 /** 587 * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on 588 * startup of the manager. 589 */ 590 private synchronized void refresh(boolean forceOnline) throws IOException { 591 List<RSGroupInfo> groupList = new ArrayList<>(); 592 593 // Overwrite anything read from zk, group table is source of truth 594 // if online read from GROUP table 595 if (forceOnline || isOnline()) { 596 LOG.debug("Refreshing in Online mode."); 597 groupList.addAll(retrieveGroupListFromGroupTable()); 598 } else { 599 LOG.debug("Refreshing in Offline mode."); 600 groupList.addAll(retrieveGroupListFromZookeeper()); 601 } 602 603 // This is added to the last of the list so it overwrites the 'default' rsgroup loaded 604 // from region group table or zk 605 groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList))); 606 607 // populate the data 608 HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(); 609 for (RSGroupInfo group : groupList) { 610 newGroupMap.put(group.getName(), group); 611 } 612 resetRSGroupMap(newGroupMap); 613 updateCacheOfRSGroups(newGroupMap.keySet()); 614 } 615 616 private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException { 617 List<Mutation> mutations = 
Lists.newArrayList(); 618 619 // populate deletes 620 for (String groupName : prevRSGroups) { 621 if (!groupMap.containsKey(groupName)) { 622 Delete d = new Delete(Bytes.toBytes(groupName)); 623 mutations.add(d); 624 } 625 } 626 627 // populate puts 628 for (RSGroupInfo gi : groupMap.values()) { 629 if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 630 RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi); 631 Put p = new Put(Bytes.toBytes(gi.getName())); 632 p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray()); 633 mutations.add(p); 634 } 635 } 636 637 if (mutations.size() > 0) { 638 multiMutate(mutations); 639 } 640 } 641 642 private synchronized void flushConfig() throws IOException { 643 flushConfig(holder.groupName2Group); 644 } 645 646 private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException { 647 // For offline mode persistence is still unavailable 648 // We're refreshing in-memory state but only for servers in default group 649 if (!isOnline()) { 650 if (newGroupMap == holder.groupName2Group) { 651 // When newGroupMap is this.rsGroupMap itself, 652 // do not need to check default group and other groups as followed 653 return; 654 } 655 656 LOG.debug("Offline mode, cannot persist to {}", RSGROUP_TABLE_NAME); 657 658 Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(holder.groupName2Group); 659 RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); 660 RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); 661 if ( 662 !oldGroupMap.equals(newGroupMap) 663 /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables() 664 .equals(newDefaultGroup.getTables()) 665 /* compare tables in default group */ 666 ) { 667 throw new IOException("Only servers in default group can be updated during offline mode"); 668 } 669 670 // Restore newGroupMap by putting its default group back 671 
newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup); 672 673 // Refresh rsGroupMap 674 // according to the inputted newGroupMap (an updated copy of rsGroupMap) 675 this.holder = new RSGroupInfoHolder(newGroupMap); 676 677 LOG.debug("New RSGroup map: {}", newGroupMap); 678 679 // Do not need to update tableMap 680 // because only the update on servers in default group is allowed above, 681 // or IOException will be thrown 682 return; 683 } 684 685 /* For online mode, persist to hbase:rsgroup and Zookeeper */ 686 LOG.debug("Online mode, persisting to {} and ZK", RSGROUP_TABLE_NAME); 687 flushConfigTable(newGroupMap); 688 689 // Make changes visible after having been persisted to the source of truth 690 resetRSGroupMap(newGroupMap); 691 saveRSGroupMapToZK(newGroupMap); 692 updateCacheOfRSGroups(newGroupMap.keySet()); 693 LOG.info("Flush config done, new RSGroup map: {}", newGroupMap); 694 } 695 696 private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOException { 697 LOG.debug("Saving RSGroup info to ZK"); 698 try { 699 String groupBasePath = 700 ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE); 701 ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC); 702 703 List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size()); 704 for (String groupName : prevRSGroups) { 705 if (!newGroupMap.containsKey(groupName)) { 706 String znode = ZNodePaths.joinZNode(groupBasePath, groupName); 707 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); 708 } 709 } 710 711 for (RSGroupInfo gi : newGroupMap.values()) { 712 if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 713 String znode = ZNodePaths.joinZNode(groupBasePath, gi.getName()); 714 RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi); 715 LOG.debug("Updating znode: " + znode); 716 ZKUtil.createAndFailSilent(watcher, znode); 717 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); 718 
zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode, 719 ProtobufUtil.prependPBMagic(proto.toByteArray()))); 720 } 721 } 722 LOG.debug("Writing ZK GroupInfo count: " + zkOps.size()); 723 724 ZKUtil.multiOrSequential(watcher, zkOps, false); 725 } catch (KeeperException e) { 726 LOG.error("Failed to write to rsGroupZNode", e); 727 masterServices.abort("Failed to write to rsGroupZNode", e); 728 throw new IOException("Failed to write to rsGroupZNode", e); 729 } 730 } 731 732 /** 733 * Make changes visible. Caller must be synchronized on 'this'. 734 */ 735 private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) { 736 this.holder = new RSGroupInfoHolder(newRSGroupMap); 737 } 738 739 /** 740 * Update cache of rsgroups. Caller must be synchronized on 'this'. 741 * @param currentGroups Current list of Groups. 742 */ 743 private void updateCacheOfRSGroups(final Set<String> currentGroups) { 744 this.prevRSGroups.clear(); 745 this.prevRSGroups.addAll(currentGroups); 746 } 747 748 // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. 749 private SortedSet<Address> getDefaultServers() { 750 return getDefaultServers(listRSGroups()/* get from rsGroupMap */); 751 } 752 753 // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. 
/**
 * Computes the default group's servers: every server the ServerManager reports as online that is
 * not a member of one of the non-default groups in {@code rsGroupInfoList}.
 * @param rsGroupInfoList groups to consult; the entry for the default group itself is ignored
 * @return the default group's servers as a sorted set
 */
private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList) {
  // Collect the servers claimed by any group other than the default group.
  Set<Address> serversInOtherGroup = new HashSet<>();
  for (RSGroupInfo group : rsGroupInfoList) {
    if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
      serversInOtherGroup.addAll(group.getServers());
    }
  }

  // Every online server (as reported by the ServerManager) that no other group claims belongs
  // to the default group.
  SortedSet<Address> defaultServers = Sets.newTreeSet();
  for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
    Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
    if (!serversInOtherGroup.contains(server)) { // not in other groups
      defaultServers.add(server);
    }
  }
  return defaultServers;
}

/**
 * Daemon thread that brings the rsgroup table online. It creates the table if it is not present,
 * or waits on an already-submitted create procedure. It then checks that a Get on the table
 * succeeds, refreshes the cached group information, flushes it and runs {@code migrate()}. On any
 * failure it retries every 100ms for as long as the master is neither stopped nor aborted.
 */
private class RSGroupStartupWorker extends Thread {
  private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
  // Set to true once the table was readable and the cache refreshed. Volatile because it is
  // exposed through isOnline(), presumably to other threads.
  private volatile boolean online = false;

  RSGroupStartupWorker() {
    super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
    setDaemon(true);
  }

  @Override
  public void run() {
    if (waitForGroupTableOnline()) {
      LOG.info("GroupBasedLoadBalancer is now online");
    } else {
      LOG.warn("Quit without making region group table online");
    }
  }

  /**
   * Loops until the rsgroup table is usable or the master goes down.
   * @return true once the table was read and the cache refreshed, flushed and migrated; false if
   *         the master stopped or aborted first
   */
  private boolean waitForGroupTableOnline() {
    while (isMasterRunning(masterServices)) {
      try {
        TableStateManager tsm = masterServices.getTableStateManager();
        if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
          createRSGroupTable();
        }
        // try reading from the table
        FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY)));
        LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME);
        RSGroupInfoManagerImpl.this.refresh(true);
        // Marked online before flushing/migrating; the order of these steps is significant.
        online = true;
        // flush any inconsistencies between ZK and HTable
        RSGroupInfoManagerImpl.this.flushConfig();
        // migrate after we are online.
        migrate();
        return true;
      } catch (Exception e) {
        LOG.warn("Failed to perform check", e);
        // 100ms is short so let's just ignore the interrupt
        Threads.sleepWithoutInterrupt(100);
      }
    }
    return false;
  }

  /**
   * Ensures the rsgroup table gets created. It reuses an existing {@link CreateTableProcedure}
   * for the table among {@code masterServices.getProcedures()}. If there is none, it submits a
   * new system-table creation. It then polls every 100ms, at most 600 times, until the procedure
   * finishes or the procedure executor stops running.
   * @throws IOException if the wait is interrupted, the poll budget is exhausted, or the
   *           procedure's result reports a failure
   */
  private void createRSGroupTable() throws IOException {
    OptionalLong optProcId = masterServices.getProcedures().stream()
      .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
      .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
      .findFirst();
    long procId;
    if (optProcId.isPresent()) {
      procId = optProcId.getAsLong();
    } else {
      LOG.debug("Creating group table {}", RSGROUP_TABLE_NAME);
      procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
    }
    // wait for region to be online
    int tries = 600;
    while (
      !(masterServices.getMasterProcedureExecutor().isFinished(procId))
        && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0
    ) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        throw new IOException("Wait interrupted ", e);
      }
      tries--;
    }
    // NOTE(review): if the executor stops running before the procedure finishes, the loop exits
    // with tries > 0. Unless a failed result is available, this method then returns normally.
    if (tries <= 0) {
      throw new IOException("Failed to create group table in a given time.");
    } else {
      Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
      if (result != null && result.isFailed()) {
        throw new IOException(
          "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
      }
    }
  }

  /** Returns whether the rsgroup table was found readable and the cache refreshed. */
  public boolean isOnline() {
    return online;
  }
}

/** Returns true while the master is neither aborted nor stopped. */
private static boolean isMasterRunning(MasterServices masterServices) {
  return !masterServices.isAborted() && !masterServices.isStopped();
}

/**
 * Sends the given Puts and Deletes as a single MutateRowsRequest to the MultiRowMutationService
 * coprocessor endpoint of the rsgroup table (routed by ROW_KEY), blocking until it completes.
 * @throws DoNotRetryIOException if a mutation is neither a Put nor a Delete
 */
private void multiMutate(List<Mutation> mutations) throws IOException {
  MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
  for (Mutation mutation : mutations) {
    if (mutation instanceof Put) {
      builder
        .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation));
    } else if (mutation instanceof Delete) {
      builder
        .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation));
    } else {
      throw new DoNotRetryIOException(
        "multiMutate doesn't support " + mutation.getClass().getName());
    }
  }
  MutateRowsRequest request = builder.build();
  AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
  LOG.debug("Multimutating {} with {} mutations", RSGROUP_TABLE_NAME, mutations.size());
  // The explicit type witness pins the stub and response types for the async coprocessor call.
  FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
    MultiRowMutationService::newStub,
    (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
  LOG.info("Multimutating {} with {} mutations done", RSGROUP_TABLE_NAME, mutations.size());
}

/**
 * Validates an RSGroup name against {@code [a-zA-Z0-9_]+}. Note that underscores are accepted
 * even though the error message only mentions alphanumerics. A null name throws
 * NullPointerException.
 * @throws ConstraintException if the name contains any other character or is empty
 */
private void checkGroupName(String groupName) throws ConstraintException {
  if (!groupName.matches("[a-zA-Z0-9_]+")) {
    throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
  }
}

/**
 * Looks the table up in the current cached snapshot ({@code holder}). Presumably returns null
 * when the table has no explicit group mapping; verify against RSGroupInfoHolder.
 */
@Override
public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException {
  return holder.tableName2Group.get(tableName);
}

/**
 * Verifies that none of the given servers is currently online (draining servers excepted) or on
 * the dead servers list; servers in either state may not be removed.
899 * @param servers servers to remove 900 */ 901 private void checkForDeadOrOnlineServers(Set<Address> servers) throws IOException { 902 // This ugliness is because we only have Address, not ServerName. 903 Set<Address> onlineServers = new HashSet<>(); 904 List<ServerName> drainingServers = masterServices.getServerManager().getDrainingServersList(); 905 for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) { 906 // Only online but not decommissioned servers are really online 907 if (!drainingServers.contains(server)) { 908 onlineServers.add(server.getAddress()); 909 } 910 } 911 912 Set<Address> deadServers = new HashSet<>(); 913 for (ServerName server : masterServices.getServerManager().getDeadServers().copyServerNames()) { 914 deadServers.add(server.getAddress()); 915 } 916 917 for (Address address : servers) { 918 if (onlineServers.contains(address)) { 919 throw new DoNotRetryIOException( 920 "Server " + address + " is an online server, not allowed to remove."); 921 } 922 if (deadServers.contains(address)) { 923 throw new DoNotRetryIOException("Server " + address + " is on the dead servers list," 924 + " Maybe it will come back again, not allowed to remove."); 925 } 926 } 927 } 928 929 private void checkOnlineServersOnly(Set<Address> servers) throws IOException { 930 // This uglyness is because we only have Address, not ServerName. 931 // Online servers are keyed by ServerName. 932 Set<Address> onlineServers = new HashSet<>(); 933 for (ServerName server : masterServices.getServerManager().getOnlineServers().keySet()) { 934 onlineServers.add(server.getAddress()); 935 } 936 for (Address address : servers) { 937 if (!onlineServers.contains(address)) { 938 throw new DoNotRetryIOException( 939 "Server " + address + " is not an online server in 'default' RSGroup."); 940 } 941 } 942 } 943 944 /** 945 * @return List of Regions associated with this <code>server</code>. 
946 */ 947 private List<RegionInfo> getRegions(final Address server) { 948 LinkedList<RegionInfo> regions = new LinkedList<>(); 949 for (Map.Entry<RegionInfo, ServerName> el : masterServices.getAssignmentManager() 950 .getRegionStates().getRegionAssignments().entrySet()) { 951 if (el.getValue() == null) { 952 continue; 953 } 954 955 if (el.getValue().getAddress().equals(server)) { 956 addRegion(regions, el.getKey()); 957 } 958 } 959 for (RegionStateNode state : masterServices.getAssignmentManager().getRegionsInTransition()) { 960 if ( 961 state.getRegionLocation() != null && state.getRegionLocation().getAddress().equals(server) 962 ) { 963 addRegion(regions, state.getRegionInfo()); 964 } 965 } 966 return regions; 967 } 968 969 private void addRegion(final LinkedList<RegionInfo> regions, RegionInfo hri) { 970 // If meta, move it last otherwise other unassigns fail because meta is not 971 // online for them to update state in. This is dodgy. Needs to be made more 972 // robust. See TODO below. 973 if (hri.isMetaRegion()) { 974 regions.addLast(hri); 975 } else { 976 regions.addFirst(hri); 977 } 978 } 979 980 /** 981 * Move every region from servers which are currently located on these servers, but should not be 982 * located there. 
983 * @param movedServers the servers that are moved to new group 984 * @param srcGrpServers all servers in the source group, excluding the movedServers 985 * @param targetGroupName the target group 986 * @param sourceGroupName the source group 987 * @throws IOException if moving the server and tables fail 988 */ 989 private void moveServerRegionsFromGroup(Set<Address> movedServers, Set<Address> srcGrpServers, 990 String targetGroupName, String sourceGroupName) throws IOException { 991 moveRegionsBetweenGroups(movedServers, srcGrpServers, targetGroupName, sourceGroupName, 992 rs -> getRegions(rs), info -> { 993 try { 994 String groupName = RSGroupUtil.getRSGroupInfo(masterServices, this, info.getTable()) 995 .map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP); 996 return groupName.equals(targetGroupName); 997 } catch (IOException e) { 998 LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName); 999 return false; 1000 } 1001 }); 1002 } 1003 1004 private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, Set<Address> newRegionsOwners, 1005 String targetGroupName, String sourceGroupName, Function<T, List<RegionInfo>> getRegionsInfo, 1006 Function<RegionInfo, Boolean> validation) throws IOException { 1007 // Get server names corresponding to given Addresses 1008 List<ServerName> movedServerNames = new ArrayList<>(regionsOwners.size()); 1009 List<ServerName> srcGrpServerNames = new ArrayList<>(newRegionsOwners.size()); 1010 for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) { 1011 // In case region move failed in previous attempt, regionsOwners and newRegionsOwners 1012 // can have the same servers. 
So for all servers below both conditions to be checked 1013 if (newRegionsOwners.contains(serverName.getAddress())) { 1014 srcGrpServerNames.add(serverName); 1015 } 1016 if (regionsOwners.contains(serverName.getAddress())) { 1017 movedServerNames.add(serverName); 1018 } 1019 } 1020 List<Pair<RegionInfo, Future<byte[]>>> assignmentFutures = new ArrayList<>(); 1021 int retry = 0; 1022 Set<String> failedRegions = new HashSet<>(); 1023 IOException toThrow = null; 1024 do { 1025 assignmentFutures.clear(); 1026 failedRegions.clear(); 1027 for (ServerName owner : movedServerNames) { 1028 // Get regions that are associated with this server and filter regions by group tables. 1029 for (RegionInfo region : getRegionsInfo.apply((T) owner.getAddress())) { 1030 if (!validation.apply(region)) { 1031 LOG.info("Moving region {}, which does not belong to RSGroup {}", 1032 region.getShortNameToLog(), targetGroupName); 1033 // Move region back to source RSGroup servers 1034 ServerName dest = 1035 masterServices.getLoadBalancer().randomAssignment(region, srcGrpServerNames); 1036 if (dest == null) { 1037 failedRegions.add(region.getRegionNameAsString()); 1038 continue; 1039 } 1040 RegionPlan rp = new RegionPlan(region, owner, dest); 1041 try { 1042 Future<byte[]> future = masterServices.getAssignmentManager().moveAsync(rp); 1043 assignmentFutures.add(Pair.newPair(region, future)); 1044 } catch (IOException ioe) { 1045 failedRegions.add(region.getRegionNameAsString()); 1046 LOG.debug("Move region {} failed, will retry, current retry time is {}", 1047 region.getShortNameToLog(), retry, ioe); 1048 toThrow = ioe; 1049 } 1050 } 1051 } 1052 } 1053 waitForRegionMovement(assignmentFutures, failedRegions, sourceGroupName, retry); 1054 if (failedRegions.isEmpty()) { 1055 LOG.info("All regions from {} are moved back to {}", movedServerNames, sourceGroupName); 1056 return; 1057 } else { 1058 try { 1059 wait(1000); 1060 } catch (InterruptedException e) { 1061 LOG.warn("Sleep interrupted", e); 1062 
Thread.currentThread().interrupt(); 1063 } 1064 retry++; 1065 } 1066 } while ( 1067 !failedRegions.isEmpty() && retry <= masterServices.getConfiguration() 1068 .getInt(FAILED_MOVE_MAX_RETRY, DEFAULT_MAX_RETRY_VALUE) 1069 ); 1070 1071 // has up to max retry time or there are no more regions to move 1072 if (!failedRegions.isEmpty()) { 1073 // print failed moved regions, for later process conveniently 1074 String msg = String.format("move regions for group %s failed, failed regions: %s", 1075 sourceGroupName, failedRegions); 1076 LOG.error(msg); 1077 throw new DoNotRetryIOException( 1078 msg + ", just record the last failed region's cause, more details in server log", toThrow); 1079 } 1080 } 1081 1082 /** 1083 * Wait for all the region move to complete. Keep waiting for other region movement completion 1084 * even if some region movement fails. 1085 */ 1086 private void waitForRegionMovement(List<Pair<RegionInfo, Future<byte[]>>> regionMoveFutures, 1087 Set<String> failedRegions, String sourceGroupName, int retryCount) { 1088 LOG.info("Moving {} region(s) to group {}, current retry={}", regionMoveFutures.size(), 1089 sourceGroupName, retryCount); 1090 for (Pair<RegionInfo, Future<byte[]>> pair : regionMoveFutures) { 1091 try { 1092 pair.getSecond().get(); 1093 if ( 1094 masterServices.getAssignmentManager().getRegionStates().getRegionState(pair.getFirst()) 1095 .isFailedOpen() 1096 ) { 1097 failedRegions.add(pair.getFirst().getRegionNameAsString()); 1098 } 1099 } catch (InterruptedException e) { 1100 // Dont return form there lets wait for other regions to complete movement. 
1101 failedRegions.add(pair.getFirst().getRegionNameAsString()); 1102 LOG.warn("Sleep interrupted", e); 1103 } catch (Exception e) { 1104 failedRegions.add(pair.getFirst().getRegionNameAsString()); 1105 LOG.error("Move region {} to group {} failed, will retry on next attempt", 1106 pair.getFirst().getShortNameToLog(), sourceGroupName, e); 1107 } 1108 } 1109 } 1110 1111 private boolean isTableInGroup(TableName tableName, String groupName, 1112 Set<TableName> tablesInGroupCache) throws IOException { 1113 if (tablesInGroupCache.contains(tableName)) { 1114 return true; 1115 } 1116 if ( 1117 RSGroupUtil.getRSGroupInfo(masterServices, this, tableName).map(RSGroupInfo::getName) 1118 .orElse(RSGroupInfo.DEFAULT_GROUP).equals(groupName) 1119 ) { 1120 tablesInGroupCache.add(tableName); 1121 return true; 1122 } 1123 return false; 1124 } 1125 1126 private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName) 1127 throws IOException { 1128 Map<String, RegionState> rit = Maps.newTreeMap(); 1129 Set<TableName> tablesInGroupCache = new HashSet<>(); 1130 for (RegionStateNode regionNode : masterServices.getAssignmentManager() 1131 .getRegionsInTransition()) { 1132 TableName tn = regionNode.getTable(); 1133 if (isTableInGroup(tn, groupName, tablesInGroupCache)) { 1134 rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState()); 1135 } 1136 } 1137 return rit; 1138 } 1139 1140 /** 1141 * This is an EXPENSIVE clone. Cloning though is the safest thing to do. Can't let out original 1142 * since it can change and at least the load balancer wants to iterate this exported list. Load 1143 * balancer should iterate over this list because cloned list will ignore disabled table and split 1144 * parent region cases. This method is invoked by {@link #balanceRSGroup} 1145 * @return A clone of current assignments for this group. 
1146 */ 1147 Map<TableName, Map<ServerName, List<RegionInfo>>> getRSGroupAssignmentsByTable( 1148 TableStateManager tableStateManager, String groupName) throws IOException { 1149 Map<TableName, Map<ServerName, List<RegionInfo>>> result = Maps.newHashMap(); 1150 Set<TableName> tablesInGroupCache = new HashSet<>(); 1151 for (Map.Entry<RegionInfo, ServerName> entry : masterServices.getAssignmentManager() 1152 .getRegionStates().getRegionAssignments().entrySet()) { 1153 RegionInfo region = entry.getKey(); 1154 TableName tn = region.getTable(); 1155 ServerName server = entry.getValue(); 1156 if (isTableInGroup(tn, groupName, tablesInGroupCache)) { 1157 if ( 1158 tableStateManager.isTableState(tn, TableState.State.DISABLED, TableState.State.DISABLING) 1159 ) { 1160 continue; 1161 } 1162 if (region.isSplitParent()) { 1163 continue; 1164 } 1165 result.computeIfAbsent(tn, k -> new HashMap<>()) 1166 .computeIfAbsent(server, k -> new ArrayList<>()).add(region); 1167 } 1168 } 1169 RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); 1170 for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) { 1171 if (rsGroupInfo.containsServer(serverName.getAddress())) { 1172 for (Map<ServerName, List<RegionInfo>> map : result.values()) { 1173 map.computeIfAbsent(serverName, k -> Collections.emptyList()); 1174 } 1175 } 1176 } 1177 return result; 1178 } 1179 1180 @Override 1181 public BalanceResponse balanceRSGroup(String groupName, BalanceRequest request) 1182 throws IOException { 1183 ServerManager serverManager = masterServices.getServerManager(); 1184 LoadBalancer balancer = masterServices.getLoadBalancer(); 1185 getRSGroupInfo(groupName); 1186 1187 BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder(); 1188 1189 synchronized (balancer) { 1190 // If balance not true, don't run balancer. 
1191 if (!masterServices.isBalancerOn() && !request.isDryRun()) { 1192 return responseBuilder.build(); 1193 } 1194 1195 // Only allow one balance run at at time. 1196 Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName); 1197 if (groupRIT.size() > 0 && !request.isIgnoreRegionsInTransition()) { 1198 LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(), 1199 StringUtils.abbreviate(masterServices.getAssignmentManager().getRegionStates() 1200 .getRegionsInTransition().toString(), 256)); 1201 return responseBuilder.build(); 1202 } 1203 1204 if (serverManager.areDeadServersInProgress()) { 1205 LOG.debug("Not running balancer because processing dead regionserver(s): {}", 1206 serverManager.getDeadServers()); 1207 return responseBuilder.build(); 1208 } 1209 1210 // We balance per group instead of per table 1211 Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable = 1212 getRSGroupAssignmentsByTable(masterServices.getTableStateManager(), groupName); 1213 List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable); 1214 boolean balancerRan = !plans.isEmpty(); 1215 1216 responseBuilder.setBalancerRan(balancerRan).setMovesCalculated(plans.size()); 1217 1218 if (balancerRan && !request.isDryRun()) { 1219 LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size()); 1220 List<RegionPlan> executed = masterServices.executeRegionPlansWithThrottling(plans); 1221 responseBuilder.setMovesExecuted(executed.size()); 1222 LOG.info("RSGroup balance " + groupName + " completed"); 1223 } 1224 1225 return responseBuilder.build(); 1226 } 1227 } 1228 1229 private void moveTablesAndWait(Set<TableName> tables, String targetGroup) throws IOException { 1230 LOG.debug("Moving {} tables to target group {}", tables.size(), targetGroup); 1231 List<Long> procIds = new ArrayList<Long>(); 1232 for (TableName tableName : tables) { 1233 TableDescriptor oldTd = 
masterServices.getTableDescriptors().get(tableName); 1234 if (oldTd == null) { 1235 continue; 1236 } 1237 TableDescriptor newTd = 1238 TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build(); 1239 procIds.add( 1240 masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE)); 1241 } 1242 for (long procId : procIds) { 1243 Procedure<?> proc = masterServices.getMasterProcedureExecutor().getProcedure(procId); 1244 if (proc == null) { 1245 continue; 1246 } 1247 ProcedureSyncWait.waitForProcedureToCompleteIOE(masterServices.getMasterProcedureExecutor(), 1248 proc, Long.MAX_VALUE); 1249 } 1250 LOG.info("Move tables done: moved {} tables to {}", tables.size(), targetGroup); 1251 if (LOG.isDebugEnabled()) { 1252 LOG.debug("Tables moved to {}: {}", targetGroup, tables); 1253 } 1254 } 1255 1256 @Override 1257 public void setRSGroup(Set<TableName> tables, String groupName) throws IOException { 1258 getRSGroupInfo(groupName); 1259 moveTablesAndWait(tables, groupName); 1260 } 1261 1262 public void moveServers(Set<Address> servers, String targetGroupName) throws IOException { 1263 if (servers == null) { 1264 throw new ConstraintException("The list of servers to move cannot be null."); 1265 } 1266 if (servers.isEmpty()) { 1267 // For some reason this difference between null servers and isEmpty is important distinction. 1268 // TODO. Why? Stuff breaks if I equate them. 1269 return; 1270 } 1271 if (StringUtils.isEmpty(targetGroupName)) { 1272 throw new ConstraintException("RSGroup cannot be null."); 1273 } 1274 1275 // Hold a lock on the manager instance while moving servers to prevent 1276 // another writer changing our state while we are working. 1277 synchronized (this) { 1278 // Presume first server's source group. Later ensure all servers are from this group. 1279 Address firstServer = servers.iterator().next(); 1280 RSGroupInfo srcGrp = getRSGroupOfServer(firstServer); 1281 if (srcGrp == null) { 1282 // Be careful. 
This exception message is tested for in TestRSGroupAdmin2... 1283 throw new ConstraintException( 1284 "Server " + firstServer + " is either offline or it does not exist."); 1285 } 1286 1287 // Only move online servers (when moving from 'default') or servers from other 1288 // groups. This prevents bogus servers from entering groups 1289 if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) { 1290 if (srcGrp.getServers().size() <= servers.size()) { 1291 throw new ConstraintException(KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE); 1292 } 1293 checkOnlineServersOnly(servers); 1294 } 1295 // Ensure all servers are of same rsgroup. 1296 for (Address server : servers) { 1297 String tmpGroup = getRSGroupOfServer(server).getName(); 1298 if (!tmpGroup.equals(srcGrp.getName())) { 1299 throw new ConstraintException("Move server request should only come from one source " 1300 + "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); 1301 } 1302 } 1303 if (srcGrp.getServers().size() <= servers.size()) { 1304 // check if there are still tables reference this group 1305 for (TableDescriptor td : masterServices.getTableDescriptors().getAll().values()) { 1306 Optional<String> optGroupName = td.getRegionServerGroup(); 1307 if (optGroupName.isPresent() && optGroupName.get().equals(srcGrp.getName())) { 1308 throw new ConstraintException( 1309 "Cannot leave a RSGroup " + srcGrp.getName() + " that contains tables('" 1310 + td.getTableName() + "' at least) without servers to host them."); 1311 } 1312 } 1313 } 1314 1315 // MovedServers may be < passed in 'servers'. 
1316 Set<Address> movedServers = moveServers(servers, srcGrp.getName(), targetGroupName); 1317 moveServerRegionsFromGroup(movedServers, srcGrp.getServers(), targetGroupName, 1318 srcGrp.getName()); 1319 LOG.info("Move servers done: moved {} servers from {} to {}", movedServers.size(), 1320 srcGrp.getName(), targetGroupName); 1321 if (LOG.isDebugEnabled()) { 1322 LOG.debug("Servers moved from {} to {}: {}", srcGrp.getName(), targetGroupName, 1323 movedServers); 1324 } 1325 } 1326 } 1327 1328 @Override 1329 public String determineRSGroupInfoForTable(TableName tableName) { 1330 return script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()); 1331 } 1332 1333 @Override 1334 public synchronized void renameRSGroup(String oldName, String newName) throws IOException { 1335 if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) { 1336 throw new ConstraintException(RSGroupInfo.DEFAULT_GROUP + " can't be rename"); 1337 } 1338 checkGroupName(newName); 1339 // getRSGroupInfo validates old RSGroup existence. 
1340 RSGroupInfo oldRSG = getRSGroupInfo(oldName); 1341 Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group; 1342 if (rsGroupMap.containsKey(newName)) { 1343 throw new ConstraintException("Group already exists: " + newName); 1344 } 1345 1346 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 1347 newGroupMap.remove(oldRSG.getName()); 1348 RSGroupInfo newRSG = new RSGroupInfo(newName, oldRSG.getServers()); 1349 newGroupMap.put(newName, newRSG); 1350 flushConfig(newGroupMap); 1351 Set<TableName> updateTables = masterServices.getTableDescriptors().getAll().values().stream() 1352 .filter(t -> oldName.equals(t.getRegionServerGroup().orElse(null))) 1353 .map(TableDescriptor::getTableName).collect(Collectors.toSet()); 1354 setRSGroup(updateTables, newName); 1355 LOG.info("Rename RSGroup done: {} => {}", oldName, newName); 1356 } 1357 1358 @Override 1359 public synchronized void updateRSGroupConfig(String groupName, Map<String, String> configuration) 1360 throws IOException { 1361 if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) { 1362 // We do not persist anything of default group, therefore, it is not supported to update 1363 // default group's configuration which lost once master down. 1364 throw new ConstraintException( 1365 "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently"); 1366 } 1367 RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); 1368 rsGroupInfo.getConfiguration().forEach((k, v) -> rsGroupInfo.removeConfiguration(k)); 1369 configuration.forEach((k, v) -> rsGroupInfo.setConfiguration(k, v)); 1370 flushConfig(); 1371 } 1372}