001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.BufferedInputStream; 021import java.io.BufferedOutputStream; 022import java.io.Closeable; 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.net.InetAddress; 030import java.nio.file.Files; 031import java.nio.file.Paths; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.EnumSet; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.List; 040import java.util.Locale; 041import java.util.Optional; 042import java.util.Set; 043import java.util.concurrent.Callable; 044import java.util.concurrent.CancellationException; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.Future; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.TimeoutException; 051import 
java.util.function.Predicate; 052import org.apache.commons.io.IOUtils; 053import org.apache.hadoop.conf.Configuration; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.HBaseConfiguration; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.MetaTableAccessor; 059import org.apache.hadoop.hbase.ServerName; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.UnknownRegionException; 062import org.apache.hadoop.hbase.client.Admin; 063import org.apache.hadoop.hbase.client.Connection; 064import org.apache.hadoop.hbase.client.ConnectionFactory; 065import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 066import org.apache.hadoop.hbase.client.RegionInfo; 067import org.apache.hadoop.hbase.client.RegionInfoBuilder; 068import org.apache.hadoop.hbase.client.Result; 069import org.apache.hadoop.hbase.master.RackManager; 070import org.apache.hadoop.hbase.master.RegionState; 071import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 072import org.apache.hadoop.hbase.net.Address; 073import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 074import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 075import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 076import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 082import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 083import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 084 085/** 086 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the command 087 * line directly as a utility.
Supports Ack/No Ack mode for loading/unloading operations.Ack mode 088 * acknowledges if regions are online after movement while noAck mode is best effort mode that 089 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 090 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 091 * anyways. This can also be used by constructiong an Object using the builder and then calling 092 * {@link #load()} or {@link #unload()} methods for the desired operations. 093 */ 094@InterfaceAudience.Public 095public class RegionMover extends AbstractHBaseTool implements Closeable { 096 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 097 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 098 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 099 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 100 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 101 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 102 103 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 104 105 private RegionMoverBuilder rmbuilder; 106 private boolean ack = true; 107 private int maxthreads = 1; 108 private int timeout; 109 private List<String> isolateRegionIdArray; 110 private String loadUnload; 111 private String hostname; 112 private String filename; 113 private String excludeFile; 114 private String designatedFile; 115 private int port; 116 private Connection conn; 117 private Admin admin; 118 private RackManager rackManager; 119 120 private RegionMover(RegionMoverBuilder builder) throws IOException { 121 this.hostname = builder.hostname; 122 this.filename = builder.filename; 123 this.excludeFile = builder.excludeFile; 124 this.designatedFile = builder.designatedFile; 125 this.maxthreads = builder.maxthreads; 126 this.isolateRegionIdArray = builder.isolateRegionIdArray; 127 this.ack = builder.ack; 128 
this.port = builder.port; 129 this.timeout = builder.timeout; 130 setConf(builder.conf); 131 this.conn = ConnectionFactory.createConnection(conf); 132 this.admin = conn.getAdmin(); 133 134 // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need 135 // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed 136 // normally, see HBASE-27304 for details. 137 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 138 if (InetAddresses.isInetAddress(master.getHostname())) { 139 if (!InetAddresses.isInetAddress(this.hostname)) { 140 this.hostname = InetAddress.getByName(this.hostname).getHostAddress(); 141 } 142 } 143 144 // Only while running unit tests, builder.rackManager will not be null for the convenience of 145 // providing custom rackManager. Otherwise for regular workflow/user triggered action, 146 // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is 147 // provided as @InterfaceAudience.Private and it is commented that this is just 148 // to be used by unit test. 149 rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager; 150 } 151 152 private RegionMover() { 153 } 154 155 @Override 156 public void close() { 157 IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e)); 158 IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e)); 159 } 160 161 /** 162 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 163 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 164 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set 165 * the corresponding options. 
166 */ 167 public static class RegionMoverBuilder { 168 private boolean ack = true; 169 private int maxthreads = 1; 170 private int timeout = Integer.MAX_VALUE; 171 private List<String> isolateRegionIdArray = new ArrayList<>(); 172 private String hostname; 173 private String filename; 174 private String excludeFile = null; 175 private String designatedFile = null; 176 private String defaultDir = System.getProperty("java.io.tmpdir"); 177 @InterfaceAudience.Private 178 final int port; 179 private final Configuration conf; 180 private RackManager rackManager; 181 182 public RegionMoverBuilder(String hostname) { 183 this(hostname, createConf()); 184 } 185 186 /** 187 * Creates a new configuration and sets region mover specific overrides 188 */ 189 private static Configuration createConf() { 190 Configuration conf = HBaseConfiguration.create(); 191 conf.setInt("hbase.client.prefetch.limit", 1); 192 conf.setInt("hbase.client.pause", 500); 193 conf.setInt("hbase.client.retries.number", 100); 194 return conf; 195 } 196 197 /** 198 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or 199 * hostname:port. 
200 * @param conf Configuration object 201 */ 202 public RegionMoverBuilder(String hostname, Configuration conf) { 203 String[] splitHostname = hostname.toLowerCase().split(":"); 204 this.hostname = splitHostname[0]; 205 if (splitHostname.length == 2) { 206 this.port = Integer.parseInt(splitHostname[1]); 207 } else { 208 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 209 } 210 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 211 + ":" + Integer.toString(this.port); 212 this.conf = conf; 213 } 214 215 /** 216 * Path of file where regions will be written to during unloading/read from during loading 217 * @return RegionMoverBuilder object 218 */ 219 public RegionMoverBuilder filename(String filename) { 220 this.filename = filename; 221 return this; 222 } 223 224 /** 225 * Set the max number of threads that will be used to move regions 226 */ 227 public RegionMoverBuilder maxthreads(int threads) { 228 this.maxthreads = threads; 229 return this; 230 } 231 232 /** 233 * Set the region ID to isolate on the region server. 234 */ 235 public RegionMoverBuilder isolateRegionIdArray(List<String> isolateRegionIdArray) { 236 this.isolateRegionIdArray = isolateRegionIdArray; 237 return this; 238 } 239 240 /** 241 * Path of file containing hostnames to be excluded during region movement. Exclude file should 242 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 243 * host. 244 */ 245 public RegionMoverBuilder excludeFile(String excludefile) { 246 this.excludeFile = excludefile; 247 return this; 248 } 249 250 /** 251 * Set the designated file. Designated file contains hostnames where region moves. Designated 252 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running 253 * on a single host. 
254 * @param designatedFile The designated file 255 * @return RegionMoverBuilder object 256 */ 257 public RegionMoverBuilder designatedFile(String designatedFile) { 258 this.designatedFile = designatedFile; 259 return this; 260 } 261 262 /** 263 * Set ack/noAck mode. 264 * <p> 265 * In ack mode regions are acknowledged before and after moving and the move is retried 266 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 267 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 268 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 269 * <p> 270 * @return RegionMoverBuilder object 271 */ 272 public RegionMoverBuilder ack(boolean ack) { 273 this.ack = ack; 274 return this; 275 } 276 277 /** 278 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 279 * movers also have a separate time which is hbase.move.wait.max * number of regions to 280 * load/unload 281 * @param timeout in seconds 282 * @return RegionMoverBuilder object 283 */ 284 public RegionMoverBuilder timeout(int timeout) { 285 this.timeout = timeout; 286 return this; 287 } 288 289 /** 290 * Set specific rackManager implementation. This setter method is for testing purpose only. 
291 * @param rackManager rackManager impl 292 * @return RegionMoverBuilder object 293 */ 294 @InterfaceAudience.Private 295 public RegionMoverBuilder rackManager(RackManager rackManager) { 296 this.rackManager = rackManager; 297 return this; 298 } 299 300 /** 301 * This method builds the appropriate RegionMover object which can then be used to load/unload 302 * using load and unload methods 303 * @return RegionMover object 304 */ 305 public RegionMover build() throws IOException { 306 return new RegionMover(this); 307 } 308 } 309 310 /** 311 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 312 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 313 * @return true if loading succeeded, false otherwise 314 */ 315 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 316 ExecutorService loadPool = Executors.newFixedThreadPool(1); 317 Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan()); 318 boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading"); 319 if (!isMetaMoved) { 320 return false; 321 } 322 loadPool = Executors.newFixedThreadPool(1); 323 loadTask = loadPool.submit(getNonMetaRegionsMovePlan()); 324 return waitTaskToFinish(loadPool, loadTask, "loading"); 325 } 326 327 private Callable<Boolean> getMetaRegionMovePlan() { 328 return getRegionsMovePlan(true); 329 } 330 331 private Callable<Boolean> getNonMetaRegionsMovePlan() { 332 return getRegionsMovePlan(false); 333 } 334 335 private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) { 336 return () -> { 337 try { 338 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 339 if (regionsToMove.isEmpty()) { 340 LOG.info("No regions to load.Exiting"); 341 return true; 342 } 343 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 344 if (moveMetaRegion) { 345 if (metaRegion.isPresent()) { 346 
loadRegions(Collections.singletonList(metaRegion.get())); 347 } 348 } else { 349 metaRegion.ifPresent(regionsToMove::remove); 350 loadRegions(regionsToMove); 351 } 352 } catch (Exception e) { 353 LOG.error("Error while loading regions to " + hostname, e); 354 return false; 355 } 356 return true; 357 }; 358 } 359 360 private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) { 361 return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst(); 362 } 363 364 private void loadRegions(List<RegionInfo> regionsToMove) throws Exception { 365 ServerName server = getTargetServer(); 366 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 367 LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using " 368 + this.maxthreads + " threads.Ack mode:" + this.ack); 369 370 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 371 List<Future<Boolean>> taskList = new ArrayList<>(); 372 int counter = 0; 373 while (counter < regionsToMove.size()) { 374 RegionInfo region = regionsToMove.get(counter); 375 ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn); 376 if (currentServer == null) { 377 LOG 378 .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 379 counter++; 380 continue; 381 } else if (server.equals(currentServer)) { 382 LOG.info( 383 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 384 counter++; 385 continue; 386 } 387 if (ack) { 388 Future<Boolean> task = moveRegionsPool 389 .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions)); 390 taskList.add(task); 391 } else { 392 Future<Boolean> task = moveRegionsPool 393 .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions)); 394 taskList.add(task); 395 } 396 counter++; 397 } 398 399 moveRegionsPool.shutdown(); 400 long timeoutInSeconds = regionsToMove.size() 
401 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 402 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 403 } 404 405 /** 406 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 407 * noAck mode we do not make sure that region is successfully online on the target region 408 * server,hence it is best effort.We do not unload regions to hostnames given in 409 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 410 * to hostnames provided in {@link #designatedFile} 411 * @return true if unloading succeeded, false otherwise 412 */ 413 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 414 return unloadRegions(false); 415 } 416 417 /** 418 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 419 * noAck mode we do not make sure that region is successfully online on the target region 420 * server,hence it is best effort.We do not unload regions to hostnames given in 421 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 422 * to hostnames provided in {@link #designatedFile}. While unloading regions, destination 423 * RegionServers are selected from different rack i.e regions should not move to any RegionServers 424 * that belong to same rack as source RegionServer. 
425 * @return true if unloading succeeded, false otherwise 426 */ 427 public boolean unloadFromRack() 428 throws InterruptedException, ExecutionException, TimeoutException { 429 return unloadRegions(true); 430 } 431 432 private boolean unloadRegions(boolean unloadFromRack) 433 throws ExecutionException, InterruptedException, TimeoutException { 434 return unloadRegions(unloadFromRack, null); 435 } 436 437 /** 438 * Isolated regions specified in {@link #isolateRegionIdArray} on {@link #hostname} in ack Mode 439 * and Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}. 440 * In noAck mode we do not make sure that region is successfully online on the target region 441 * server,hence it is the best effort. We do not unload regions to hostnames given in 442 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 443 * to hostnames provided in {@link #designatedFile} 444 * @return true if region isolation succeeded, false otherwise 445 */ 446 public boolean isolateRegions() 447 throws ExecutionException, InterruptedException, TimeoutException { 448 return unloadRegions(false, isolateRegionIdArray); 449 } 450 451 private boolean unloadRegions(boolean unloadFromRack, List<String> isolateRegionIdArray) 452 throws InterruptedException, ExecutionException, TimeoutException { 453 deleteFile(this.filename); 454 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 455 Future<Boolean> unloadTask = unloadPool.submit(() -> { 456 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 457 try { 458 // Get Online RegionServers 459 List<ServerName> regionServers = new ArrayList<>(); 460 RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port)); 461 LOG.info("{} belongs to {}", hostname, rsgroup.getName()); 462 regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers())); 463 // Remove the host Region server from target Region Servers list 464 
ServerName server = stripServer(regionServers, hostname, port); 465 if (server == null) { 466 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.", 467 hostname, port); 468 LOG.debug("List of region servers: {}", regionServers); 469 return false; 470 } 471 // Remove RS not present in the designated file 472 includeExcludeRegionServers(designatedFile, regionServers, true); 473 474 // Remove RS present in the exclude file 475 includeExcludeRegionServers(excludeFile, regionServers, false); 476 477 if (unloadFromRack) { 478 // remove regionServers that belong to same rack (as source host) since the goal is to 479 // unload regions from source regionServer to destination regionServers 480 // that belong to different rack only. 481 String sourceRack = rackManager.getRack(server); 482 List<String> racks = rackManager.getRack(regionServers); 483 Iterator<ServerName> iterator = regionServers.iterator(); 484 int i = 0; 485 while (iterator.hasNext()) { 486 iterator.next(); 487 if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) { 488 iterator.remove(); 489 } 490 i++; 491 } 492 } 493 494 // Remove decommissioned RS 495 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers()); 496 if (CollectionUtils.isNotEmpty(decommissionedRS)) { 497 regionServers.removeIf(decommissionedRS::contains); 498 LOG.debug("Excluded RegionServers from unloading regions to because they " 499 + "are marked as decommissioned. 
Servers: {}", decommissionedRS); 500 } 501 502 stripMaster(regionServers); 503 if (regionServers.isEmpty()) { 504 LOG.warn("No Regions were moved - no servers available"); 505 return false; 506 } else { 507 LOG.info("Available servers {}", regionServers); 508 } 509 unloadRegions(server, regionServers, movedRegions, isolateRegionIdArray); 510 } catch (Exception e) { 511 LOG.error("Error while unloading regions ", e); 512 return false; 513 } finally { 514 if (movedRegions != null) { 515 writeFile(filename, movedRegions); 516 } 517 } 518 return true; 519 }); 520 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 521 } 522 523 @InterfaceAudience.Private 524 Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup, 525 Collection<ServerName> onlineServers) { 526 if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 527 return onlineServers; 528 } 529 List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size()); 530 for (ServerName server : onlineServers) { 531 Address address = Address.fromParts(server.getHostname(), server.getPort()); 532 if (rsgroup.containsServer(address)) { 533 serverLists.add(server); 534 } 535 } 536 return serverLists; 537 } 538 539 private void unloadRegions(ServerName server, List<ServerName> regionServers, 540 List<RegionInfo> movedRegions, List<String> isolateRegionIdArray) throws Exception { 541 while (true) { 542 List<RegionInfo> isolateRegionInfoList = Collections.synchronizedList(new ArrayList<>()); 543 RegionInfo isolateRegionInfo = null; 544 if (isolateRegionIdArray != null && !isolateRegionIdArray.isEmpty()) { 545 // Region will be moved to target region server with Ack mode. 
546 final ExecutorService isolateRegionPool = Executors.newFixedThreadPool(maxthreads); 547 List<Future<Boolean>> isolateRegionTaskList = new ArrayList<>(); 548 List<RegionInfo> recentlyIsolatedRegion = Collections.synchronizedList(new ArrayList<>()); 549 boolean allRegionOpsSuccessful = true; 550 boolean isMetaIsolated = false; 551 RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO; 552 List<HRegionLocation> hRegionLocationRegionIsolation = 553 Collections.synchronizedList(new ArrayList<>()); 554 for (String isolateRegionId : isolateRegionIdArray) { 555 if (isolateRegionId.equalsIgnoreCase(metaRegionInfo.getEncodedName())) { 556 isMetaIsolated = true; 557 continue; 558 } 559 Result result = MetaTableAccessor.scanByRegionEncodedName(conn, isolateRegionId); 560 HRegionLocation hRegionLocation = 561 MetaTableAccessor.getRegionLocation(conn, result.getRow()); 562 if (hRegionLocation != null) { 563 hRegionLocationRegionIsolation.add(hRegionLocation); 564 } else { 565 LOG.error("Region " + isolateRegionId + " doesn't exists/can't fetch from" 566 + " meta...Quitting now"); 567 // We only move the regions if all the regions were found. 568 allRegionOpsSuccessful = false; 569 break; 570 } 571 } 572 573 if (!allRegionOpsSuccessful) { 574 break; 575 } 576 // If hbase:meta region was isolated, then it needs to be part of isolateRegionInfoList. 
577 if (isMetaIsolated) { 578 ZKWatcher zkWatcher = new ZKWatcher(conf, null, null); 579 List<HRegionLocation> result = new ArrayList<>(); 580 for (String znode : zkWatcher.getMetaReplicaNodes()) { 581 String path = ZNodePaths.joinZNode(zkWatcher.getZNodePaths().baseZNode, znode); 582 int replicaId = zkWatcher.getZNodePaths().getMetaReplicaIdFromPath(path); 583 RegionState state = MetaTableLocator.getMetaRegionState(zkWatcher, replicaId); 584 result.add(new HRegionLocation(state.getRegion(), state.getServerName())); 585 } 586 ServerName metaSeverName = result.get(0).getServerName(); 587 // For isolating hbase:meta, it should move explicitly in Ack mode, 588 // hence the forceMoveRegionByAck = true. 589 if (!metaSeverName.equals(server)) { 590 LOG.info("Region of {} {} is on server {} moving to {}", TableName.META_TABLE_NAME, 591 metaRegionInfo.getEncodedName(), metaSeverName, server); 592 submitRegionMovesWhileUnloading(metaSeverName, Collections.singletonList(server), 593 movedRegions, Collections.singletonList(metaRegionInfo), true); 594 } else { 595 LOG.info("Region of {} {} already exists on server: {}", TableName.META_TABLE_NAME, 596 metaRegionInfo.getEncodedName(), server); 597 } 598 isolateRegionInfoList.add(RegionInfoBuilder.FIRST_META_REGIONINFO); 599 } 600 601 if (!hRegionLocationRegionIsolation.isEmpty()) { 602 for (HRegionLocation hRegionLocation : hRegionLocationRegionIsolation) { 603 isolateRegionInfo = hRegionLocation.getRegion(); 604 isolateRegionInfoList.add(isolateRegionInfo); 605 if (hRegionLocation.getServerName() == server) { 606 LOG.info("Region " + hRegionLocation.getRegion().getEncodedName() + " already exists" 607 + " on server : " + server.getHostname()); 608 } else { 609 Future<Boolean> isolateRegionTask = 610 isolateRegionPool.submit(new MoveWithAck(conn, isolateRegionInfo, 611 hRegionLocation.getServerName(), server, recentlyIsolatedRegion)); 612 isolateRegionTaskList.add(isolateRegionTask); 613 } 614 } 615 } 616 617 if 
(!isolateRegionTaskList.isEmpty()) { 618 isolateRegionPool.shutdown(); 619 // Now that we have fetched all the region's regionInfo, we can move them. 620 waitMoveTasksToFinish(isolateRegionPool, isolateRegionTaskList, 621 admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX)); 622 623 Set<RegionInfo> currentRegionsOnTheServer = new HashSet<>(admin.getRegions(server)); 624 if (!currentRegionsOnTheServer.containsAll(isolateRegionInfoList)) { 625 // If all the regions are not online on the target server, 626 // we don't put RS in decommission mode and exit from here. 627 LOG.error("One of the Region move failed OR stuck in transition...Quitting now"); 628 break; 629 } 630 } else { 631 LOG.info("All regions already exists on server : " + server.getHostname()); 632 } 633 // Once region has been moved to target RS, put the target RS into decommission mode, 634 // so master doesn't assign new region to the target RS while we unload the target RS. 635 // Also pass 'offload' flag as false since we don't want master to offload the target RS. 636 List<ServerName> listOfServer = new ArrayList<>(); 637 listOfServer.add(server); 638 LOG.info("Putting server : " + server.getHostname() + " in decommission/draining mode"); 639 admin.decommissionRegionServers(listOfServer, false); 640 } 641 List<RegionInfo> regionsToMove = admin.getRegions(server); 642 // Remove all the regions from the online Region list, that we just isolated. 643 // This will also include hbase:meta if it was isolated. 
644 regionsToMove.removeAll(isolateRegionInfoList); 645 regionsToMove.removeAll(movedRegions); 646 if (regionsToMove.isEmpty()) { 647 LOG.info("No Regions to move....Quitting now"); 648 break; 649 } 650 LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}", 651 regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack); 652 653 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 654 if (metaRegion.isPresent()) { 655 RegionInfo meta = metaRegion.get(); 656 // hbase:meta should move explicitly in Ack mode. 657 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, 658 Collections.singletonList(meta), true); 659 regionsToMove.remove(meta); 660 } 661 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove, false); 662 } 663 } 664 665 private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers, 666 List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove, boolean forceMoveRegionByAck) 667 throws Exception { 668 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 669 List<Future<Boolean>> taskList = new ArrayList<>(); 670 int serverIndex = 0; 671 for (RegionInfo regionToMove : regionsToMove) { 672 // To move/isolate hbase:meta on a server, it should happen explicitly by Ack mode, hence the 673 // forceMoveRegionByAck = true. 
674 if (ack || forceMoveRegionByAck) { 675 Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server, 676 regionServers.get(serverIndex), movedRegions)); 677 taskList.add(task); 678 } else { 679 Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove, 680 server, regionServers.get(serverIndex), movedRegions)); 681 taskList.add(task); 682 } 683 serverIndex = (serverIndex + 1) % regionServers.size(); 684 } 685 moveRegionsPool.shutdown(); 686 long timeoutInSeconds = regionsToMove.size() 687 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 688 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 689 } 690 691 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 692 throws TimeoutException, InterruptedException, ExecutionException { 693 pool.shutdown(); 694 try { 695 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 696 LOG.warn("Timed out before finishing the " + operation + " operation. 
Timeout: " 697 + this.timeout + "sec"); 698 pool.shutdownNow(); 699 } 700 } catch (InterruptedException e) { 701 pool.shutdownNow(); 702 Thread.currentThread().interrupt(); 703 } 704 try { 705 return task.get(5, TimeUnit.SECONDS); 706 } catch (InterruptedException e) { 707 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 708 throw e; 709 } catch (ExecutionException e) { 710 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 711 throw e; 712 } 713 } 714 715 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 716 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 717 try { 718 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 719 moveRegionsPool.shutdownNow(); 720 } 721 } catch (InterruptedException e) { 722 moveRegionsPool.shutdownNow(); 723 Thread.currentThread().interrupt(); 724 } 725 for (Future<Boolean> future : taskList) { 726 try { 727 // if even after shutdownNow threads are stuck we wait for 5 secs max 728 if (!future.get(5, TimeUnit.SECONDS)) { 729 LOG.error("Was Not able to move region....Exiting Now"); 730 throw new Exception("Could not move region Exception"); 731 } 732 } catch (InterruptedException e) { 733 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 734 throw e; 735 } catch (ExecutionException e) { 736 boolean ignoreFailure = ignoreRegionMoveFailure(e); 737 if (ignoreFailure) { 738 LOG.debug("Ignore region move failure, it might have been split/merged.", e); 739 } else { 740 LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e); 741 throw e; 742 } 743 } catch (CancellationException e) { 744 LOG.error("Thread for moving region cancelled. 
Timeout for cancellation:" + timeoutInSeconds 745 + "secs", e); 746 throw e; 747 } 748 } 749 } 750 751 private boolean ignoreRegionMoveFailure(ExecutionException e) { 752 boolean ignoreFailure = false; 753 if (e.getCause() instanceof UnknownRegionException) { 754 // region does not exist anymore 755 ignoreFailure = true; 756 } else if ( 757 e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null 758 && e.getCause().getMessage() 759 .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,") 760 ) { 761 // region is recently split 762 ignoreFailure = true; 763 } 764 return ignoreFailure; 765 } 766 767 private ServerName getTargetServer() throws Exception { 768 ServerName server = null; 769 int maxWaitInSeconds = 770 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 771 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 772 while (EnvironmentEdgeManager.currentTime() < maxWait) { 773 try { 774 List<ServerName> regionServers = new ArrayList<>(); 775 regionServers.addAll(admin.getRegionServers()); 776 // Remove the host Region server from target Region Servers list 777 server = stripServer(regionServers, hostname, port); 778 if (server != null) { 779 break; 780 } else { 781 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 782 } 783 } catch (IOException e) { 784 LOG.warn("Could not get list of region servers", e); 785 } 786 Thread.sleep(500); 787 } 788 if (server == null) { 789 LOG.error("Server " + hostname + ":" + port + " is not up. 
Giving up."); 790 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 791 } 792 return server; 793 } 794 795 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 796 List<RegionInfo> regions = new ArrayList<>(); 797 File f = new File(filename); 798 if (!f.exists()) { 799 return regions; 800 } 801 try ( 802 DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) { 803 int numRegions = dis.readInt(); 804 int index = 0; 805 while (index < numRegions) { 806 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 807 index++; 808 } 809 } catch (IOException e) { 810 LOG.error("Error while reading regions from file:" + filename, e); 811 throw e; 812 } 813 return regions; 814 } 815 816 /** 817 * Write the number of regions moved in the first line followed by regions moved in subsequent 818 * lines 819 */ 820 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 821 try (DataOutputStream dos = 822 new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) { 823 dos.writeInt(movedRegions.size()); 824 for (RegionInfo region : movedRegions) { 825 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 826 } 827 } catch (IOException e) { 828 LOG.error("ERROR: Was Not able to write regions moved to output file but moved " 829 + movedRegions.size() + " regions", e); 830 throw e; 831 } 832 } 833 834 private void deleteFile(String filename) { 835 File f = new File(filename); 836 if (f.exists()) { 837 f.delete(); 838 } 839 } 840 841 /** 842 * @param filename The file should have 'host:port' per line 843 * @return List of servers from the file in format 'hostname:port'. 
844 */ 845 private List<String> readServersFromFile(String filename) throws IOException { 846 List<String> servers = new ArrayList<>(); 847 if (filename != null) { 848 try { 849 Files.readAllLines(Paths.get(filename)).stream().map(String::trim) 850 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 851 .forEach(servers::add); 852 } catch (IOException e) { 853 LOG.error("Exception while reading servers from file,", e); 854 throw e; 855 } 856 } 857 return servers; 858 } 859 860 /** 861 * Designates or excludes the servername whose hostname and port portion matches the list given in 862 * the file. Example:<br> 863 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and 864 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3 865 * are removed from regionServers list so that regions can move to only RS1. If you want to 866 * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call 867 * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers 868 * list so that regions can move to only RS2 and RS3. 869 */ 870 private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers, 871 boolean isInclude) throws IOException { 872 if (fileName != null) { 873 List<String> servers = readServersFromFile(fileName); 874 if (servers.isEmpty()) { 875 LOG.warn("No servers provided in the file: {}." 
+ fileName); 876 return; 877 } 878 Iterator<ServerName> i = regionServers.iterator(); 879 while (i.hasNext()) { 880 String rs = i.next().getServerName(); 881 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" 882 + rs.split(ServerName.SERVERNAME_SEPARATOR)[1]; 883 if (isInclude != servers.contains(rsPort)) { 884 i.remove(); 885 } 886 } 887 } 888 } 889 890 /** 891 * Exclude master from list of RSs to move regions to 892 */ 893 private void stripMaster(List<ServerName> regionServers) throws IOException { 894 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 895 stripServer(regionServers, master.getHostname(), master.getPort()); 896 } 897 898 /** 899 * Remove the servername whose hostname and port portion matches from the passed array of servers. 900 * Returns as side-effect the servername removed. 901 * @return server removed from list of Region Servers 902 */ 903 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 904 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 905 ServerName server = iter.next(); 906 if ( 907 server.getAddress().getHostName().equalsIgnoreCase(hostname) 908 && server.getAddress().getPort() == port 909 ) { 910 iter.remove(); 911 return server; 912 } 913 } 914 return null; 915 } 916 917 @Override 918 protected void addOptions() { 919 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 920 this.addRequiredOptWithArg("o", "operation", 921 "Expected: load/unload/unload_from_rack/isolate_regions"); 922 this.addOptWithArg("m", "maxthreads", 923 "Define the maximum number of threads to use to unload and reload the regions"); 924 this.addOptWithArg("i", "isolateRegionIds", 925 "Comma separated list of Region IDs hash to isolate on a RegionServer and put region server" 926 + " in draining mode. This option should only be used with '-o isolate_regions'." 
927 + " By putting region server in decommission/draining mode, master can't assign any" 928 + " new region on this server. If one or more regions are not found OR failed to isolate" 929 + " successfully, utility will exist without putting RS in draining/decommission mode." 930 + " Ex. --isolateRegionIds id1,id2,id3 OR -i id1,id2,id3"); 931 this.addOptWithArg("x", "excludefile", 932 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 933 + "target host; useful for rack decommisioning."); 934 this.addOptWithArg("d", "designatedfile", 935 "File with <hostname:port> per line as unload targets;" + "default is all online hosts"); 936 this.addOptWithArg("f", "filename", 937 "File to save regions list into unloading, or read from loading; " 938 + "default /tmp/<usernamehostname:port>"); 939 this.addOptNoArg("n", "noack", 940 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 941 + "RegionServer, hence best effort. This is more performant in unloading and loading " 942 + "but might lead to region being unavailable for some time till master reassigns it " 943 + "in case the move failed"); 944 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 945 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 946 } 947 948 @Override 949 protected void processOptions(CommandLine cmd) { 950 String hostname = cmd.getOptionValue("r"); 951 rmbuilder = new RegionMoverBuilder(hostname); 952 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 953 if (cmd.hasOption('m')) { 954 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 955 } 956 if (this.loadUnload.equals("isolate_regions") && cmd.hasOption("isolateRegionIds")) { 957 rmbuilder 958 .isolateRegionIdArray(Arrays.asList(cmd.getOptionValue("isolateRegionIds").split(","))); 959 } 960 if (cmd.hasOption('n')) { 961 rmbuilder.ack(false); 962 } 963 if (cmd.hasOption('f')) { 964 
rmbuilder.filename(cmd.getOptionValue('f')); 965 } 966 if (cmd.hasOption('x')) { 967 rmbuilder.excludeFile(cmd.getOptionValue('x')); 968 } 969 if (cmd.hasOption('d')) { 970 rmbuilder.designatedFile(cmd.getOptionValue('d')); 971 } 972 if (cmd.hasOption('t')) { 973 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 974 } 975 } 976 977 @Override 978 protected int doWork() throws Exception { 979 boolean success; 980 try (RegionMover rm = rmbuilder.build()) { 981 if (loadUnload.equalsIgnoreCase("load")) { 982 success = rm.load(); 983 } else if (loadUnload.equalsIgnoreCase("unload")) { 984 success = rm.unload(); 985 } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) { 986 success = rm.unloadFromRack(); 987 } else if (loadUnload.equalsIgnoreCase("isolate_regions")) { 988 if (rm.isolateRegionIdArray != null && !rm.isolateRegionIdArray.isEmpty()) { 989 success = rm.isolateRegions(); 990 } else { 991 LOG.error("Missing -i/--isolate_regions option with '-o isolate_regions' option"); 992 LOG.error("Use -h or --help for usage instructions"); 993 printUsage(); 994 success = false; 995 } 996 } else { 997 printUsage(); 998 success = false; 999 } 1000 } 1001 return (success ? 0 : 1); 1002 } 1003 1004 public static void main(String[] args) { 1005 try (RegionMover mover = new RegionMover()) { 1006 mover.doStaticMain(args); 1007 } 1008 } 1009}