001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.util; 021 022import java.io.BufferedInputStream; 023import java.io.BufferedOutputStream; 024import java.io.Closeable; 025import java.io.DataInputStream; 026import java.io.DataOutputStream; 027import java.io.File; 028import java.io.FileInputStream; 029import java.io.FileOutputStream; 030import java.io.IOException; 031import java.nio.file.Files; 032import java.nio.file.Paths; 033import java.util.ArrayList; 034import java.util.Collections; 035import java.util.EnumSet; 036import java.util.HashSet; 037import java.util.Iterator; 038import java.util.List; 039import java.util.Locale; 040import java.util.Set; 041import java.util.concurrent.Callable; 042import java.util.concurrent.CancellationException; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Executors; 046import java.util.concurrent.Future; 047import java.util.concurrent.TimeUnit; 048import java.util.concurrent.TimeoutException; 049import java.util.function.Predicate; 050import org.apache.commons.io.IOUtils; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.hbase.ClusterMetrics.Option; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.HRegionLocation; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.client.Admin; 058import org.apache.hadoop.hbase.client.Connection; 059import org.apache.hadoop.hbase.client.ConnectionFactory; 060import org.apache.hadoop.hbase.client.RegionInfo; 061import org.apache.hadoop.hbase.client.ResultScanner; 062import org.apache.hadoop.hbase.client.Scan; 063import org.apache.hadoop.hbase.client.Table; 064import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 070import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 071import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 072 073/** 074 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command 075 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode 076 * acknowledges if regions are online after movement while noAck mode is best effort mode that 077 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 078 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 079 * anyways. This can also be used by constructiong an Object using the builder and then calling 080 * {@link #load()} or {@link #unload()} methods for the desired operations. 081 */ 082@InterfaceAudience.Public 083public class RegionMover extends AbstractHBaseTool implements Closeable { 084 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 085 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 086 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 087 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 088 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 089 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 090 091 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 092 093 private RegionMoverBuilder rmbuilder; 094 private boolean ack = true; 095 private int maxthreads = 1; 096 private int timeout; 097 private String loadUnload; 098 private String hostname; 099 private String filename; 100 private String excludeFile; 101 private int port; 102 private Connection conn; 103 private Admin admin; 104 105 private RegionMover(RegionMoverBuilder builder) throws IOException { 106 this.hostname = builder.hostname; 107 this.filename = builder.filename; 108 this.excludeFile = builder.excludeFile; 109 this.maxthreads = builder.maxthreads; 110 this.ack = builder.ack; 111 this.port = builder.port; 112 this.timeout = builder.timeout; 113 setConf(builder.conf); 114 this.conn = ConnectionFactory.createConnection(conf); 115 this.admin = conn.getAdmin(); 116 } 117 118 private RegionMover() { 119 } 120 121 @Override 122 public void close() { 123 IOUtils.closeQuietly(this.admin); 124 IOUtils.closeQuietly(this.conn); 125 } 126 127 /** 128 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 129 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 130 * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options 131 */ 132 public static class RegionMoverBuilder { 133 private boolean ack = true; 134 private int maxthreads = 1; 135 private int timeout = Integer.MAX_VALUE; 136 private String hostname; 137 private String filename; 138 private String excludeFile = null; 139 private String defaultDir = System.getProperty("java.io.tmpdir"); 140 @VisibleForTesting 141 final int port; 142 private final Configuration conf; 143 144 public RegionMoverBuilder(String hostname) { 145 this(hostname, createConf()); 146 } 147 148 /** 149 * Creates a new configuration and sets region mover specific overrides 150 */ 151 private static Configuration createConf() { 152 Configuration conf = HBaseConfiguration.create(); 153 conf.setInt("hbase.client.prefetch.limit", 1); 154 conf.setInt("hbase.client.pause", 500); 155 conf.setInt("hbase.client.retries.number", 100); 156 return conf; 157 } 158 159 /** 160 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname 161 * or hostname:port. 162 * @param conf Configuration object 163 */ 164 public RegionMoverBuilder(String hostname, Configuration conf) { 165 String[] splitHostname = hostname.toLowerCase().split(":"); 166 this.hostname = splitHostname[0]; 167 if (splitHostname.length == 2) { 168 this.port = Integer.parseInt(splitHostname[1]); 169 } else { 170 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 171 } 172 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 173 + ":" + Integer.toString(this.port); 174 this.conf = conf; 175 } 176 177 /** 178 * Path of file where regions will be written to during unloading/read from during loading 179 * @param filename 180 * @return RegionMoverBuilder object 181 */ 182 public RegionMoverBuilder filename(String filename) { 183 this.filename = filename; 184 return this; 185 } 186 187 /** 188 * Set the max number of threads that will be used to move regions 189 */ 190 public RegionMoverBuilder maxthreads(int threads) { 191 this.maxthreads = threads; 192 return this; 193 } 194 195 /** 196 * Path of file containing hostnames to be excluded during region movement. Exclude file should 197 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 198 * host. 199 */ 200 public RegionMoverBuilder excludeFile(String excludefile) { 201 this.excludeFile = excludefile; 202 return this; 203 } 204 205 /** 206 * Set ack/noAck mode. 207 * <p> 208 * In ack mode regions are acknowledged before and after moving and the move is retried 209 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 210 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 211 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 212 * <p> 213 * @param ack 214 * @return RegionMoverBuilder object 215 */ 216 public RegionMoverBuilder ack(boolean ack) { 217 this.ack = ack; 218 return this; 219 } 220 221 /** 222 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 223 * movers also have a separate time which is hbase.move.wait.max * number of regions to 224 * load/unload 225 * @param timeout in seconds 226 * @return RegionMoverBuilder object 227 */ 228 public RegionMoverBuilder timeout(int timeout) { 229 this.timeout = timeout; 230 return this; 231 } 232 233 /** 234 * This method builds the appropriate RegionMover object which can then be used to load/unload 235 * using load and unload methods 236 * @return RegionMover object 237 */ 238 public RegionMover build() throws IOException { 239 return new RegionMover(this); 240 } 241 } 242 243 /** 244 * Move Regions and make sure that they are up on the target server.If a region movement fails we 245 * exit as failure 246 */ 247 private class MoveWithAck implements Callable<Boolean> { 248 private RegionInfo region; 249 private ServerName targetServer; 250 private List<RegionInfo> movedRegions; 251 private ServerName sourceServer; 252 253 public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer, 254 ServerName targetServer, List<RegionInfo> movedRegions) { 255 this.region = regionInfo; 256 this.targetServer = targetServer; 257 this.movedRegions = movedRegions; 258 this.sourceServer = sourceServer; 259 } 260 261 @Override 262 public Boolean call() throws IOException, InterruptedException { 263 boolean moved = false; 264 int count = 0; 265 int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); 266 int maxWaitInSeconds = 267 admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 268 long startTime = EnvironmentEdgeManager.currentTime(); 269 boolean sameServer = true; 270 // Assert we can scan the region in its current location 271 isSuccessfulScan(region); 272 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " 273 + targetServer); 274 while (count < retries && sameServer) { 275 if (count > 0) { 276 LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); 277 } 278 count = count + 1; 279 admin.move(region.getEncodedNameAsBytes(), targetServer); 280 long maxWait = startTime + (maxWaitInSeconds * 1000); 281 while (EnvironmentEdgeManager.currentTime() < maxWait) { 282 sameServer = isSameServer(region, sourceServer); 283 if (!sameServer) { 284 break; 285 } 286 Thread.sleep(100); 287 } 288 } 289 if (sameServer) { 290 LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer 291 + ",newServer=" + this.targetServer); 292 } else { 293 isSuccessfulScan(region); 294 LOG.info("Moved Region " 295 + region.getRegionNameAsString() 296 + " cost:" 297 + String.format("%.3f", 298 (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); 299 moved = true; 300 movedRegions.add(region); 301 } 302 return moved; 303 } 304 } 305 306 /** 307 * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the 308 * RS down anyways and not abort on a stuck region. Improves movement performance 309 */ 310 private class MoveWithoutAck implements Callable<Boolean> { 311 private RegionInfo region; 312 private ServerName targetServer; 313 private List<RegionInfo> movedRegions; 314 private ServerName sourceServer; 315 316 public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer, 317 ServerName targetServer, List<RegionInfo> movedRegions) { 318 this.region = regionInfo; 319 this.targetServer = targetServer; 320 this.movedRegions = movedRegions; 321 this.sourceServer = sourceServer; 322 } 323 324 @Override 325 public Boolean call() { 326 try { 327 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " 328 + targetServer); 329 admin.move(region.getEncodedNameAsBytes(), targetServer); 330 LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to " 331 + targetServer); 332 } catch (Exception e) { 333 LOG.error("Error Moving Region:" + region.getEncodedName(), e); 334 } finally { 335 // we add region to the moved regions list in No Ack Mode since this is best effort 336 movedRegions.add(region); 337 } 338 return true; 339 } 340 } 341 342 /** 343 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 344 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 345 * @return true if loading succeeded, false otherwise 346 */ 347 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 348 ExecutorService loadPool = Executors.newFixedThreadPool(1); 349 Future<Boolean> loadTask = loadPool.submit(() -> { 350 try { 351 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 352 if (regionsToMove.isEmpty()) { 353 LOG.info("No regions to load.Exiting"); 354 return true; 355 } 356 loadRegions(regionsToMove); 357 } catch (Exception e) { 358 LOG.error("Error while loading regions to " + hostname, e); 359 return false; 360 } 361 return true; 362 }); 363 return waitTaskToFinish(loadPool, loadTask, "loading"); 364 } 365 366 private void loadRegions(List<RegionInfo> regionsToMove) 367 throws Exception { 368 ServerName server = getTargetServer(); 369 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 370 LOG.info( 371 "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads 372 + " threads.Ack mode:" + this.ack); 373 374 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 375 List<Future<Boolean>> taskList = new ArrayList<>(); 376 int counter = 0; 377 while (counter < regionsToMove.size()) { 378 RegionInfo region = regionsToMove.get(counter); 379 ServerName currentServer = getServerNameForRegion(region); 380 if (currentServer == null) { 381 LOG.warn( 382 "Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 383 counter++; 384 continue; 385 } else if (server.equals(currentServer)) { 386 LOG.info( 387 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 388 counter++; 389 continue; 390 } 391 if (ack) { 392 Future<Boolean> task = 393 moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions)); 394 taskList.add(task); 395 } else { 396 Future<Boolean> task = 397 moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions)); 398 taskList.add(task); 399 } 400 counter++; 401 } 402 403 moveRegionsPool.shutdown(); 404 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() 405 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 406 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 407 } 408 409 /** 410 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 411 * noAck mode we do not make sure that region is successfully online on the target region 412 * server,hence it is best effort.We do not unload regions to hostnames given in 413 * {@link #excludeFile}. 414 * @return true if unloading succeeded, false otherwise 415 */ 416 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 417 deleteFile(this.filename); 418 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 419 Future<Boolean> unloadTask = unloadPool.submit(() -> { 420 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 421 try { 422 // Get Online RegionServers 423 List<ServerName> regionServers = new ArrayList<>(); 424 regionServers.addAll(admin.getRegionServers()); 425 // Remove the host Region server from target Region Servers list 426 ServerName server = stripServer(regionServers, hostname, port); 427 if (server == null) { 428 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.", 429 hostname, port); 430 LOG.debug("List of region servers: {}", regionServers); 431 return false; 432 } 433 // Remove RS present in the exclude file 434 stripExcludes(regionServers); 435 436 // Remove decommissioned RS 437 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers()); 438 if (CollectionUtils.isNotEmpty(decommissionedRS)) { 439 regionServers.removeIf(decommissionedRS::contains); 440 LOG.debug("Excluded RegionServers from unloading regions to because they " + 441 "are marked as decommissioned. Servers: {}", decommissionedRS); 442 } 443 444 stripMaster(regionServers); 445 if (regionServers.isEmpty()) { 446 LOG.warn("No Regions were moved - no servers available"); 447 return false; 448 } 449 unloadRegions(server, regionServers, movedRegions); 450 } catch (Exception e) { 451 LOG.error("Error while unloading regions ", e); 452 return false; 453 } finally { 454 if (movedRegions != null) { 455 writeFile(filename, movedRegions); 456 } 457 } 458 return true; 459 }); 460 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 461 } 462 463 private void unloadRegions(ServerName server, List<ServerName> regionServers, 464 List<RegionInfo> movedRegions) throws Exception { 465 while (true) { 466 List<RegionInfo> regionsToMove = admin.getRegions(server); 467 regionsToMove.removeAll(movedRegions); 468 if (regionsToMove.isEmpty()) { 469 LOG.info("No Regions to move....Quitting now"); 470 break; 471 } 472 int counter = 0; 473 LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to " 474 + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:" 475 + ack); 476 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 477 List<Future<Boolean>> taskList = new ArrayList<>(); 478 int serverIndex = 0; 479 while (counter < regionsToMove.size()) { 480 if (ack) { 481 Future<Boolean> task = moveRegionsPool.submit( 482 new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), 483 movedRegions)); 484 taskList.add(task); 485 } else { 486 Future<Boolean> task = moveRegionsPool.submit( 487 new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), 488 movedRegions)); 489 taskList.add(task); 490 } 491 counter++; 492 serverIndex = (serverIndex + 1) % regionServers.size(); 493 } 494 moveRegionsPool.shutdown(); 495 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() 496 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 497 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 498 } 499 } 500 501 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 502 throws TimeoutException, InterruptedException, ExecutionException { 503 pool.shutdown(); 504 try { 505 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 506 LOG.warn( 507 "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout 508 + "sec"); 509 pool.shutdownNow(); 510 } 511 } catch (InterruptedException e) { 512 pool.shutdownNow(); 513 Thread.currentThread().interrupt(); 514 } 515 try { 516 return task.get(5, TimeUnit.SECONDS); 517 } catch (InterruptedException e) { 518 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 519 throw e; 520 } catch (ExecutionException e) { 521 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 522 throw e; 523 } 524 } 525 526 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 527 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 528 try { 529 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 530 moveRegionsPool.shutdownNow(); 531 } 532 } catch (InterruptedException e) { 533 moveRegionsPool.shutdownNow(); 534 Thread.currentThread().interrupt(); 535 } 536 for (Future<Boolean> future : taskList) { 537 try { 538 // if even after shutdownNow threads are stuck we wait for 5 secs max 539 if (!future.get(5, TimeUnit.SECONDS)) { 540 LOG.error("Was Not able to move region....Exiting Now"); 541 throw new Exception("Could not move region Exception"); 542 } 543 } catch (InterruptedException e) { 544 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 545 throw e; 546 } catch (ExecutionException e) { 547 LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); 548 throw e; 549 } catch (CancellationException e) { 550 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds 551 + "secs", e); 552 throw e; 553 } 554 } 555 } 556 557 private ServerName getTargetServer() throws Exception { 558 ServerName server = null; 559 int maxWaitInSeconds = 560 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 561 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 562 while (EnvironmentEdgeManager.currentTime() < maxWait) { 563 try { 564 List<ServerName> regionServers = new ArrayList<>(); 565 regionServers.addAll(admin.getRegionServers()); 566 // Remove the host Region server from target Region Servers list 567 server = stripServer(regionServers, hostname, port); 568 if (server != null) { 569 break; 570 } else { 571 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 572 } 573 } catch (IOException e) { 574 LOG.warn("Could not get list of region servers", e); 575 } 576 Thread.sleep(500); 577 } 578 if (server == null) { 579 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); 580 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 581 } 582 return server; 583 } 584 585 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 586 List<RegionInfo> regions = new ArrayList<>(); 587 File f = new File(filename); 588 if (!f.exists()) { 589 return regions; 590 } 591 try (DataInputStream dis = new DataInputStream( 592 new BufferedInputStream(new FileInputStream(f)))) { 593 int numRegions = dis.readInt(); 594 int index = 0; 595 while (index < numRegions) { 596 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 597 index++; 598 } 599 } catch (IOException e) { 600 LOG.error("Error while reading regions from file:" + filename, e); 601 throw e; 602 } 603 return regions; 604 } 605 606 /** 607 * Write the number of regions moved in the first line followed by regions moved in subsequent 608 * lines 609 */ 610 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 611 try (DataOutputStream dos = new DataOutputStream( 612 new BufferedOutputStream(new FileOutputStream(filename)))) { 613 dos.writeInt(movedRegions.size()); 614 for (RegionInfo region : movedRegions) { 615 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 616 } 617 } catch (IOException e) { 618 LOG.error( 619 "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions 620 .size() + " regions", e); 621 throw e; 622 } 623 } 624 625 private void deleteFile(String filename) { 626 File f = new File(filename); 627 if (f.exists()) { 628 f.delete(); 629 } 630 } 631 632 /** 633 * @return List of servers from the exclude file in format 'hostname:port'. 634 */ 635 private List<String> readExcludes(String excludeFile) throws IOException { 636 List<String> excludeServers = new ArrayList<>(); 637 if (excludeFile == null) { 638 return excludeServers; 639 } else { 640 try { 641 Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim) 642 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 643 .forEach(excludeServers::add); 644 } catch (IOException e) { 645 LOG.warn("Exception while reading excludes file, continuing anyways", e); 646 } 647 return excludeServers; 648 } 649 } 650 651 /** 652 * Excludes the servername whose hostname and port portion matches the list given in exclude file 653 */ 654 private void stripExcludes(List<ServerName> regionServers) throws IOException { 655 if (excludeFile != null) { 656 List<String> excludes = readExcludes(excludeFile); 657 Iterator<ServerName> i = regionServers.iterator(); 658 while (i.hasNext()) { 659 String rs = i.next().getServerName(); 660 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs 661 .split(ServerName.SERVERNAME_SEPARATOR)[1]; 662 if (excludes.contains(rsPort)) { 663 i.remove(); 664 } 665 } 666 LOG.info("Valid Region server targets are:" + regionServers.toString()); 667 LOG.info("Excluded Servers are" + excludes.toString()); 668 } 669 } 670 671 /** 672 * Exclude master from list of RSs to move regions to 673 */ 674 private void stripMaster(List<ServerName> regionServers) throws IOException { 675 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 676 stripServer(regionServers, master.getHostname(), master.getPort()); 677 } 678 679 /** 680 * Remove the servername whose hostname and port portion matches from the passed array of servers. 681 * Returns as side-effect the servername removed. 682 * @return server removed from list of Region Servers 683 */ 684 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 685 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 686 ServerName server = iter.next(); 687 if (server.getAddress().getHostname().equalsIgnoreCase(hostname) && 688 server.getAddress().getPort() == port) { 689 iter.remove(); 690 return server; 691 } 692 } 693 return null; 694 } 695 696 /** 697 * Tries to scan a row from passed region 698 */ 699 private void isSuccessfulScan(RegionInfo region) throws IOException { 700 Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit() 701 .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter()) 702 .setCacheBlocks(false); 703 try (Table table = conn.getTable(region.getTable()); 704 ResultScanner scanner = table.getScanner(scan)) { 705 scanner.next(); 706 } catch (IOException e) { 707 LOG.error("Could not scan region:" + region.getEncodedName(), e); 708 throw e; 709 } 710 } 711 712 /** 713 * Returns true if passed region is still on serverName when we look at hbase:meta. 714 * @return true if region is hosted on serverName otherwise false 715 */ 716 private boolean isSameServer(RegionInfo region, ServerName serverName) 717 throws IOException { 718 ServerName serverForRegion = getServerNameForRegion(region); 719 if (serverForRegion != null && serverForRegion.equals(serverName)) { 720 return true; 721 } 722 return false; 723 } 724 725 /** 726 * Get servername that is up in hbase:meta hosting the given region. this is hostname + port + 727 * startcode comma-delimited. Can return null 728 * @return regionServer hosting the given region 729 */ 730 private ServerName getServerNameForRegion(RegionInfo region) throws IOException { 731 if (!admin.isTableEnabled(region.getTable())) { 732 return null; 733 } 734 HRegionLocation loc = 735 conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(), 736 region.getReplicaId(),true); 737 if (loc != null) { 738 return loc.getServerName(); 739 } else { 740 return null; 741 } 742 } 743 744 @Override 745 protected void addOptions() { 746 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 747 this.addRequiredOptWithArg("o", "operation", "Expected: load/unload"); 748 this.addOptWithArg("m", "maxthreads", 749 "Define the maximum number of threads to use to unload and reload the regions"); 750 this.addOptWithArg("x", "excludefile", 751 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 752 + "target host; useful for rack decommisioning."); 753 this.addOptWithArg("f", "filename", 754 "File to save regions list into unloading, or read from loading; " 755 + "default /tmp/<usernamehostname:port>"); 756 this.addOptNoArg("n", "noack", 757 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 758 + "RegionServer, hence best effort. This is more performant in unloading and loading " 759 + "but might lead to region being unavailable for some time till master reassigns it " 760 + "in case the move failed"); 761 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 762 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 763 } 764 765 @Override 766 protected void processOptions(CommandLine cmd) { 767 String hostname = cmd.getOptionValue("r"); 768 rmbuilder = new RegionMoverBuilder(hostname); 769 if (cmd.hasOption('m')) { 770 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 771 } 772 if (cmd.hasOption('n')) { 773 rmbuilder.ack(false); 774 } 775 if (cmd.hasOption('f')) { 776 rmbuilder.filename(cmd.getOptionValue('f')); 777 } 778 if (cmd.hasOption('x')) { 779 rmbuilder.excludeFile(cmd.getOptionValue('x')); 780 } 781 if (cmd.hasOption('t')) { 782 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 783 } 784 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 785 } 786 787 @Override 788 protected int doWork() throws Exception { 789 boolean success; 790 try (RegionMover rm = rmbuilder.build()) { 791 if (loadUnload.equalsIgnoreCase("load")) { 792 success = rm.load(); 793 } else if (loadUnload.equalsIgnoreCase("unload")) { 794 success = rm.unload(); 795 } else { 796 printUsage(); 797 success = false; 798 } 799 } 800 return (success ? 0 : 1); 801 } 802 803 public static void main(String[] args) { 804 try (RegionMover mover = new RegionMover()) { 805 mover.doStaticMain(args); 806 } 807 } 808}