/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Tool for loading/unloading regions to/from given regionserver. This tool can be run from Command
 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations. Ack mode
 * acknowledges if regions are online after movement while noAck mode is best effort mode that
 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck
 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it
 * anyways. This can also be used by constructing an Object using the builder and then calling
 * {@link #load()} or {@link #unload()} methods for the desired operations.
 */
@InterfaceAudience.Public
public class RegionMover extends AbstractHBaseTool implements Closeable {
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  private RegionMoverBuilder rmbuilder;
  private boolean ack = true;
  private int maxthreads = 1;
  private int timeout;
  private String loadUnload;
  private String hostname;
  private String filename;
  private String excludeFile;
  private String designatedFile;
  private int port;
  private Connection conn;
  private Admin admin;
  private RackManager rackManager;

  /**
   * Creates a fully configured mover from the builder and opens the cluster connection and admin
   * handle; both are released by {@link #close()}.
   * @param builder source of all settings
   * @throws IOException if the connection or the admin handle cannot be created
   */
  private RegionMover(RegionMoverBuilder builder) throws IOException {
    this.hostname = builder.hostname;
    this.filename = builder.filename;
    this.excludeFile = builder.excludeFile;
    this.designatedFile = builder.designatedFile;
    this.maxthreads = builder.maxthreads;
    this.ack = builder.ack;
    this.port = builder.port;
    this.timeout = builder.timeout;
    setConf(builder.conf);
    this.conn = ConnectionFactory.createConnection(conf);
    this.admin = conn.getAdmin();
    // Only while running unit tests, builder.rackManager will not be null for the convenience of
    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
    // provided as @InterfaceAudience.Private and it is commented that this is just
    // to be used by unit test.
    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
  }

  /**
   * Creates an unconfigured instance. Used by {@link #main(String[])}, where the actual work is
   * done by a second instance built in {@link #doWork()}; {@code conn} and {@code admin} stay
   * null on this instance.
   */
  private RegionMover() {
  }

  /**
   * Closes the admin handle and then the connection, logging (not throwing) any failure.
   */
  @Override
  public void close() {
    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
  }

  /**
   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
   * the corresponding options.
   */
  public static class RegionMoverBuilder {
    private boolean ack = true;
    private int maxthreads = 1;
    private int timeout = Integer.MAX_VALUE;
    private String hostname;
    private String filename;
    private String excludeFile = null;
    private String designatedFile = null;
    private String defaultDir = System.getProperty("java.io.tmpdir");
    @InterfaceAudience.Private
    final int port;
    private final Configuration conf;
    private RackManager rackManager;

    /**
     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
     *                 hostname:port.
     */
    public RegionMoverBuilder(String hostname) {
      this(hostname, createConf());
    }

    /**
     * Creates a new configuration and sets region mover specific overrides
     */
    private static Configuration createConf() {
      Configuration conf = HBaseConfiguration.create();
      conf.setInt("hbase.client.prefetch.limit", 1);
      conf.setInt("hbase.client.pause", 500);
      conf.setInt("hbase.client.retries.number", 100);
      return conf;
    }

    /**
     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
     *                 hostname:port.
     * @param conf     Configuration object
     */
    public RegionMoverBuilder(String hostname, Configuration conf) {
      // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
      String[] splitHostname = hostname.toLowerCase(Locale.ROOT).split(":");
      this.hostname = splitHostname[0];
      if (splitHostname.length == 2) {
        this.port = Integer.parseInt(splitHostname[1]);
      } else {
        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
      }
      // Default region list file: <tmpdir>/<username><hostname>:<port>
      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
        + ":" + Integer.toString(this.port);
      this.conf = conf;
    }

    /**
     * Path of file where regions will be written to during unloading/read from during loading
     * @param filename path of the region list file
     * @return RegionMoverBuilder object
     */
    public RegionMoverBuilder filename(String filename) {
      this.filename = filename;
      return this;
    }

    /**
     * Set the max number of threads that will be used to move regions
     * @param threads size of the thread pool used for region moves
     * @return RegionMoverBuilder object
     */
    public RegionMoverBuilder maxthreads(int threads) {
      this.maxthreads = threads;
      return this;
    }

    /**
     * Path of file containing hostnames to be excluded during region movement. Exclude file should
     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
     * host.
     * @param excludefile path of the exclude file
     * @return RegionMoverBuilder object
     */
    public RegionMoverBuilder excludeFile(String excludefile) {
      this.excludeFile = excludefile;
      return this;
    }

    /**
     * Set the designated file. Designated file contains hostnames where region moves. Designated
     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
     * on a single host.
     * @param designatedFile The designated file
     * @return RegionMoverBuilder object
     */
    public RegionMoverBuilder designatedFile(String designatedFile) {
      this.designatedFile = designatedFile;
      return this;
    }

    /**
     * Set ack/noAck mode.
     * <p>
     * In ack mode regions are acknowledged before and after moving and the move is retried
     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1. No Ack mode is a
     * best effort mode, each region movement is tried once. This can be used during graceful
     * shutdown as even if we have a stuck region, upon shutdown it'll be reassigned anyway.
     * <p>
     * @param ack true for ack mode, false for noAck mode
     * @return RegionMoverBuilder object
     */
    public RegionMoverBuilder ack(boolean ack) {
      this.ack = ack;
      return this;
    }

    /**
     * Set the timeout for Load/Unload operation in seconds. This is a global timeout, threadpool
     * for movers also have a separate time which is hbase.move.wait.max * number of regions to
     * load/unload
     * @param timeout in seconds
     * @return RegionMoverBuilder object
     */
    public RegionMoverBuilder timeout(int timeout) {
      this.timeout = timeout;
      return this;
    }

    /**
     * Set specific rackManager implementation. This setter method is for testing purpose only.
     * @param rackManager rackManager impl
     * @return RegionMoverBuilder object
     */
    @InterfaceAudience.Private
    public RegionMoverBuilder rackManager(RackManager rackManager) {
      this.rackManager = rackManager;
      return this;
    }

    /**
     * This method builds the appropriate RegionMover object which can then be used to load/unload
     * using load and unload methods
     * @return RegionMover object
     * @throws IOException if the cluster connection cannot be created
     */
    public RegionMover build() throws IOException {
      return new RegionMover(this);
    }
  }

  /**
   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}. The meta region (if
   * listed) is moved first, in its own task, before all other regions.
   * @return true if loading succeeded, false otherwise
   */
  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
    ExecutorService loadPool = Executors.newFixedThreadPool(1);
    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
    if (!isMetaMoved) {
      return false;
    }
    // waitTaskToFinish shut the first pool down, so a fresh one is needed for the second phase.
    loadPool = Executors.newFixedThreadPool(1);
    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
    return waitTaskToFinish(loadPool, loadTask, "loading");
  }

  /** Returns the task that loads only the meta region, if it is listed in the file. */
  private Callable<Boolean> getMetaRegionMovePlan() {
    return getRegionsMovePlan(true);
  }

  /** Returns the task that loads every listed region except the meta region. */
  private Callable<Boolean> getNonMetaRegionsMovePlan() {
    return getRegionsMovePlan(false);
  }

  /**
   * Builds a task that reads the region list from {@link #filename} and loads either only the meta
   * region or everything but the meta region.
   * @param moveMetaRegion true to move only the meta region, false to move all other regions
   * @return task yielding true on success (including "nothing to do"), false on any exception
   */
  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
    return () -> {
      try {
        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
        if (regionsToMove.isEmpty()) {
          LOG.info("No regions to load.Exiting");
          return true;
        }
        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
        if (moveMetaRegion) {
          if (metaRegion.isPresent()) {
            loadRegions(Collections.singletonList(metaRegion.get()));
          }
        } else {
          metaRegion.ifPresent(regionsToMove::remove);
          loadRegions(regionsToMove);
        }
      } catch (Exception e) {
        LOG.error("Error while loading regions to " + hostname, e);
        return false;
      }
      return true;
    };
  }

  /** Returns the first region in the list for which {@code isMetaRegion()} is true, if any. */
  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
  }

  /**
   * Moves the given regions onto the target server ({@link #hostname}:{@link #port}) using a pool
   * of {@link #maxthreads} threads. Regions whose current server cannot be determined, or which
   * are already on the target, are skipped.
   * @param regionsToMove regions to move to the target server
   * @throws Exception if the target server does not come up or a move fails
   */
  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
    ServerName server = getTargetServer();
    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
      + this.maxthreads + " threads.Ack mode:" + this.ack);

    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
    List<Future<Boolean>> taskList = new ArrayList<>();
    for (RegionInfo region : regionsToMove) {
      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
      if (currentServer == null) {
        LOG
          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
        continue;
      }
      if (server.equals(currentServer)) {
        LOG.info(
          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
        continue;
      }
      if (ack) {
        Future<Boolean> task = moveRegionsPool
          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
        taskList.add(task);
      } else {
        Future<Boolean> task = moveRegionsPool
          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
        taskList.add(task);
      }
    }

    moveRegionsPool.shutdown();
    // Pool-level timeout scales with the number of regions requested.
    long timeoutInSeconds = regionsToMove.size()
      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
  }

  /**
   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
   * noAck mode we do not make sure that region is successfully online on the target region
   * server,hence it is best effort.We do not unload regions to hostnames given in
   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
   * to hostnames provided in {@link #designatedFile}
   * @return true if unloading succeeded, false otherwise
   */
  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
    return unloadRegions(false);
  }

  /**
   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
   * noAck mode we do not make sure that region is successfully online on the target region
   * server,hence it is best effort.We do not unload regions to hostnames given in
   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
   * RegionServers are selected from different rack i.e regions should not move to any RegionServers
   * that belong to same rack as source RegionServer.
   * @return true if unloading succeeded, false otherwise
   */
  public boolean unloadFromRack()
    throws InterruptedException, ExecutionException, TimeoutException {
    return unloadRegions(true);
  }

  /**
   * Shared implementation of {@link #unload()} and {@link #unloadFromRack()}. Deletes any previous
   * region list file, computes the list of eligible destination servers, moves the regions and
   * finally writes the regions that were moved to {@link #filename}.
   * @param unloadFromRack true to exclude destination servers in the same rack as the source
   * @return true if unloading succeeded, false otherwise
   */
  private boolean unloadRegions(boolean unloadFromRack)
    throws InterruptedException, ExecutionException, TimeoutException {
    deleteFile(this.filename);
    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
    Future<Boolean> unloadTask = unloadPool.submit(() -> {
      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
      try {
        // Get Online RegionServers
        List<ServerName> regionServers = new ArrayList<>();
        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
        // Remove the host Region server from target Region Servers list
        ServerName server = stripServer(regionServers, hostname, port);
        if (server == null) {
          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
            hostname, port);
          LOG.debug("List of region servers: {}", regionServers);
          return false;
        }
        // Remove RS not present in the designated file
        includeExcludeRegionServers(designatedFile, regionServers, true);

        // Remove RS present in the exclude file
        includeExcludeRegionServers(excludeFile, regionServers, false);

        if (unloadFromRack) {
          // remove regionServers that belong to same rack (as source host) since the goal is to
          // unload regions from source regionServer to destination regionServers
          // that belong to different rack only.
          String sourceRack = rackManager.getRack(server);
          List<String> racks = rackManager.getRack(regionServers);
          Iterator<ServerName> iterator = regionServers.iterator();
          // i indexes 'racks', which was computed from the list before any removal, so it
          // advances for every server regardless of whether that server is removed.
          int i = 0;
          while (iterator.hasNext()) {
            iterator.next();
            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
              iterator.remove();
            }
            i++;
          }
        }

        // Remove decommissioned RS
        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
          regionServers.removeIf(decommissionedRS::contains);
          LOG.debug("Excluded RegionServers from unloading regions to because they "
            + "are marked as decommissioned. Servers: {}", decommissionedRS);
        }

        stripMaster(regionServers);
        if (regionServers.isEmpty()) {
          LOG.warn("No Regions were moved - no servers available");
          return false;
        } else {
          LOG.info("Available servers {}", regionServers);
        }
        unloadRegions(server, regionServers, movedRegions);
      } catch (Exception e) {
        LOG.error("Error while unloading regions ", e);
        return false;
      } finally {
        // Always record what was moved (possibly nothing), even on failure, so that a
        // subsequent load can bring those regions back.
        writeFile(filename, movedRegions);
      }
      return true;
    });
    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
  }

  /**
   * Restricts the online servers to those that are members of the given rsgroup. For the default
   * group the online servers are returned unfiltered.
   * @param rsgroup       rsgroup of the source server
   * @param onlineServers currently online region servers
   * @return the online servers that may receive regions from a member of {@code rsgroup}
   */
  @InterfaceAudience.Private
  Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
    Collection<ServerName> onlineServers) {
    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
      return onlineServers;
    }
    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
    for (ServerName server : onlineServers) {
      Address address = Address.fromParts(server.getHostname(), server.getPort());
      if (rsgroup.containsServer(address)) {
        serverLists.add(server);
      }
    }
    return serverLists;
  }

  /**
   * Repeatedly fetches the regions still hosted on {@code server} and moves them to the given
   * destination servers until none are left that have not already been moved. The meta region, if
   * present, is moved on its own before the remaining regions of that round.
   * @param server        source server being unloaded
   * @param regionServers eligible destination servers; must not be empty
   * @param movedRegions  accumulator of regions already moved, shared with the move tasks
   */
  private void unloadRegions(ServerName server, List<ServerName> regionServers,
    List<RegionInfo> movedRegions) throws Exception {
    while (true) {
      List<RegionInfo> regionsToMove = admin.getRegions(server);
      regionsToMove.removeAll(movedRegions);
      if (regionsToMove.isEmpty()) {
        LOG.info("No Regions to move....Quitting now");
        break;
      }
      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);

      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
      if (metaRegion.isPresent()) {
        RegionInfo meta = metaRegion.get();
        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
          Collections.singletonList(meta));
        regionsToMove.remove(meta);
      }
      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
    }
  }

  /**
   * Submits one move task per region, assigning destination servers round-robin, and waits for
   * all of them to finish.
   * @param server        source server
   * @param regionServers destination servers, cycled through in order
   * @param movedRegions  accumulator handed to each move task
   * @param regionsToMove regions to move in this batch
   */
  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
    List<Future<Boolean>> taskList = new ArrayList<>();
    int serverIndex = 0;
    for (RegionInfo regionToMove : regionsToMove) {
      if (ack) {
        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
          regionServers.get(serverIndex), movedRegions));
        taskList.add(task);
      } else {
        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
          server, regionServers.get(serverIndex), movedRegions));
        taskList.add(task);
      }
      serverIndex = (serverIndex + 1) % regionServers.size();
    }
    moveRegionsPool.shutdown();
    long timeoutInSeconds = regionsToMove.size()
      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
  }

  /**
   * Shuts the pool down, waits up to {@link #timeout} seconds for it to terminate (interrupting
   * its tasks on timeout) and then returns the task's result, waiting at most 5 more seconds.
   * @param pool      single-task pool running {@code task}
   * @param task      the load/unload task
   * @param operation operation name used in log messages
   * @return the task's result
   * @throws TimeoutException if the result is still unavailable after the extra 5 seconds
   */
  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
    throws TimeoutException, InterruptedException, ExecutionException {
    pool.shutdown();
    try {
      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
          + this.timeout + "sec");
        pool.shutdownNow();
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt();
    }
    try {
      return task.get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
      throw e;
    } catch (ExecutionException e) {
      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
      throw e;
    }
  }

  /**
   * Waits for an already shut down move pool to terminate and then checks every task's outcome.
   * Failures recognised by {@link #ignoreRegionMoveFailure(ExecutionException)} are logged and
   * skipped; any other failure, or a task returning false, aborts with an exception.
   * @param moveRegionsPool  pool the tasks were submitted to
   * @param taskList         futures of the submitted move tasks
   * @param timeoutInSeconds maximum time to wait for the pool to terminate
   */
  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
    try {
      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
        moveRegionsPool.shutdownNow();
      }
    } catch (InterruptedException e) {
      moveRegionsPool.shutdownNow();
      Thread.currentThread().interrupt();
    }
    for (Future<Boolean> future : taskList) {
      try {
        // if even after shutdownNow threads are stuck we wait for 5 secs max
        if (!future.get(5, TimeUnit.SECONDS)) {
          LOG.error("Was Not able to move region....Exiting Now");
          throw new Exception("Could not move region Exception");
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
        throw e;
      } catch (ExecutionException e) {
        boolean ignoreFailure = ignoreRegionMoveFailure(e);
        if (ignoreFailure) {
          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
        } else {
          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
          throw e;
        }
      } catch (CancellationException e) {
        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
          + "secs", e);
        throw e;
      }
    }
  }

  /**
   * Decides whether a failed move can be ignored: the cause is an {@link UnknownRegionException},
   * or a {@link DoNotRetryRegionException} whose message reports the region in SPLIT state.
   */
  private boolean ignoreRegionMoveFailure(ExecutionException e) {
    boolean ignoreFailure = false;
    if (e.getCause() instanceof UnknownRegionException) {
      // region does not exist anymore
      ignoreFailure = true;
    } else if (
      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
        && e.getCause().getMessage()
          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
    ) {
      // region is recently split
      ignoreFailure = true;
    }
    return ignoreFailure;
  }

  /**
   * Polls the list of region servers every 500 ms until {@link #hostname}:{@link #port} shows up
   * or the configured {@link #SERVERSTART_WAIT_MAX_KEY} (seconds) has elapsed.
   * @return the target server
   * @throws Exception if the server is not online within the wait period
   */
  private ServerName getTargetServer() throws Exception {
    ServerName server = null;
    int maxWaitInSeconds =
      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
    // 1000L forces long arithmetic so a large configured wait cannot overflow int.
    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000L;
    while (EnvironmentEdgeManager.currentTime() < maxWait) {
      try {
        List<ServerName> regionServers = new ArrayList<>();
        regionServers.addAll(admin.getRegionServers());
        // Remove the host Region server from target Region Servers list
        server = stripServer(regionServers, hostname, port);
        if (server != null) {
          break;
        } else {
          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
        }
      } catch (IOException e) {
        LOG.warn("Could not get list of region servers", e);
      }
      Thread.sleep(500);
    }
    if (server == null) {
      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
    }
    return server;
  }

  /**
   * Reads the region list written by {@link #writeFile(String, List)}.
   * @param filename file to read
   * @return the regions read; empty if the file does not exist
   * @throws IOException if the file cannot be read
   */
  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
    List<RegionInfo> regions = new ArrayList<>();
    File f = new File(filename);
    if (!f.exists()) {
      return regions;
    }
    try (
      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
      int numRegions = dis.readInt();
      int index = 0;
      while (index < numRegions) {
        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
        index++;
      }
    } catch (IOException e) {
      LOG.error("Error while reading regions from file:" + filename, e);
      throw e;
    }
    return regions;
  }

  /**
   * Writes the number of moved regions as an int, followed by each region serialized with
   * {@link RegionInfo#toByteArray(RegionInfo)} and written via {@code Bytes.writeByteArray}.
   * @param filename     file to (over)write
   * @param movedRegions regions to record
   * @throws IOException if the file cannot be written
   */
  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
    try (DataOutputStream dos =
      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
      dos.writeInt(movedRegions.size());
      for (RegionInfo region : movedRegions) {
        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
      }
    } catch (IOException e) {
      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
        + movedRegions.size() + " regions", e);
      throw e;
    }
  }

  /**
   * Deletes the given file if it exists. Best effort: a failed delete is logged, not thrown.
   */
  private void deleteFile(String filename) {
    File f = new File(filename);
    if (f.exists() && !f.delete()) {
      LOG.warn("Failed to delete existing file {}", filename);
    }
  }

  /**
   * @param filename The file should have 'host:port' per line
   * @return List of servers from the file in format 'hostname:port', trimmed and lower-cased, with
   *         empty lines dropped. Empty if {@code filename} is null.
   */
  private List<String> readServersFromFile(String filename) throws IOException {
    List<String> servers = new ArrayList<>();
    if (filename != null) {
      try {
        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
          .filter(((Predicate<String>) String::isEmpty).negate())
          .map(line -> line.toLowerCase(Locale.ROOT)).forEach(servers::add);
      } catch (IOException e) {
        LOG.error("Exception while reading servers from file,", e);
        throw e;
      }
    }
    return servers;
  }

  /**
   * Designates or excludes the servername whose hostname and port portion matches the list given in
   * the file. Example:<br>
   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
   * are removed from regionServers list so that regions can move to only RS1. If you want to
   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
   * list so that regions can move to only RS2 and RS3.
   * <p>
   * Nothing is changed when {@code fileName} is null or the file lists no servers.
   */
  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
    boolean isInclude) throws IOException {
    if (fileName != null) {
      List<String> servers = readServersFromFile(fileName);
      if (servers.isEmpty()) {
        LOG.warn("No servers provided in the file: {}.", fileName);
        return;
      }
      Iterator<ServerName> i = regionServers.iterator();
      while (i.hasNext()) {
        String rs = i.next().getServerName();
        // Rebuild "host:port" from the first two separator-delimited parts of the server name.
        String[] rsParts = rs.split(ServerName.SERVERNAME_SEPARATOR);
        String rsPort = rsParts[0].toLowerCase(Locale.ROOT) + ":" + rsParts[1];
        if (isInclude != servers.contains(rsPort)) {
          i.remove();
        }
      }
    }
  }

  /**
   * Exclude master from list of RSs to move regions to
   */
  private void stripMaster(List<ServerName> regionServers) throws IOException {
    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
    stripServer(regionServers, master.getHostname(), master.getPort());
  }

  /**
   * Remove the servername whose hostname and port portion matches from the passed array of servers.
   * Returns as side-effect the servername removed.
   * @return server removed from list of Region Servers, or null if no server matched
   */
  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
      ServerName server = iter.next();
      if (
        server.getAddress().getHostName().equalsIgnoreCase(hostname)
          && server.getAddress().getPort() == port
      ) {
        iter.remove();
        return server;
      }
    }
    return null;
  }

  @Override
  protected void addOptions() {
    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack");
    this.addOptWithArg("m", "maxthreads",
      "Define the maximum number of threads to use to unload and reload the regions");
    this.addOptWithArg("x", "excludefile",
      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
        + "target host; useful for rack decommisioning.");
    this.addOptWithArg("d", "designatedfile",
      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
    this.addOptWithArg("f", "filename",
      "File to save regions list into unloading, or read from loading; "
        + "default /tmp/<usernamehostname:port>");
    this.addOptNoArg("n", "noack",
      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
        + "RegionServer, hence best effort. This is more performant in unloading and loading "
        + "but might lead to region being unavailable for some time till master reassigns it "
        + "in case the move failed");
    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    String hostname = cmd.getOptionValue("r");
    rmbuilder = new RegionMoverBuilder(hostname);
    if (cmd.hasOption('m')) {
      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
    }
    if (cmd.hasOption('n')) {
      rmbuilder.ack(false);
    }
    if (cmd.hasOption('f')) {
      rmbuilder.filename(cmd.getOptionValue('f'));
    }
    if (cmd.hasOption('x')) {
      rmbuilder.excludeFile(cmd.getOptionValue('x'));
    }
    if (cmd.hasOption('d')) {
      rmbuilder.designatedFile(cmd.getOptionValue('d'));
    }
    if (cmd.hasOption('t')) {
      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
    }
    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
  }

  /**
   * Builds a RegionMover from the parsed options and runs the requested operation.
   * @return 0 on success, 1 on failure or unknown operation
   */
  @Override
  protected int doWork() throws Exception {
    boolean success;
    try (RegionMover rm = rmbuilder.build()) {
      if (loadUnload.equalsIgnoreCase("load")) {
        success = rm.load();
      } else if (loadUnload.equalsIgnoreCase("unload")) {
        success = rm.unload();
      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
        success = rm.unloadFromRack();
      } else {
        printUsage();
        success = false;
      }
    }
    return (success ? 0 : 1);
  }

  public static void main(String[] args) {
    try (RegionMover mover = new RegionMover()) {
      mover.doStaticMain(args);
    }
  }
}