001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.BufferedInputStream; 021import java.io.BufferedOutputStream; 022import java.io.Closeable; 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.net.InetAddress; 030import java.nio.file.Files; 031import java.nio.file.Paths; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.EnumSet; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.List; 040import java.util.Locale; 041import java.util.Optional; 042import java.util.Set; 043import java.util.concurrent.Callable; 044import java.util.concurrent.CancellationException; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.Future; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.TimeoutException; 051import java.util.function.Predicate; 052import org.apache.commons.io.IOUtils; 053import org.apache.hadoop.conf.Configuration; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.HBaseConfiguration; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.MetaTableAccessor; 059import org.apache.hadoop.hbase.ServerName; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.UnknownRegionException; 062import org.apache.hadoop.hbase.client.Admin; 063import org.apache.hadoop.hbase.client.Connection; 064import org.apache.hadoop.hbase.client.ConnectionFactory; 065import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 066import org.apache.hadoop.hbase.client.RegionInfo; 067import org.apache.hadoop.hbase.client.RegionInfoBuilder; 068import org.apache.hadoop.hbase.client.Result; 069import org.apache.hadoop.hbase.master.RackManager; 070import org.apache.hadoop.hbase.master.RegionState; 071import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 072import org.apache.hadoop.hbase.net.Address; 073import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 074import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 075import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 076import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 082import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 083import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 084 085/** 086 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command 087 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode 088 * acknowledges if regions are online after movement while noAck mode is best effort mode that 089 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 090 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 091 * anyways. This can also be used by constructiong an Object using the builder and then calling 092 * {@link #load()} or {@link #unload()} methods for the desired operations. 093 */ 094@InterfaceAudience.Public 095public class RegionMover extends AbstractHBaseTool implements Closeable { 096 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 097 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 098 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 099 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 100 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 101 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 102 103 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 104 105 private RegionMoverBuilder rmbuilder; 106 private boolean ack = true; 107 private int maxthreads = 1; 108 private int timeout; 109 private List<String> isolateRegionIdArray; 110 private String loadUnload; 111 private String hostname; 112 private String filename; 113 private String excludeFile; 114 private String designatedFile; 115 private int port; 116 private Connection conn; 117 private Admin admin; 118 private RackManager rackManager; 119 120 private RegionMover(RegionMoverBuilder builder) throws IOException { 121 this.hostname = builder.hostname; 122 this.filename = builder.filename; 123 this.excludeFile = builder.excludeFile; 124 this.designatedFile = builder.designatedFile; 125 this.maxthreads = builder.maxthreads; 126 this.isolateRegionIdArray = builder.isolateRegionIdArray; 127 this.ack = builder.ack; 128 this.port = builder.port; 129 this.timeout = builder.timeout; 130 setConf(builder.conf); 131 this.conn = ConnectionFactory.createConnection(conf); 132 this.admin = conn.getAdmin(); 133 134 // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need 135 // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed 136 // normally, see HBASE-27304 for details. 137 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 138 if (InetAddresses.isInetAddress(master.getHostname())) { 139 if (!InetAddresses.isInetAddress(this.hostname)) { 140 this.hostname = InetAddress.getByName(this.hostname).getHostAddress(); 141 } 142 } 143 144 // Only while running unit tests, builder.rackManager will not be null for the convenience of 145 // providing custom rackManager. Otherwise for regular workflow/user triggered action, 146 // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is 147 // provided as @InterfaceAudience.Private and it is commented that this is just 148 // to be used by unit test. 149 rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager; 150 } 151 152 private RegionMover() { 153 } 154 155 @Override 156 public void close() { 157 IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e)); 158 IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e)); 159 } 160 161 /** 162 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 163 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 164 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set 165 * the corresponding options. 166 */ 167 public static class RegionMoverBuilder { 168 private boolean ack = true; 169 private int maxthreads = 1; 170 private int timeout = Integer.MAX_VALUE; 171 private List<String> isolateRegionIdArray = new ArrayList<>(); 172 private String hostname; 173 private String filename; 174 private String excludeFile = null; 175 private String designatedFile = null; 176 private String defaultDir = System.getProperty("java.io.tmpdir"); 177 @InterfaceAudience.Private 178 final int port; 179 private final Configuration conf; 180 private RackManager rackManager; 181 182 public RegionMoverBuilder(String hostname) { 183 this(hostname, createConf()); 184 } 185 186 /** 187 * Creates a new configuration and sets region mover specific overrides 188 */ 189 private static Configuration createConf() { 190 final Configuration conf = HBaseConfiguration.create(); 191 return overrideConf(conf); 192 } 193 194 private static Configuration overrideConf(Configuration conf) { 195 conf.setInt("hbase.client.prefetch.limit", 1); 196 conf.setInt("hbase.client.pause", 500); 197 conf.setInt("hbase.client.retries.number", 100); 198 return conf; 199 } 200 201 /** 202 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or 203 * hostname:port. 204 * @param conf Configuration object 205 */ 206 public RegionMoverBuilder(String hostname, Configuration conf) { 207 String[] splitHostname = hostname.toLowerCase().split(":"); 208 this.hostname = splitHostname[0]; 209 if (splitHostname.length == 2) { 210 this.port = Integer.parseInt(splitHostname[1]); 211 } else { 212 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 213 } 214 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 215 + ":" + Integer.toString(this.port); 216 this.conf = overrideConf(new Configuration(conf)); 217 } 218 219 /** 220 * Path of file where regions will be written to during unloading/read from during loading 221 * @return RegionMoverBuilder object 222 */ 223 public RegionMoverBuilder filename(String filename) { 224 this.filename = filename; 225 return this; 226 } 227 228 /** 229 * Set the max number of threads that will be used to move regions 230 */ 231 public RegionMoverBuilder maxthreads(int threads) { 232 this.maxthreads = threads; 233 return this; 234 } 235 236 /** 237 * Set the region ID to isolate on the region server. 238 */ 239 public RegionMoverBuilder isolateRegionIdArray(List<String> isolateRegionIdArray) { 240 this.isolateRegionIdArray = isolateRegionIdArray; 241 return this; 242 } 243 244 /** 245 * Path of file containing hostnames to be excluded during region movement. Exclude file should 246 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 247 * host. 248 */ 249 public RegionMoverBuilder excludeFile(String excludefile) { 250 this.excludeFile = excludefile; 251 return this; 252 } 253 254 /** 255 * Set the designated file. Designated file contains hostnames where region moves. Designated 256 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running 257 * on a single host. 258 * @param designatedFile The designated file 259 * @return RegionMoverBuilder object 260 */ 261 public RegionMoverBuilder designatedFile(String designatedFile) { 262 this.designatedFile = designatedFile; 263 return this; 264 } 265 266 /** 267 * Set ack/noAck mode. 268 * <p> 269 * In ack mode regions are acknowledged before and after moving and the move is retried 270 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 271 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 272 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 273 * <p> 274 * @return RegionMoverBuilder object 275 */ 276 public RegionMoverBuilder ack(boolean ack) { 277 this.ack = ack; 278 return this; 279 } 280 281 /** 282 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 283 * movers also have a separate time which is hbase.move.wait.max * number of regions to 284 * load/unload 285 * @param timeout in seconds 286 * @return RegionMoverBuilder object 287 */ 288 public RegionMoverBuilder timeout(int timeout) { 289 this.timeout = timeout; 290 return this; 291 } 292 293 /** 294 * Set specific rackManager implementation. This setter method is for testing purpose only. 295 * @param rackManager rackManager impl 296 * @return RegionMoverBuilder object 297 */ 298 @InterfaceAudience.Private 299 public RegionMoverBuilder rackManager(RackManager rackManager) { 300 this.rackManager = rackManager; 301 return this; 302 } 303 304 /** 305 * This method builds the appropriate RegionMover object which can then be used to load/unload 306 * using load and unload methods 307 * @return RegionMover object 308 */ 309 public RegionMover build() throws IOException { 310 return new RegionMover(this); 311 } 312 } 313 314 /** 315 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 316 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 317 * @return true if loading succeeded, false otherwise 318 */ 319 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 320 ExecutorService loadPool = Executors.newFixedThreadPool(1); 321 Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan()); 322 boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading"); 323 if (!isMetaMoved) { 324 return false; 325 } 326 loadPool = Executors.newFixedThreadPool(1); 327 loadTask = loadPool.submit(getNonMetaRegionsMovePlan()); 328 return waitTaskToFinish(loadPool, loadTask, "loading"); 329 } 330 331 private Callable<Boolean> getMetaRegionMovePlan() { 332 return getRegionsMovePlan(true); 333 } 334 335 private Callable<Boolean> getNonMetaRegionsMovePlan() { 336 return getRegionsMovePlan(false); 337 } 338 339 private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) { 340 return () -> { 341 try { 342 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 343 if (regionsToMove.isEmpty()) { 344 LOG.info("No regions to load.Exiting"); 345 return true; 346 } 347 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 348 if (moveMetaRegion) { 349 if (metaRegion.isPresent()) { 350 loadRegions(Collections.singletonList(metaRegion.get())); 351 } 352 } else { 353 metaRegion.ifPresent(regionsToMove::remove); 354 loadRegions(regionsToMove); 355 } 356 } catch (Exception e) { 357 LOG.error("Error while loading regions to " + hostname, e); 358 return false; 359 } 360 return true; 361 }; 362 } 363 364 private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) { 365 return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst(); 366 } 367 368 private void loadRegions(List<RegionInfo> regionsToMove) throws Exception { 369 ServerName server = getTargetServer(); 370 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 371 LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using " 372 + this.maxthreads + " threads.Ack mode:" + this.ack); 373 374 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 375 List<Future<Boolean>> taskList = new ArrayList<>(); 376 int counter = 0; 377 while (counter < regionsToMove.size()) { 378 RegionInfo region = regionsToMove.get(counter); 379 ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn); 380 if (currentServer == null) { 381 LOG 382 .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 383 counter++; 384 continue; 385 } else if (server.equals(currentServer)) { 386 LOG.info( 387 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 388 counter++; 389 continue; 390 } 391 if (ack) { 392 Future<Boolean> task = moveRegionsPool 393 .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions)); 394 taskList.add(task); 395 } else { 396 Future<Boolean> task = moveRegionsPool 397 .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions)); 398 taskList.add(task); 399 } 400 counter++; 401 } 402 403 moveRegionsPool.shutdown(); 404 long timeoutInSeconds = regionsToMove.size() 405 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 406 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 407 } 408 409 /** 410 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 411 * noAck mode we do not make sure that region is successfully online on the target region 412 * server,hence it is best effort.We do not unload regions to hostnames given in 413 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 414 * to hostnames provided in {@link #designatedFile} 415 * @return true if unloading succeeded, false otherwise 416 */ 417 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 418 return unloadRegions(false); 419 } 420 421 /** 422 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 423 * noAck mode we do not make sure that region is successfully online on the target region 424 * server,hence it is best effort.We do not unload regions to hostnames given in 425 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 426 * to hostnames provided in {@link #designatedFile}. While unloading regions, destination 427 * RegionServers are selected from different rack i.e regions should not move to any RegionServers 428 * that belong to same rack as source RegionServer. 429 * @return true if unloading succeeded, false otherwise 430 */ 431 public boolean unloadFromRack() 432 throws InterruptedException, ExecutionException, TimeoutException { 433 return unloadRegions(true); 434 } 435 436 private boolean unloadRegions(boolean unloadFromRack) 437 throws ExecutionException, InterruptedException, TimeoutException { 438 return unloadRegions(unloadFromRack, null); 439 } 440 441 /** 442 * Isolated regions specified in {@link #isolateRegionIdArray} on {@link #hostname} in ack Mode 443 * and Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}. 444 * In noAck mode we do not make sure that region is successfully online on the target region 445 * server,hence it is the best effort. We do not unload regions to hostnames given in 446 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 447 * to hostnames provided in {@link #designatedFile} 448 * @return true if region isolation succeeded, false otherwise 449 */ 450 public boolean isolateRegions() 451 throws ExecutionException, InterruptedException, TimeoutException { 452 return unloadRegions(false, isolateRegionIdArray); 453 } 454 455 private boolean unloadRegions(boolean unloadFromRack, List<String> isolateRegionIdArray) 456 throws InterruptedException, ExecutionException, TimeoutException { 457 deleteFile(this.filename); 458 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 459 Future<Boolean> unloadTask = unloadPool.submit(() -> { 460 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 461 try { 462 // Get Online RegionServers 463 List<ServerName> regionServers = new ArrayList<>(); 464 RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port)); 465 LOG.info("{} belongs to {}", hostname, rsgroup.getName()); 466 regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers())); 467 // Remove the host Region server from target Region Servers list 468 ServerName server = stripServer(regionServers, hostname, port); 469 if (server == null) { 470 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.", 471 hostname, port); 472 LOG.debug("List of region servers: {}", regionServers); 473 return false; 474 } 475 // Remove RS not present in the designated file 476 includeExcludeRegionServers(designatedFile, regionServers, true); 477 478 // Remove RS present in the exclude file 479 includeExcludeRegionServers(excludeFile, regionServers, false); 480 481 if (unloadFromRack) { 482 // remove regionServers that belong to same rack (as source host) since the goal is to 483 // unload regions from source regionServer to destination regionServers 484 // that belong to different rack only. 485 String sourceRack = rackManager.getRack(server); 486 List<String> racks = rackManager.getRack(regionServers); 487 Iterator<ServerName> iterator = regionServers.iterator(); 488 int i = 0; 489 while (iterator.hasNext()) { 490 iterator.next(); 491 if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) { 492 iterator.remove(); 493 } 494 i++; 495 } 496 } 497 498 // Remove decommissioned RS 499 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers()); 500 if (CollectionUtils.isNotEmpty(decommissionedRS)) { 501 regionServers.removeIf(decommissionedRS::contains); 502 LOG.debug("Excluded RegionServers from unloading regions to because they " 503 + "are marked as decommissioned. Servers: {}", decommissionedRS); 504 } 505 506 stripMaster(regionServers); 507 if (regionServers.isEmpty()) { 508 LOG.warn("No Regions were moved - no servers available"); 509 return false; 510 } else { 511 LOG.info("Available servers {}", regionServers); 512 } 513 unloadRegions(server, regionServers, movedRegions, isolateRegionIdArray); 514 } catch (Exception e) { 515 LOG.error("Error while unloading regions ", e); 516 return false; 517 } finally { 518 if (movedRegions != null) { 519 writeFile(filename, movedRegions); 520 } 521 } 522 return true; 523 }); 524 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 525 } 526 527 @InterfaceAudience.Private 528 Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup, 529 Collection<ServerName> onlineServers) { 530 if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 531 return onlineServers; 532 } 533 List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size()); 534 for (ServerName server : onlineServers) { 535 Address address = Address.fromParts(server.getHostname(), server.getPort()); 536 if (rsgroup.containsServer(address)) { 537 serverLists.add(server); 538 } 539 } 540 return serverLists; 541 } 542 543 private void unloadRegions(ServerName server, List<ServerName> regionServers, 544 List<RegionInfo> movedRegions, List<String> isolateRegionIdArray) throws Exception { 545 while (true) { 546 List<RegionInfo> isolateRegionInfoList = Collections.synchronizedList(new ArrayList<>()); 547 RegionInfo isolateRegionInfo = null; 548 if (isolateRegionIdArray != null && !isolateRegionIdArray.isEmpty()) { 549 // Region will be moved to target region server with Ack mode. 550 final ExecutorService isolateRegionPool = Executors.newFixedThreadPool(maxthreads); 551 List<Future<Boolean>> isolateRegionTaskList = new ArrayList<>(); 552 List<RegionInfo> recentlyIsolatedRegion = Collections.synchronizedList(new ArrayList<>()); 553 boolean allRegionOpsSuccessful = true; 554 boolean isMetaIsolated = false; 555 RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO; 556 List<HRegionLocation> hRegionLocationRegionIsolation = 557 Collections.synchronizedList(new ArrayList<>()); 558 for (String isolateRegionId : isolateRegionIdArray) { 559 if (isolateRegionId.equalsIgnoreCase(metaRegionInfo.getEncodedName())) { 560 isMetaIsolated = true; 561 continue; 562 } 563 Result result = MetaTableAccessor.scanByRegionEncodedName(conn, isolateRegionId); 564 HRegionLocation hRegionLocation = 565 MetaTableAccessor.getRegionLocation(conn, result.getRow()); 566 if (hRegionLocation != null) { 567 hRegionLocationRegionIsolation.add(hRegionLocation); 568 } else { 569 LOG.error("Region " + isolateRegionId + " doesn't exists/can't fetch from" 570 + " meta...Quitting now"); 571 // We only move the regions if all the regions were found. 572 allRegionOpsSuccessful = false; 573 break; 574 } 575 } 576 577 if (!allRegionOpsSuccessful) { 578 break; 579 } 580 // If hbase:meta region was isolated, then it needs to be part of isolateRegionInfoList. 581 if (isMetaIsolated) { 582 ZKWatcher zkWatcher = new ZKWatcher(conf, null, null); 583 List<HRegionLocation> result = new ArrayList<>(); 584 for (String znode : zkWatcher.getMetaReplicaNodes()) { 585 String path = ZNodePaths.joinZNode(zkWatcher.getZNodePaths().baseZNode, znode); 586 int replicaId = zkWatcher.getZNodePaths().getMetaReplicaIdFromPath(path); 587 RegionState state = MetaTableLocator.getMetaRegionState(zkWatcher, replicaId); 588 result.add(new HRegionLocation(state.getRegion(), state.getServerName())); 589 } 590 ServerName metaSeverName = result.get(0).getServerName(); 591 // For isolating hbase:meta, it should move explicitly in Ack mode, 592 // hence the forceMoveRegionByAck = true. 593 if (!metaSeverName.equals(server)) { 594 LOG.info("Region of {} {} is on server {} moving to {}", TableName.META_TABLE_NAME, 595 metaRegionInfo.getEncodedName(), metaSeverName, server); 596 submitRegionMovesWhileUnloading(metaSeverName, Collections.singletonList(server), 597 movedRegions, Collections.singletonList(metaRegionInfo), true); 598 } else { 599 LOG.info("Region of {} {} already exists on server: {}", TableName.META_TABLE_NAME, 600 metaRegionInfo.getEncodedName(), server); 601 } 602 isolateRegionInfoList.add(RegionInfoBuilder.FIRST_META_REGIONINFO); 603 } 604 605 if (!hRegionLocationRegionIsolation.isEmpty()) { 606 for (HRegionLocation hRegionLocation : hRegionLocationRegionIsolation) { 607 isolateRegionInfo = hRegionLocation.getRegion(); 608 isolateRegionInfoList.add(isolateRegionInfo); 609 if (hRegionLocation.getServerName() == server) { 610 LOG.info("Region " + hRegionLocation.getRegion().getEncodedName() + " already exists" 611 + " on server : " + server.getHostname()); 612 } else { 613 Future<Boolean> isolateRegionTask = 614 isolateRegionPool.submit(new MoveWithAck(conn, isolateRegionInfo, 615 hRegionLocation.getServerName(), server, recentlyIsolatedRegion)); 616 isolateRegionTaskList.add(isolateRegionTask); 617 } 618 } 619 } 620 621 if (!isolateRegionTaskList.isEmpty()) { 622 isolateRegionPool.shutdown(); 623 // Now that we have fetched all the region's regionInfo, we can move them. 624 waitMoveTasksToFinish(isolateRegionPool, isolateRegionTaskList, 625 admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX)); 626 627 Set<RegionInfo> currentRegionsOnTheServer = new HashSet<>(admin.getRegions(server)); 628 if (!currentRegionsOnTheServer.containsAll(isolateRegionInfoList)) { 629 // If all the regions are not online on the target server, 630 // we don't put RS in decommission mode and exit from here. 631 LOG.error("One of the Region move failed OR stuck in transition...Quitting now"); 632 break; 633 } 634 } else { 635 LOG.info("All regions already exists on server : " + server.getHostname()); 636 } 637 // Once region has been moved to target RS, put the target RS into decommission mode, 638 // so master doesn't assign new region to the target RS while we unload the target RS. 639 // Also pass 'offload' flag as false since we don't want master to offload the target RS. 640 List<ServerName> listOfServer = new ArrayList<>(); 641 listOfServer.add(server); 642 LOG.info("Putting server : " + server.getHostname() + " in decommission/draining mode"); 643 admin.decommissionRegionServers(listOfServer, false); 644 } 645 List<RegionInfo> regionsToMove = admin.getRegions(server); 646 // Remove all the regions from the online Region list, that we just isolated. 647 // This will also include hbase:meta if it was isolated. 648 regionsToMove.removeAll(isolateRegionInfoList); 649 regionsToMove.removeAll(movedRegions); 650 if (regionsToMove.isEmpty()) { 651 LOG.info("No Regions to move....Quitting now"); 652 break; 653 } 654 LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}", 655 regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack); 656 657 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 658 if (metaRegion.isPresent()) { 659 RegionInfo meta = metaRegion.get(); 660 // hbase:meta should move explicitly in Ack mode. 661 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, 662 Collections.singletonList(meta), true); 663 regionsToMove.remove(meta); 664 } 665 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove, false); 666 } 667 } 668 669 private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers, 670 List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove, boolean forceMoveRegionByAck) 671 throws Exception { 672 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 673 List<Future<Boolean>> taskList = new ArrayList<>(); 674 int serverIndex = 0; 675 for (RegionInfo regionToMove : regionsToMove) { 676 // To move/isolate hbase:meta on a server, it should happen explicitly by Ack mode, hence the 677 // forceMoveRegionByAck = true. 678 if (ack || forceMoveRegionByAck) { 679 Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server, 680 regionServers.get(serverIndex), movedRegions)); 681 taskList.add(task); 682 } else { 683 Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove, 684 server, regionServers.get(serverIndex), movedRegions)); 685 taskList.add(task); 686 } 687 serverIndex = (serverIndex + 1) % regionServers.size(); 688 } 689 moveRegionsPool.shutdown(); 690 long timeoutInSeconds = regionsToMove.size() 691 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 692 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 693 } 694 695 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 696 throws TimeoutException, InterruptedException, ExecutionException { 697 pool.shutdown(); 698 try { 699 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 700 LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: " 701 + this.timeout + "sec"); 702 pool.shutdownNow(); 703 } 704 } catch (InterruptedException e) { 705 pool.shutdownNow(); 706 Thread.currentThread().interrupt(); 707 } 708 try { 709 return task.get(5, TimeUnit.SECONDS); 710 } catch (InterruptedException e) { 711 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 712 throw e; 713 } catch (ExecutionException e) { 714 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 715 throw e; 716 } 717 } 718 719 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 720 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 721 try { 722 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 723 moveRegionsPool.shutdownNow(); 724 } 725 } catch (InterruptedException e) { 726 moveRegionsPool.shutdownNow(); 727 Thread.currentThread().interrupt(); 728 } 729 for (Future<Boolean> future : taskList) { 730 try { 731 // if even after shutdownNow threads are stuck we wait for 5 secs max 732 if (!future.get(5, TimeUnit.SECONDS)) { 733 LOG.error("Was Not able to move region....Exiting Now"); 734 throw new Exception("Could not move region Exception"); 735 } 736 } catch (InterruptedException e) { 737 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 738 throw e; 739 } catch (ExecutionException e) { 740 boolean ignoreFailure = ignoreRegionMoveFailure(e); 741 if (ignoreFailure) { 742 LOG.debug("Ignore region move failure, it might have been split/merged.", e); 743 } else { 744 LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e); 745 throw e; 746 } 747 } catch (CancellationException e) { 748 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds 749 + "secs", e); 750 throw e; 751 } 752 } 753 } 754 755 private boolean ignoreRegionMoveFailure(ExecutionException e) { 756 boolean ignoreFailure = false; 757 if (e.getCause() instanceof UnknownRegionException) { 758 // region does not exist anymore 759 ignoreFailure = true; 760 } else if ( 761 e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null 762 && e.getCause().getMessage() 763 .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,") 764 ) { 765 // region is recently split 766 ignoreFailure = true; 767 } 768 return ignoreFailure; 769 } 770 771 private ServerName getTargetServer() throws Exception { 772 ServerName server = null; 773 int maxWaitInSeconds = 774 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 775 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 776 while (EnvironmentEdgeManager.currentTime() < maxWait) { 777 try { 778 List<ServerName> regionServers = new ArrayList<>(); 779 regionServers.addAll(admin.getRegionServers()); 780 // Remove the host Region server from target Region Servers list 781 server = stripServer(regionServers, hostname, port); 782 if (server != null) { 783 break; 784 } else { 785 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 786 } 787 } catch (IOException e) { 788 LOG.warn("Could not get list of region servers", e); 789 } 790 Thread.sleep(500); 791 } 792 if (server == null) { 793 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); 794 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 795 } 796 return server; 797 } 798 799 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 800 List<RegionInfo> regions = new ArrayList<>(); 801 File f = new File(filename); 802 if (!f.exists()) { 803 return regions; 804 } 805 try ( 806 DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) { 807 int numRegions = dis.readInt(); 808 int index = 0; 809 while (index < numRegions) { 810 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 811 index++; 812 } 813 } catch (IOException e) { 814 LOG.error("Error while reading regions from file:" + filename, e); 815 throw e; 816 } 817 return regions; 818 } 819 820 /** 821 * Write the number of regions moved in the first line followed by regions moved in subsequent 822 * lines 823 */ 824 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 825 try (DataOutputStream dos = 826 new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) { 827 dos.writeInt(movedRegions.size()); 828 for (RegionInfo region : movedRegions) { 829 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 830 } 831 } catch (IOException e) { 832 LOG.error("ERROR: Was Not able to write regions moved to output file but moved " 833 + movedRegions.size() + " regions", e); 834 throw e; 835 } 836 } 837 838 private void deleteFile(String filename) { 839 File f = new File(filename); 840 if (f.exists()) { 841 f.delete(); 842 } 843 } 844 845 /** 846 * @param filename The file should have 'host:port' per line 847 * @return List of servers from the file in format 'hostname:port'. 848 */ 849 private List<String> readServersFromFile(String filename) throws IOException { 850 List<String> servers = new ArrayList<>(); 851 if (filename != null) { 852 try { 853 Files.readAllLines(Paths.get(filename)).stream().map(String::trim) 854 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 855 .forEach(servers::add); 856 } catch (IOException e) { 857 LOG.error("Exception while reading servers from file,", e); 858 throw e; 859 } 860 } 861 return servers; 862 } 863 864 /** 865 * Designates or excludes the servername whose hostname and port portion matches the list given in 866 * the file. Example:<br> 867 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and 868 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3 869 * are removed from regionServers list so that regions can move to only RS1. If you want to 870 * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call 871 * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers 872 * list so that regions can move to only RS2 and RS3. 873 */ 874 private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers, 875 boolean isInclude) throws IOException { 876 if (fileName != null) { 877 List<String> servers = readServersFromFile(fileName); 878 if (servers.isEmpty()) { 879 LOG.warn("No servers provided in the file: {}." + fileName); 880 return; 881 } 882 Iterator<ServerName> i = regionServers.iterator(); 883 while (i.hasNext()) { 884 String rs = i.next().getServerName(); 885 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" 886 + rs.split(ServerName.SERVERNAME_SEPARATOR)[1]; 887 if (isInclude != servers.contains(rsPort)) { 888 i.remove(); 889 } 890 } 891 } 892 } 893 894 /** 895 * Exclude master from list of RSs to move regions to 896 */ 897 private void stripMaster(List<ServerName> regionServers) throws IOException { 898 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 899 stripServer(regionServers, master.getHostname(), master.getPort()); 900 } 901 902 /** 903 * Remove the servername whose hostname and port portion matches from the passed array of servers. 904 * Returns as side-effect the servername removed. 905 * @return server removed from list of Region Servers 906 */ 907 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 908 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 909 ServerName server = iter.next(); 910 if ( 911 server.getAddress().getHostName().equalsIgnoreCase(hostname) 912 && server.getAddress().getPort() == port 913 ) { 914 iter.remove(); 915 return server; 916 } 917 } 918 return null; 919 } 920 921 @Override 922 protected void addOptions() { 923 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 924 this.addRequiredOptWithArg("o", "operation", 925 "Expected: load/unload/unload_from_rack/isolate_regions"); 926 this.addOptWithArg("m", "maxthreads", 927 "Define the maximum number of threads to use to unload and reload the regions"); 928 this.addOptWithArg("i", "isolateRegionIds", 929 "Comma separated list of Region IDs hash to isolate on a RegionServer and put region server" 930 + " in draining mode. This option should only be used with '-o isolate_regions'." 931 + " By putting region server in decommission/draining mode, master can't assign any" 932 + " new region on this server. If one or more regions are not found OR failed to isolate" 933 + " successfully, utility will exist without putting RS in draining/decommission mode." 934 + " Ex. --isolateRegionIds id1,id2,id3 OR -i id1,id2,id3"); 935 this.addOptWithArg("x", "excludefile", 936 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 937 + "target host; useful for rack decommisioning."); 938 this.addOptWithArg("d", "designatedfile", 939 "File with <hostname:port> per line as unload targets;" + "default is all online hosts"); 940 this.addOptWithArg("f", "filename", 941 "File to save regions list into unloading, or read from loading; " 942 + "default /tmp/<usernamehostname:port>"); 943 this.addOptNoArg("n", "noack", 944 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 945 + "RegionServer, hence best effort. This is more performant in unloading and loading " 946 + "but might lead to region being unavailable for some time till master reassigns it " 947 + "in case the move failed"); 948 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 949 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 950 } 951 952 @Override 953 protected void processOptions(CommandLine cmd) { 954 String hostname = cmd.getOptionValue("r"); 955 rmbuilder = new RegionMoverBuilder(hostname, getConf()); 956 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 957 if (cmd.hasOption('m')) { 958 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 959 } 960 if (this.loadUnload.equals("isolate_regions") && cmd.hasOption("isolateRegionIds")) { 961 rmbuilder 962 .isolateRegionIdArray(Arrays.asList(cmd.getOptionValue("isolateRegionIds").split(","))); 963 } 964 if (cmd.hasOption('n')) { 965 rmbuilder.ack(false); 966 } 967 if (cmd.hasOption('f')) { 968 rmbuilder.filename(cmd.getOptionValue('f')); 969 } 970 if (cmd.hasOption('x')) { 971 rmbuilder.excludeFile(cmd.getOptionValue('x')); 972 } 973 if (cmd.hasOption('d')) { 974 rmbuilder.designatedFile(cmd.getOptionValue('d')); 975 } 976 if (cmd.hasOption('t')) { 977 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 978 } 979 } 980 981 @Override 982 protected int doWork() throws Exception { 983 boolean success; 984 try (RegionMover rm = rmbuilder.build()) { 985 if (loadUnload.equalsIgnoreCase("load")) { 986 success = rm.load(); 987 } else if (loadUnload.equalsIgnoreCase("unload")) { 988 success = rm.unload(); 989 } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) { 990 success = rm.unloadFromRack(); 991 } else if (loadUnload.equalsIgnoreCase("isolate_regions")) { 992 if (rm.isolateRegionIdArray != null && !rm.isolateRegionIdArray.isEmpty()) { 993 success = rm.isolateRegions(); 994 } else { 995 LOG.error("Missing -i/--isolate_regions option with '-o isolate_regions' option"); 996 LOG.error("Use -h or --help for usage instructions"); 997 printUsage(); 998 success = false; 999 } 1000 } else { 1001 printUsage(); 1002 success = false; 1003 } 1004 } 1005 return (success ? 0 : 1); 1006 } 1007 1008 public static void main(String[] args) { 1009 try (RegionMover mover = new RegionMover()) { 1010 mover.doStaticMain(args); 1011 } 1012 } 1013}