001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.BufferedInputStream; 021import java.io.BufferedOutputStream; 022import java.io.Closeable; 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.net.InetAddress; 030import java.nio.file.Files; 031import java.nio.file.Paths; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.EnumSet; 036import java.util.HashSet; 037import java.util.Iterator; 038import java.util.List; 039import java.util.Locale; 040import java.util.Optional; 041import java.util.Set; 042import java.util.concurrent.Callable; 043import java.util.concurrent.CancellationException; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.TimeoutException; 050import java.util.function.Predicate; 051import org.apache.commons.io.IOUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.ClusterMetrics.Option; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.UnknownRegionException; 058import org.apache.hadoop.hbase.client.Admin; 059import org.apache.hadoop.hbase.client.Connection; 060import org.apache.hadoop.hbase.client.ConnectionFactory; 061import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 062import org.apache.hadoop.hbase.client.RegionInfo; 063import org.apache.hadoop.hbase.master.RackManager; 064import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 065import org.apache.hadoop.hbase.net.Address; 066import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 067import org.apache.yetus.audience.InterfaceAudience; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 072import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 073import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 074 075/** 076 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command 077 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode 078 * acknowledges if regions are online after movement while noAck mode is best effort mode that 079 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 080 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 081 * anyways. This can also be used by constructiong an Object using the builder and then calling 082 * {@link #load()} or {@link #unload()} methods for the desired operations. 083 */ 084@InterfaceAudience.Public 085public class RegionMover extends AbstractHBaseTool implements Closeable { 086 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 087 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 088 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 089 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 090 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 091 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 092 093 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 094 095 private RegionMoverBuilder rmbuilder; 096 private boolean ack = true; 097 private int maxthreads = 1; 098 private int timeout; 099 private String loadUnload; 100 private String hostname; 101 private String filename; 102 private String excludeFile; 103 private String designatedFile; 104 private int port; 105 private Connection conn; 106 private Admin admin; 107 private RackManager rackManager; 108 109 private RegionMover(RegionMoverBuilder builder) throws IOException { 110 this.hostname = builder.hostname; 111 this.filename = builder.filename; 112 this.excludeFile = builder.excludeFile; 113 this.designatedFile = builder.designatedFile; 114 this.maxthreads = builder.maxthreads; 115 this.ack = builder.ack; 116 this.port = builder.port; 117 this.timeout = builder.timeout; 118 setConf(builder.conf); 119 this.conn = ConnectionFactory.createConnection(conf); 120 this.admin = conn.getAdmin(); 121 122 // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need 123 // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed 124 // normally, see HBASE-27304 for details. 125 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 126 if (InetAddresses.isInetAddress(master.getHostname())) { 127 if (!InetAddresses.isInetAddress(this.hostname)) { 128 this.hostname = InetAddress.getByName(this.hostname).getHostAddress(); 129 } 130 } 131 132 // Only while running unit tests, builder.rackManager will not be null for the convenience of 133 // providing custom rackManager. Otherwise for regular workflow/user triggered action, 134 // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is 135 // provided as @InterfaceAudience.Private and it is commented that this is just 136 // to be used by unit test. 137 rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager; 138 } 139 140 private RegionMover() { 141 } 142 143 @Override 144 public void close() { 145 IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e)); 146 IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e)); 147 } 148 149 /** 150 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 151 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 152 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set 153 * the corresponding options. 154 */ 155 public static class RegionMoverBuilder { 156 private boolean ack = true; 157 private int maxthreads = 1; 158 private int timeout = Integer.MAX_VALUE; 159 private String hostname; 160 private String filename; 161 private String excludeFile = null; 162 private String designatedFile = null; 163 private String defaultDir = System.getProperty("java.io.tmpdir"); 164 @InterfaceAudience.Private 165 final int port; 166 private final Configuration conf; 167 private RackManager rackManager; 168 169 public RegionMoverBuilder(String hostname) { 170 this(hostname, createConf()); 171 } 172 173 /** 174 * Creates a new configuration and sets region mover specific overrides 175 */ 176 private static Configuration createConf() { 177 Configuration conf = HBaseConfiguration.create(); 178 conf.setInt("hbase.client.prefetch.limit", 1); 179 conf.setInt("hbase.client.pause", 500); 180 conf.setInt("hbase.client.retries.number", 100); 181 return conf; 182 } 183 184 /** 185 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or 186 * hostname:port. 187 * @param conf Configuration object 188 */ 189 public RegionMoverBuilder(String hostname, Configuration conf) { 190 String[] splitHostname = hostname.toLowerCase().split(":"); 191 this.hostname = splitHostname[0]; 192 if (splitHostname.length == 2) { 193 this.port = Integer.parseInt(splitHostname[1]); 194 } else { 195 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 196 } 197 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 198 + ":" + Integer.toString(this.port); 199 this.conf = conf; 200 } 201 202 /** 203 * Path of file where regions will be written to during unloading/read from during loading 204 * @return RegionMoverBuilder object 205 */ 206 public RegionMoverBuilder filename(String filename) { 207 this.filename = filename; 208 return this; 209 } 210 211 /** 212 * Set the max number of threads that will be used to move regions 213 */ 214 public RegionMoverBuilder maxthreads(int threads) { 215 this.maxthreads = threads; 216 return this; 217 } 218 219 /** 220 * Path of file containing hostnames to be excluded during region movement. Exclude file should 221 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 222 * host. 223 */ 224 public RegionMoverBuilder excludeFile(String excludefile) { 225 this.excludeFile = excludefile; 226 return this; 227 } 228 229 /** 230 * Set the designated file. Designated file contains hostnames where region moves. Designated 231 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running 232 * on a single host. 233 * @param designatedFile The designated file 234 * @return RegionMoverBuilder object 235 */ 236 public RegionMoverBuilder designatedFile(String designatedFile) { 237 this.designatedFile = designatedFile; 238 return this; 239 } 240 241 /** 242 * Set ack/noAck mode. 243 * <p> 244 * In ack mode regions are acknowledged before and after moving and the move is retried 245 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 246 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 247 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 248 * <p> 249 * @return RegionMoverBuilder object 250 */ 251 public RegionMoverBuilder ack(boolean ack) { 252 this.ack = ack; 253 return this; 254 } 255 256 /** 257 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 258 * movers also have a separate time which is hbase.move.wait.max * number of regions to 259 * load/unload 260 * @param timeout in seconds 261 * @return RegionMoverBuilder object 262 */ 263 public RegionMoverBuilder timeout(int timeout) { 264 this.timeout = timeout; 265 return this; 266 } 267 268 /** 269 * Set specific rackManager implementation. This setter method is for testing purpose only. 270 * @param rackManager rackManager impl 271 * @return RegionMoverBuilder object 272 */ 273 @InterfaceAudience.Private 274 public RegionMoverBuilder rackManager(RackManager rackManager) { 275 this.rackManager = rackManager; 276 return this; 277 } 278 279 /** 280 * This method builds the appropriate RegionMover object which can then be used to load/unload 281 * using load and unload methods 282 * @return RegionMover object 283 */ 284 public RegionMover build() throws IOException { 285 return new RegionMover(this); 286 } 287 } 288 289 /** 290 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 291 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 292 * @return true if loading succeeded, false otherwise 293 */ 294 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 295 ExecutorService loadPool = Executors.newFixedThreadPool(1); 296 Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan()); 297 boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading"); 298 if (!isMetaMoved) { 299 return false; 300 } 301 loadPool = Executors.newFixedThreadPool(1); 302 loadTask = loadPool.submit(getNonMetaRegionsMovePlan()); 303 return waitTaskToFinish(loadPool, loadTask, "loading"); 304 } 305 306 private Callable<Boolean> getMetaRegionMovePlan() { 307 return getRegionsMovePlan(true); 308 } 309 310 private Callable<Boolean> getNonMetaRegionsMovePlan() { 311 return getRegionsMovePlan(false); 312 } 313 314 private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) { 315 return () -> { 316 try { 317 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 318 if (regionsToMove.isEmpty()) { 319 LOG.info("No regions to load.Exiting"); 320 return true; 321 } 322 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 323 if (moveMetaRegion) { 324 if (metaRegion.isPresent()) { 325 loadRegions(Collections.singletonList(metaRegion.get())); 326 } 327 } else { 328 metaRegion.ifPresent(regionsToMove::remove); 329 loadRegions(regionsToMove); 330 } 331 } catch (Exception e) { 332 LOG.error("Error while loading regions to " + hostname, e); 333 return false; 334 } 335 return true; 336 }; 337 } 338 339 private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) { 340 return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst(); 341 } 342 343 private void loadRegions(List<RegionInfo> regionsToMove) throws Exception { 344 ServerName server = getTargetServer(); 345 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 346 LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using " 347 + this.maxthreads + " threads.Ack mode:" + this.ack); 348 349 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 350 List<Future<Boolean>> taskList = new ArrayList<>(); 351 int counter = 0; 352 while (counter < regionsToMove.size()) { 353 RegionInfo region = regionsToMove.get(counter); 354 ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn); 355 if (currentServer == null) { 356 LOG 357 .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 358 counter++; 359 continue; 360 } else if (server.equals(currentServer)) { 361 LOG.info( 362 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 363 counter++; 364 continue; 365 } 366 if (ack) { 367 Future<Boolean> task = moveRegionsPool 368 .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions)); 369 taskList.add(task); 370 } else { 371 Future<Boolean> task = moveRegionsPool 372 .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions)); 373 taskList.add(task); 374 } 375 counter++; 376 } 377 378 moveRegionsPool.shutdown(); 379 long timeoutInSeconds = regionsToMove.size() 380 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 381 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 382 } 383 384 /** 385 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 386 * noAck mode we do not make sure that region is successfully online on the target region 387 * server,hence it is best effort.We do not unload regions to hostnames given in 388 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 389 * to hostnames provided in {@link #designatedFile} 390 * @return true if unloading succeeded, false otherwise 391 */ 392 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 393 return unloadRegions(false); 394 } 395 396 /** 397 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 398 * noAck mode we do not make sure that region is successfully online on the target region 399 * server,hence it is best effort.We do not unload regions to hostnames given in 400 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 401 * to hostnames provided in {@link #designatedFile}. While unloading regions, destination 402 * RegionServers are selected from different rack i.e regions should not move to any RegionServers 403 * that belong to same rack as source RegionServer. 404 * @return true if unloading succeeded, false otherwise 405 */ 406 public boolean unloadFromRack() 407 throws InterruptedException, ExecutionException, TimeoutException { 408 return unloadRegions(true); 409 } 410 411 private boolean unloadRegions(boolean unloadFromRack) 412 throws InterruptedException, ExecutionException, TimeoutException { 413 deleteFile(this.filename); 414 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 415 Future<Boolean> unloadTask = unloadPool.submit(() -> { 416 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 417 try { 418 // Get Online RegionServers 419 List<ServerName> regionServers = new ArrayList<>(); 420 RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port)); 421 LOG.info("{} belongs to {}", hostname, rsgroup.getName()); 422 regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers())); 423 // Remove the host Region server from target Region Servers list 424 ServerName server = stripServer(regionServers, hostname, port); 425 if (server == null) { 426 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.", 427 hostname, port); 428 LOG.debug("List of region servers: {}", regionServers); 429 return false; 430 } 431 // Remove RS not present in the designated file 432 includeExcludeRegionServers(designatedFile, regionServers, true); 433 434 // Remove RS present in the exclude file 435 includeExcludeRegionServers(excludeFile, regionServers, false); 436 437 if (unloadFromRack) { 438 // remove regionServers that belong to same rack (as source host) since the goal is to 439 // unload regions from source regionServer to destination regionServers 440 // that belong to different rack only. 441 String sourceRack = rackManager.getRack(server); 442 List<String> racks = rackManager.getRack(regionServers); 443 Iterator<ServerName> iterator = regionServers.iterator(); 444 int i = 0; 445 while (iterator.hasNext()) { 446 iterator.next(); 447 if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) { 448 iterator.remove(); 449 } 450 i++; 451 } 452 } 453 454 // Remove decommissioned RS 455 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers()); 456 if (CollectionUtils.isNotEmpty(decommissionedRS)) { 457 regionServers.removeIf(decommissionedRS::contains); 458 LOG.debug("Excluded RegionServers from unloading regions to because they " 459 + "are marked as decommissioned. Servers: {}", decommissionedRS); 460 } 461 462 stripMaster(regionServers); 463 if (regionServers.isEmpty()) { 464 LOG.warn("No Regions were moved - no servers available"); 465 return false; 466 } else { 467 LOG.info("Available servers {}", regionServers); 468 } 469 unloadRegions(server, regionServers, movedRegions); 470 } catch (Exception e) { 471 LOG.error("Error while unloading regions ", e); 472 return false; 473 } finally { 474 if (movedRegions != null) { 475 writeFile(filename, movedRegions); 476 } 477 } 478 return true; 479 }); 480 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 481 } 482 483 @InterfaceAudience.Private 484 Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup, 485 Collection<ServerName> onlineServers) { 486 if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 487 return onlineServers; 488 } 489 List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size()); 490 for (ServerName server : onlineServers) { 491 Address address = Address.fromParts(server.getHostname(), server.getPort()); 492 if (rsgroup.containsServer(address)) { 493 serverLists.add(server); 494 } 495 } 496 return serverLists; 497 } 498 499 private void unloadRegions(ServerName server, List<ServerName> regionServers, 500 List<RegionInfo> movedRegions) throws Exception { 501 while (true) { 502 List<RegionInfo> regionsToMove = admin.getRegions(server); 503 regionsToMove.removeAll(movedRegions); 504 if (regionsToMove.isEmpty()) { 505 LOG.info("No Regions to move....Quitting now"); 506 break; 507 } 508 LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}", 509 regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack); 510 511 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 512 if (metaRegion.isPresent()) { 513 RegionInfo meta = metaRegion.get(); 514 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, 515 Collections.singletonList(meta)); 516 regionsToMove.remove(meta); 517 } 518 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove); 519 } 520 } 521 522 private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers, 523 List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception { 524 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 525 List<Future<Boolean>> taskList = new ArrayList<>(); 526 int serverIndex = 0; 527 for (RegionInfo regionToMove : regionsToMove) { 528 if (ack) { 529 Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server, 530 regionServers.get(serverIndex), movedRegions)); 531 taskList.add(task); 532 } else { 533 Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove, 534 server, regionServers.get(serverIndex), movedRegions)); 535 taskList.add(task); 536 } 537 serverIndex = (serverIndex + 1) % regionServers.size(); 538 } 539 moveRegionsPool.shutdown(); 540 long timeoutInSeconds = regionsToMove.size() 541 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 542 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 543 } 544 545 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 546 throws TimeoutException, InterruptedException, ExecutionException { 547 pool.shutdown(); 548 try { 549 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 550 LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: " 551 + this.timeout + "sec"); 552 pool.shutdownNow(); 553 } 554 } catch (InterruptedException e) { 555 pool.shutdownNow(); 556 Thread.currentThread().interrupt(); 557 } 558 try { 559 return task.get(5, TimeUnit.SECONDS); 560 } catch (InterruptedException e) { 561 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 562 throw e; 563 } catch (ExecutionException e) { 564 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 565 throw e; 566 } 567 } 568 569 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 570 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 571 try { 572 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 573 moveRegionsPool.shutdownNow(); 574 } 575 } catch (InterruptedException e) { 576 moveRegionsPool.shutdownNow(); 577 Thread.currentThread().interrupt(); 578 } 579 for (Future<Boolean> future : taskList) { 580 try { 581 // if even after shutdownNow threads are stuck we wait for 5 secs max 582 if (!future.get(5, TimeUnit.SECONDS)) { 583 LOG.error("Was Not able to move region....Exiting Now"); 584 throw new Exception("Could not move region Exception"); 585 } 586 } catch (InterruptedException e) { 587 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 588 throw e; 589 } catch (ExecutionException e) { 590 boolean ignoreFailure = ignoreRegionMoveFailure(e); 591 if (ignoreFailure) { 592 LOG.debug("Ignore region move failure, it might have been split/merged.", e); 593 } else { 594 LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e); 595 throw e; 596 } 597 } catch (CancellationException e) { 598 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds 599 + "secs", e); 600 throw e; 601 } 602 } 603 } 604 605 private boolean ignoreRegionMoveFailure(ExecutionException e) { 606 boolean ignoreFailure = false; 607 if (e.getCause() instanceof UnknownRegionException) { 608 // region does not exist anymore 609 ignoreFailure = true; 610 } else if ( 611 e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null 612 && e.getCause().getMessage() 613 .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,") 614 ) { 615 // region is recently split 616 ignoreFailure = true; 617 } 618 return ignoreFailure; 619 } 620 621 private ServerName getTargetServer() throws Exception { 622 ServerName server = null; 623 int maxWaitInSeconds = 624 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 625 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 626 while (EnvironmentEdgeManager.currentTime() < maxWait) { 627 try { 628 List<ServerName> regionServers = new ArrayList<>(); 629 regionServers.addAll(admin.getRegionServers()); 630 // Remove the host Region server from target Region Servers list 631 server = stripServer(regionServers, hostname, port); 632 if (server != null) { 633 break; 634 } else { 635 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 636 } 637 } catch (IOException e) { 638 LOG.warn("Could not get list of region servers", e); 639 } 640 Thread.sleep(500); 641 } 642 if (server == null) { 643 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); 644 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 645 } 646 return server; 647 } 648 649 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 650 List<RegionInfo> regions = new ArrayList<>(); 651 File f = new File(filename); 652 if (!f.exists()) { 653 return regions; 654 } 655 try ( 656 DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) { 657 int numRegions = dis.readInt(); 658 int index = 0; 659 while (index < numRegions) { 660 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 661 index++; 662 } 663 } catch (IOException e) { 664 LOG.error("Error while reading regions from file:" + filename, e); 665 throw e; 666 } 667 return regions; 668 } 669 670 /** 671 * Write the number of regions moved in the first line followed by regions moved in subsequent 672 * lines 673 */ 674 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 675 try (DataOutputStream dos = 676 new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) { 677 dos.writeInt(movedRegions.size()); 678 for (RegionInfo region : movedRegions) { 679 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 680 } 681 } catch (IOException e) { 682 LOG.error("ERROR: Was Not able to write regions moved to output file but moved " 683 + movedRegions.size() + " regions", e); 684 throw e; 685 } 686 } 687 688 private void deleteFile(String filename) { 689 File f = new File(filename); 690 if (f.exists()) { 691 f.delete(); 692 } 693 } 694 695 /** 696 * @param filename The file should have 'host:port' per line 697 * @return List of servers from the file in format 'hostname:port'. 698 */ 699 private List<String> readServersFromFile(String filename) throws IOException { 700 List<String> servers = new ArrayList<>(); 701 if (filename != null) { 702 try { 703 Files.readAllLines(Paths.get(filename)).stream().map(String::trim) 704 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 705 .forEach(servers::add); 706 } catch (IOException e) { 707 LOG.error("Exception while reading servers from file,", e); 708 throw e; 709 } 710 } 711 return servers; 712 } 713 714 /** 715 * Designates or excludes the servername whose hostname and port portion matches the list given in 716 * the file. Example:<br> 717 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and 718 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3 719 * are removed from regionServers list so that regions can move to only RS1. If you want to 720 * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call 721 * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers 722 * list so that regions can move to only RS2 and RS3. 723 */ 724 private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers, 725 boolean isInclude) throws IOException { 726 if (fileName != null) { 727 List<String> servers = readServersFromFile(fileName); 728 if (servers.isEmpty()) { 729 LOG.warn("No servers provided in the file: {}." + fileName); 730 return; 731 } 732 Iterator<ServerName> i = regionServers.iterator(); 733 while (i.hasNext()) { 734 String rs = i.next().getServerName(); 735 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" 736 + rs.split(ServerName.SERVERNAME_SEPARATOR)[1]; 737 if (isInclude != servers.contains(rsPort)) { 738 i.remove(); 739 } 740 } 741 } 742 } 743 744 /** 745 * Exclude master from list of RSs to move regions to 746 */ 747 private void stripMaster(List<ServerName> regionServers) throws IOException { 748 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 749 stripServer(regionServers, master.getHostname(), master.getPort()); 750 } 751 752 /** 753 * Remove the servername whose hostname and port portion matches from the passed array of servers. 754 * Returns as side-effect the servername removed. 755 * @return server removed from list of Region Servers 756 */ 757 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 758 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 759 ServerName server = iter.next(); 760 if ( 761 server.getAddress().getHostName().equalsIgnoreCase(hostname) 762 && server.getAddress().getPort() == port 763 ) { 764 iter.remove(); 765 return server; 766 } 767 } 768 return null; 769 } 770 771 @Override 772 protected void addOptions() { 773 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 774 this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack"); 775 this.addOptWithArg("m", "maxthreads", 776 "Define the maximum number of threads to use to unload and reload the regions"); 777 this.addOptWithArg("x", "excludefile", 778 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 779 + "target host; useful for rack decommisioning."); 780 this.addOptWithArg("d", "designatedfile", 781 "File with <hostname:port> per line as unload targets;" + "default is all online hosts"); 782 this.addOptWithArg("f", "filename", 783 "File to save regions list into unloading, or read from loading; " 784 + "default /tmp/<usernamehostname:port>"); 785 this.addOptNoArg("n", "noack", 786 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 787 + "RegionServer, hence best effort. This is more performant in unloading and loading " 788 + "but might lead to region being unavailable for some time till master reassigns it " 789 + "in case the move failed"); 790 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 791 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 792 } 793 794 @Override 795 protected void processOptions(CommandLine cmd) { 796 String hostname = cmd.getOptionValue("r"); 797 rmbuilder = new RegionMoverBuilder(hostname); 798 if (cmd.hasOption('m')) { 799 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 800 } 801 if (cmd.hasOption('n')) { 802 rmbuilder.ack(false); 803 } 804 if (cmd.hasOption('f')) { 805 rmbuilder.filename(cmd.getOptionValue('f')); 806 } 807 if (cmd.hasOption('x')) { 808 rmbuilder.excludeFile(cmd.getOptionValue('x')); 809 } 810 if (cmd.hasOption('d')) { 811 rmbuilder.designatedFile(cmd.getOptionValue('d')); 812 } 813 if (cmd.hasOption('t')) { 814 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 815 } 816 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 817 } 818 819 @Override 820 protected int doWork() throws Exception { 821 boolean success; 822 try (RegionMover rm = rmbuilder.build()) { 823 if (loadUnload.equalsIgnoreCase("load")) { 824 success = rm.load(); 825 } else if (loadUnload.equalsIgnoreCase("unload")) { 826 success = rm.unload(); 827 } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) { 828 success = rm.unloadFromRack(); 829 } else { 830 printUsage(); 831 success = false; 832 } 833 } 834 return (success ? 0 : 1); 835 } 836 837 public static void main(String[] args) { 838 try (RegionMover mover = new RegionMover()) { 839 mover.doStaticMain(args); 840 } 841 } 842}