001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.BufferedInputStream; 021import java.io.BufferedOutputStream; 022import java.io.Closeable; 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.net.InetAddress; 030import java.nio.file.Files; 031import java.nio.file.Paths; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.EnumSet; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.List; 040import java.util.Locale; 041import java.util.Optional; 042import java.util.Set; 043import java.util.concurrent.Callable; 044import java.util.concurrent.CancellationException; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.Future; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.TimeoutException; 051import java.util.function.Predicate; 052import org.apache.commons.io.IOUtils; 053import org.apache.hadoop.conf.Configuration; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.HBaseConfiguration; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.MetaTableAccessor; 059import org.apache.hadoop.hbase.ServerName; 060import org.apache.hadoop.hbase.UnknownRegionException; 061import org.apache.hadoop.hbase.client.Admin; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.ConnectionFactory; 064import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 065import org.apache.hadoop.hbase.client.RegionInfo; 066import org.apache.hadoop.hbase.client.RegionInfoBuilder; 067import org.apache.hadoop.hbase.client.Result; 068import org.apache.hadoop.hbase.master.RackManager; 069import org.apache.hadoop.hbase.master.RegionState; 070import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 071import org.apache.hadoop.hbase.net.Address; 072import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 073import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 074import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 075import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 081import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 082import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 083 084/** 085 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command 086 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode 087 * acknowledges if regions are online after movement while noAck mode is best effort mode that 088 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 089 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 090 * anyways. This can also be used by constructiong an Object using the builder and then calling 091 * {@link #load()} or {@link #unload()} methods for the desired operations. 092 */ 093@InterfaceAudience.Public 094public class RegionMover extends AbstractHBaseTool implements Closeable { 095 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 096 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 097 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 098 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 099 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 100 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 101 102 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 103 104 private RegionMoverBuilder rmbuilder; 105 private boolean ack = true; 106 private int maxthreads = 1; 107 private int timeout; 108 private List<String> isolateRegionIdArray; 109 private String loadUnload; 110 private String hostname; 111 private String filename; 112 private String excludeFile; 113 private String designatedFile; 114 private int port; 115 private Connection conn; 116 private Admin admin; 117 private RackManager rackManager; 118 119 private RegionMover(RegionMoverBuilder builder) throws IOException { 120 this.hostname = builder.hostname; 121 this.filename = builder.filename; 122 this.excludeFile = builder.excludeFile; 123 this.designatedFile = builder.designatedFile; 124 this.maxthreads = builder.maxthreads; 125 this.isolateRegionIdArray = builder.isolateRegionIdArray; 126 this.ack = builder.ack; 127 this.port = builder.port; 128 this.timeout = builder.timeout; 129 setConf(builder.conf); 130 this.conn = ConnectionFactory.createConnection(conf); 131 this.admin = conn.getAdmin(); 132 133 // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need 134 // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed 135 // normally, see HBASE-27304 for details. 136 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 137 if (InetAddresses.isInetAddress(master.getHostname())) { 138 if (!InetAddresses.isInetAddress(this.hostname)) { 139 this.hostname = InetAddress.getByName(this.hostname).getHostAddress(); 140 } 141 } 142 143 // Only while running unit tests, builder.rackManager will not be null for the convenience of 144 // providing custom rackManager. Otherwise for regular workflow/user triggered action, 145 // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is 146 // provided as @InterfaceAudience.Private and it is commented that this is just 147 // to be used by unit test. 148 rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager; 149 } 150 151 private RegionMover() { 152 } 153 154 @Override 155 public void close() { 156 IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e)); 157 IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e)); 158 } 159 160 /** 161 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 162 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 163 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set 164 * the corresponding options. 165 */ 166 public static class RegionMoverBuilder { 167 private boolean ack = true; 168 private int maxthreads = 1; 169 private int timeout = Integer.MAX_VALUE; 170 private List<String> isolateRegionIdArray = new ArrayList<>(); 171 private String hostname; 172 private String filename; 173 private String excludeFile = null; 174 private String designatedFile = null; 175 private String defaultDir = System.getProperty("java.io.tmpdir"); 176 @InterfaceAudience.Private 177 final int port; 178 private final Configuration conf; 179 private RackManager rackManager; 180 181 public RegionMoverBuilder(String hostname) { 182 this(hostname, createConf()); 183 } 184 185 /** 186 * Creates a new configuration and sets region mover specific overrides 187 */ 188 private static Configuration createConf() { 189 Configuration conf = HBaseConfiguration.create(); 190 conf.setInt("hbase.client.prefetch.limit", 1); 191 conf.setInt("hbase.client.pause", 500); 192 conf.setInt("hbase.client.retries.number", 100); 193 return conf; 194 } 195 196 /** 197 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or 198 * hostname:port. 199 * @param conf Configuration object 200 */ 201 public RegionMoverBuilder(String hostname, Configuration conf) { 202 String[] splitHostname = hostname.toLowerCase().split(":"); 203 this.hostname = splitHostname[0]; 204 if (splitHostname.length == 2) { 205 this.port = Integer.parseInt(splitHostname[1]); 206 } else { 207 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 208 } 209 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 210 + ":" + Integer.toString(this.port); 211 this.conf = conf; 212 } 213 214 /** 215 * Path of file where regions will be written to during unloading/read from during loading 216 * @return RegionMoverBuilder object 217 */ 218 public RegionMoverBuilder filename(String filename) { 219 this.filename = filename; 220 return this; 221 } 222 223 /** 224 * Set the max number of threads that will be used to move regions 225 */ 226 public RegionMoverBuilder maxthreads(int threads) { 227 this.maxthreads = threads; 228 return this; 229 } 230 231 /** 232 * Set the region ID to isolate on the region server. 233 */ 234 public RegionMoverBuilder isolateRegionIdArray(List<String> isolateRegionIdArray) { 235 this.isolateRegionIdArray = isolateRegionIdArray; 236 return this; 237 } 238 239 /** 240 * Path of file containing hostnames to be excluded during region movement. Exclude file should 241 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 242 * host. 243 */ 244 public RegionMoverBuilder excludeFile(String excludefile) { 245 this.excludeFile = excludefile; 246 return this; 247 } 248 249 /** 250 * Set the designated file. Designated file contains hostnames where region moves. Designated 251 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running 252 * on a single host. 253 * @param designatedFile The designated file 254 * @return RegionMoverBuilder object 255 */ 256 public RegionMoverBuilder designatedFile(String designatedFile) { 257 this.designatedFile = designatedFile; 258 return this; 259 } 260 261 /** 262 * Set ack/noAck mode. 263 * <p> 264 * In ack mode regions are acknowledged before and after moving and the move is retried 265 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 266 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 267 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 268 * <p> 269 * @return RegionMoverBuilder object 270 */ 271 public RegionMoverBuilder ack(boolean ack) { 272 this.ack = ack; 273 return this; 274 } 275 276 /** 277 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 278 * movers also have a separate time which is hbase.move.wait.max * number of regions to 279 * load/unload 280 * @param timeout in seconds 281 * @return RegionMoverBuilder object 282 */ 283 public RegionMoverBuilder timeout(int timeout) { 284 this.timeout = timeout; 285 return this; 286 } 287 288 /** 289 * Set specific rackManager implementation. This setter method is for testing purpose only. 290 * @param rackManager rackManager impl 291 * @return RegionMoverBuilder object 292 */ 293 @InterfaceAudience.Private 294 public RegionMoverBuilder rackManager(RackManager rackManager) { 295 this.rackManager = rackManager; 296 return this; 297 } 298 299 /** 300 * This method builds the appropriate RegionMover object which can then be used to load/unload 301 * using load and unload methods 302 * @return RegionMover object 303 */ 304 public RegionMover build() throws IOException { 305 return new RegionMover(this); 306 } 307 } 308 309 /** 310 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 311 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 312 * @return true if loading succeeded, false otherwise 313 */ 314 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 315 ExecutorService loadPool = Executors.newFixedThreadPool(1); 316 Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan()); 317 boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading"); 318 if (!isMetaMoved) { 319 return false; 320 } 321 loadPool = Executors.newFixedThreadPool(1); 322 loadTask = loadPool.submit(getNonMetaRegionsMovePlan()); 323 return waitTaskToFinish(loadPool, loadTask, "loading"); 324 } 325 326 private Callable<Boolean> getMetaRegionMovePlan() { 327 return getRegionsMovePlan(true); 328 } 329 330 private Callable<Boolean> getNonMetaRegionsMovePlan() { 331 return getRegionsMovePlan(false); 332 } 333 334 private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) { 335 return () -> { 336 try { 337 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 338 if (regionsToMove.isEmpty()) { 339 LOG.info("No regions to load.Exiting"); 340 return true; 341 } 342 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 343 if (moveMetaRegion) { 344 if (metaRegion.isPresent()) { 345 loadRegions(Collections.singletonList(metaRegion.get())); 346 } 347 } else { 348 metaRegion.ifPresent(regionsToMove::remove); 349 loadRegions(regionsToMove); 350 } 351 } catch (Exception e) { 352 LOG.error("Error while loading regions to " + hostname, e); 353 return false; 354 } 355 return true; 356 }; 357 } 358 359 private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) { 360 return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst(); 361 } 362 363 private void loadRegions(List<RegionInfo> regionsToMove) throws Exception { 364 ServerName server = getTargetServer(); 365 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 366 LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using " 367 + this.maxthreads + " threads.Ack mode:" + this.ack); 368 369 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 370 List<Future<Boolean>> taskList = new ArrayList<>(); 371 int counter = 0; 372 while (counter < regionsToMove.size()) { 373 RegionInfo region = regionsToMove.get(counter); 374 ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn); 375 if (currentServer == null) { 376 LOG 377 .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 378 counter++; 379 continue; 380 } else if (server.equals(currentServer)) { 381 LOG.info( 382 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 383 counter++; 384 continue; 385 } 386 if (ack) { 387 Future<Boolean> task = moveRegionsPool 388 .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions)); 389 taskList.add(task); 390 } else { 391 Future<Boolean> task = moveRegionsPool 392 .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions)); 393 taskList.add(task); 394 } 395 counter++; 396 } 397 398 moveRegionsPool.shutdown(); 399 long timeoutInSeconds = regionsToMove.size() 400 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 401 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 402 } 403 404 /** 405 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 406 * noAck mode we do not make sure that region is successfully online on the target region 407 * server,hence it is best effort.We do not unload regions to hostnames given in 408 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 409 * to hostnames provided in {@link #designatedFile} 410 * @return true if unloading succeeded, false otherwise 411 */ 412 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 413 return unloadRegions(false); 414 } 415 416 /** 417 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 418 * noAck mode we do not make sure that region is successfully online on the target region 419 * server,hence it is best effort.We do not unload regions to hostnames given in 420 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 421 * to hostnames provided in {@link #designatedFile}. While unloading regions, destination 422 * RegionServers are selected from different rack i.e regions should not move to any RegionServers 423 * that belong to same rack as source RegionServer. 424 * @return true if unloading succeeded, false otherwise 425 */ 426 public boolean unloadFromRack() 427 throws InterruptedException, ExecutionException, TimeoutException { 428 return unloadRegions(true); 429 } 430 431 private boolean unloadRegions(boolean unloadFromRack) 432 throws ExecutionException, InterruptedException, TimeoutException { 433 return unloadRegions(unloadFromRack, null); 434 } 435 436 /** 437 * Isolated regions specified in {@link #isolateRegionIdArray} on {@link #hostname} in ack Mode 438 * and Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}. 439 * In noAck mode we do not make sure that region is successfully online on the target region 440 * server,hence it is the best effort. We do not unload regions to hostnames given in 441 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 442 * to hostnames provided in {@link #designatedFile} 443 * @return true if region isolation succeeded, false otherwise 444 */ 445 public boolean isolateRegions() 446 throws ExecutionException, InterruptedException, TimeoutException { 447 return unloadRegions(false, isolateRegionIdArray); 448 } 449 450 private boolean unloadRegions(boolean unloadFromRack, List<String> isolateRegionIdArray) 451 throws InterruptedException, ExecutionException, TimeoutException { 452 deleteFile(this.filename); 453 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 454 Future<Boolean> unloadTask = unloadPool.submit(() -> { 455 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 456 try { 457 // Get Online RegionServers 458 List<ServerName> regionServers = new ArrayList<>(); 459 RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port)); 460 LOG.info("{} belongs to {}", hostname, rsgroup.getName()); 461 regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers())); 462 // Remove the host Region server from target Region Servers list 463 ServerName server = stripServer(regionServers, hostname, port); 464 if (server == null) { 465 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.", 466 hostname, port); 467 LOG.debug("List of region servers: {}", regionServers); 468 return false; 469 } 470 // Remove RS not present in the designated file 471 includeExcludeRegionServers(designatedFile, regionServers, true); 472 473 // Remove RS present in the exclude file 474 includeExcludeRegionServers(excludeFile, regionServers, false); 475 476 if (unloadFromRack) { 477 // remove regionServers that belong to same rack (as source host) since the goal is to 478 // unload regions from source regionServer to destination regionServers 479 // that belong to different rack only. 480 String sourceRack = rackManager.getRack(server); 481 List<String> racks = rackManager.getRack(regionServers); 482 Iterator<ServerName> iterator = regionServers.iterator(); 483 int i = 0; 484 while (iterator.hasNext()) { 485 iterator.next(); 486 if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) { 487 iterator.remove(); 488 } 489 i++; 490 } 491 } 492 493 // Remove decommissioned RS 494 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers()); 495 if (CollectionUtils.isNotEmpty(decommissionedRS)) { 496 regionServers.removeIf(decommissionedRS::contains); 497 LOG.debug("Excluded RegionServers from unloading regions to because they " 498 + "are marked as decommissioned. Servers: {}", decommissionedRS); 499 } 500 501 stripMaster(regionServers); 502 if (regionServers.isEmpty()) { 503 LOG.warn("No Regions were moved - no servers available"); 504 return false; 505 } else { 506 LOG.info("Available servers {}", regionServers); 507 } 508 unloadRegions(server, regionServers, movedRegions, isolateRegionIdArray); 509 } catch (Exception e) { 510 LOG.error("Error while unloading regions ", e); 511 return false; 512 } finally { 513 if (movedRegions != null) { 514 writeFile(filename, movedRegions); 515 } 516 } 517 return true; 518 }); 519 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 520 } 521 522 @InterfaceAudience.Private 523 Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup, 524 Collection<ServerName> onlineServers) { 525 if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 526 return onlineServers; 527 } 528 List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size()); 529 for (ServerName server : onlineServers) { 530 Address address = Address.fromParts(server.getHostname(), server.getPort()); 531 if (rsgroup.containsServer(address)) { 532 serverLists.add(server); 533 } 534 } 535 return serverLists; 536 } 537 538 private void unloadRegions(ServerName server, List<ServerName> regionServers, 539 List<RegionInfo> movedRegions, List<String> isolateRegionIdArray) throws Exception { 540 while (true) { 541 List<RegionInfo> isolateRegionInfoList = Collections.synchronizedList(new ArrayList<>()); 542 RegionInfo isolateRegionInfo = null; 543 if (isolateRegionIdArray != null && !isolateRegionIdArray.isEmpty()) { 544 // Region will be moved to target region server with Ack mode. 545 final ExecutorService isolateRegionPool = Executors.newFixedThreadPool(maxthreads); 546 List<Future<Boolean>> isolateRegionTaskList = new ArrayList<>(); 547 List<RegionInfo> recentlyIsolatedRegion = Collections.synchronizedList(new ArrayList<>()); 548 boolean allRegionOpsSuccessful = true; 549 boolean isMetaIsolated = false; 550 RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO; 551 List<HRegionLocation> hRegionLocationRegionIsolation = 552 Collections.synchronizedList(new ArrayList<>()); 553 for (String isolateRegionId : isolateRegionIdArray) { 554 if (isolateRegionId.equalsIgnoreCase(metaRegionInfo.getEncodedName())) { 555 isMetaIsolated = true; 556 continue; 557 } 558 Result result = MetaTableAccessor.scanByRegionEncodedName(conn, isolateRegionId); 559 HRegionLocation hRegionLocation = 560 MetaTableAccessor.getRegionLocation(conn, result.getRow()); 561 if (hRegionLocation != null) { 562 hRegionLocationRegionIsolation.add(hRegionLocation); 563 } else { 564 LOG.error("Region " + isolateRegionId + " doesn't exists/can't fetch from" 565 + " meta...Quitting now"); 566 // We only move the regions if all the regions were found. 567 allRegionOpsSuccessful = false; 568 break; 569 } 570 } 571 572 if (!allRegionOpsSuccessful) { 573 break; 574 } 575 // If hbase:meta region was isolated, then it needs to be part of isolateRegionInfoList. 576 if (isMetaIsolated) { 577 ZKWatcher zkWatcher = new ZKWatcher(conf, null, null); 578 List<HRegionLocation> result = new ArrayList<>(); 579 for (String znode : zkWatcher.getMetaReplicaNodes()) { 580 String path = ZNodePaths.joinZNode(zkWatcher.getZNodePaths().baseZNode, znode); 581 int replicaId = zkWatcher.getZNodePaths().getMetaReplicaIdFromPath(path); 582 RegionState state = MetaTableLocator.getMetaRegionState(zkWatcher, replicaId); 583 result.add(new HRegionLocation(state.getRegion(), state.getServerName())); 584 } 585 ServerName metaSeverName = result.get(0).getServerName(); 586 // For isolating hbase:meta, it should move explicitly in Ack mode, 587 // hence the forceMoveRegionByAck = true. 588 if (!metaSeverName.equals(server)) { 589 LOG.info("Region of hbase:meta " + metaRegionInfo.getEncodedName() + " is on server " 590 + metaSeverName + " moving to " + server); 591 submitRegionMovesWhileUnloading(metaSeverName, Collections.singletonList(server), 592 movedRegions, Collections.singletonList(metaRegionInfo), true); 593 } else { 594 LOG.info("Region of hbase:meta " + metaRegionInfo.getEncodedName() + " already exists" 595 + " on server : " + server); 596 } 597 isolateRegionInfoList.add(RegionInfoBuilder.FIRST_META_REGIONINFO); 598 } 599 600 if (!hRegionLocationRegionIsolation.isEmpty()) { 601 for (HRegionLocation hRegionLocation : hRegionLocationRegionIsolation) { 602 isolateRegionInfo = hRegionLocation.getRegion(); 603 isolateRegionInfoList.add(isolateRegionInfo); 604 if (hRegionLocation.getServerName() == server) { 605 LOG.info("Region " + hRegionLocation.getRegion().getEncodedName() + " already exists" 606 + " on server : " + server.getHostname()); 607 } else { 608 Future<Boolean> isolateRegionTask = 609 isolateRegionPool.submit(new MoveWithAck(conn, isolateRegionInfo, 610 hRegionLocation.getServerName(), server, recentlyIsolatedRegion)); 611 isolateRegionTaskList.add(isolateRegionTask); 612 } 613 } 614 } 615 616 if (!isolateRegionTaskList.isEmpty()) { 617 isolateRegionPool.shutdown(); 618 // Now that we have fetched all the region's regionInfo, we can move them. 619 waitMoveTasksToFinish(isolateRegionPool, isolateRegionTaskList, 620 admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX)); 621 622 Set<RegionInfo> currentRegionsOnTheServer = new HashSet<>(admin.getRegions(server)); 623 if (!currentRegionsOnTheServer.containsAll(isolateRegionInfoList)) { 624 // If all the regions are not online on the target server, 625 // we don't put RS in decommission mode and exit from here. 626 LOG.error("One of the Region move failed OR stuck in transition...Quitting now"); 627 break; 628 } 629 } else { 630 LOG.info("All regions already exists on server : " + server.getHostname()); 631 } 632 // Once region has been moved to target RS, put the target RS into decommission mode, 633 // so master doesn't assign new region to the target RS while we unload the target RS. 634 // Also pass 'offload' flag as false since we don't want master to offload the target RS. 635 List<ServerName> listOfServer = new ArrayList<>(); 636 listOfServer.add(server); 637 LOG.info("Putting server : " + server.getHostname() + " in decommission/draining mode"); 638 admin.decommissionRegionServers(listOfServer, false); 639 } 640 List<RegionInfo> regionsToMove = admin.getRegions(server); 641 // Remove all the regions from the online Region list, that we just isolated. 642 // This will also include hbase:meta if it was isolated. 643 regionsToMove.removeAll(isolateRegionInfoList); 644 regionsToMove.removeAll(movedRegions); 645 if (regionsToMove.isEmpty()) { 646 LOG.info("No Regions to move....Quitting now"); 647 break; 648 } 649 LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}", 650 regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack); 651 652 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 653 if (metaRegion.isPresent()) { 654 RegionInfo meta = metaRegion.get(); 655 // hbase:meta should move explicitly in Ack mode. 656 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, 657 Collections.singletonList(meta), true); 658 regionsToMove.remove(meta); 659 } 660 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove, false); 661 } 662 } 663 664 private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers, 665 List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove, boolean forceMoveRegionByAck) 666 throws Exception { 667 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 668 List<Future<Boolean>> taskList = new ArrayList<>(); 669 int serverIndex = 0; 670 for (RegionInfo regionToMove : regionsToMove) { 671 // To move/isolate hbase:meta on a server, it should happen explicitly by Ack mode, hence the 672 // forceMoveRegionByAck = true. 673 if (ack || forceMoveRegionByAck) { 674 Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server, 675 regionServers.get(serverIndex), movedRegions)); 676 taskList.add(task); 677 } else { 678 Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove, 679 server, regionServers.get(serverIndex), movedRegions)); 680 taskList.add(task); 681 } 682 serverIndex = (serverIndex + 1) % regionServers.size(); 683 } 684 moveRegionsPool.shutdown(); 685 long timeoutInSeconds = regionsToMove.size() 686 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 687 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 688 } 689 690 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 691 throws TimeoutException, InterruptedException, ExecutionException { 692 pool.shutdown(); 693 try { 694 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 695 LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: " 696 + this.timeout + "sec"); 697 pool.shutdownNow(); 698 } 699 } catch (InterruptedException e) { 700 pool.shutdownNow(); 701 Thread.currentThread().interrupt(); 702 } 703 try { 704 return task.get(5, TimeUnit.SECONDS); 705 } catch (InterruptedException e) { 706 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 707 throw e; 708 } catch (ExecutionException e) { 709 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 710 throw e; 711 } 712 } 713 714 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 715 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 716 try { 717 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 718 moveRegionsPool.shutdownNow(); 719 } 720 } catch (InterruptedException e) { 721 moveRegionsPool.shutdownNow(); 722 Thread.currentThread().interrupt(); 723 } 724 for (Future<Boolean> future : taskList) { 725 try { 726 // if even after shutdownNow threads are stuck we wait for 5 secs max 727 if (!future.get(5, TimeUnit.SECONDS)) { 728 LOG.error("Was Not able to move region....Exiting Now"); 729 throw new Exception("Could not move region Exception"); 730 } 731 } catch (InterruptedException e) { 732 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 733 throw e; 734 } catch (ExecutionException e) { 735 boolean ignoreFailure = ignoreRegionMoveFailure(e); 736 if (ignoreFailure) { 737 LOG.debug("Ignore region move failure, it might have been split/merged.", e); 738 } else { 739 LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e); 740 throw e; 741 } 742 } catch (CancellationException e) { 743 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds 744 + "secs", e); 745 throw e; 746 } 747 } 748 } 749 750 private boolean ignoreRegionMoveFailure(ExecutionException e) { 751 boolean ignoreFailure = false; 752 if (e.getCause() instanceof UnknownRegionException) { 753 // region does not exist anymore 754 ignoreFailure = true; 755 } else if ( 756 e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null 757 && e.getCause().getMessage() 758 .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,") 759 ) { 760 // region is recently split 761 ignoreFailure = true; 762 } 763 return ignoreFailure; 764 } 765 766 private ServerName getTargetServer() throws Exception { 767 ServerName server = null; 768 int maxWaitInSeconds = 769 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 770 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 771 while (EnvironmentEdgeManager.currentTime() < maxWait) { 772 try { 773 List<ServerName> regionServers = new ArrayList<>(); 774 regionServers.addAll(admin.getRegionServers()); 775 // Remove the host Region server from target Region Servers list 776 server = stripServer(regionServers, hostname, port); 777 if (server != null) { 778 break; 779 } else { 780 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 781 } 782 } catch (IOException e) { 783 LOG.warn("Could not get list of region servers", e); 784 } 785 Thread.sleep(500); 786 } 787 if (server == null) { 788 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); 789 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 790 } 791 return server; 792 } 793 794 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 795 List<RegionInfo> regions = new ArrayList<>(); 796 File f = new File(filename); 797 if (!f.exists()) { 798 return regions; 799 } 800 try ( 801 DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) { 802 int numRegions = dis.readInt(); 803 int index = 0; 804 while (index < numRegions) { 805 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 806 index++; 807 } 808 } catch (IOException e) { 809 LOG.error("Error while reading regions from file:" + filename, e); 810 throw e; 811 } 812 return regions; 813 } 814 815 /** 816 * Write the number of regions moved in the first line followed by regions moved in subsequent 817 * lines 818 */ 819 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 820 try (DataOutputStream dos = 821 new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) { 822 dos.writeInt(movedRegions.size()); 823 for (RegionInfo region : movedRegions) { 824 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 825 } 826 } catch (IOException e) { 827 LOG.error("ERROR: Was Not able to write regions moved to output file but moved " 828 + movedRegions.size() + " regions", e); 829 throw e; 830 } 831 } 832 833 private void deleteFile(String filename) { 834 File f = new File(filename); 835 if (f.exists()) { 836 f.delete(); 837 } 838 } 839 840 /** 841 * @param filename The file should have 'host:port' per line 842 * @return List of servers from the file in format 'hostname:port'. 843 */ 844 private List<String> readServersFromFile(String filename) throws IOException { 845 List<String> servers = new ArrayList<>(); 846 if (filename != null) { 847 try { 848 Files.readAllLines(Paths.get(filename)).stream().map(String::trim) 849 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 850 .forEach(servers::add); 851 } catch (IOException e) { 852 LOG.error("Exception while reading servers from file,", e); 853 throw e; 854 } 855 } 856 return servers; 857 } 858 859 /** 860 * Designates or excludes the servername whose hostname and port portion matches the list given in 861 * the file. Example:<br> 862 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and 863 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3 864 * are removed from regionServers list so that regions can move to only RS1. If you want to 865 * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call 866 * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers 867 * list so that regions can move to only RS2 and RS3. 868 */ 869 private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers, 870 boolean isInclude) throws IOException { 871 if (fileName != null) { 872 List<String> servers = readServersFromFile(fileName); 873 if (servers.isEmpty()) { 874 LOG.warn("No servers provided in the file: {}." + fileName); 875 return; 876 } 877 Iterator<ServerName> i = regionServers.iterator(); 878 while (i.hasNext()) { 879 String rs = i.next().getServerName(); 880 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" 881 + rs.split(ServerName.SERVERNAME_SEPARATOR)[1]; 882 if (isInclude != servers.contains(rsPort)) { 883 i.remove(); 884 } 885 } 886 } 887 } 888 889 /** 890 * Exclude master from list of RSs to move regions to 891 */ 892 private void stripMaster(List<ServerName> regionServers) throws IOException { 893 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 894 stripServer(regionServers, master.getHostname(), master.getPort()); 895 } 896 897 /** 898 * Remove the servername whose hostname and port portion matches from the passed array of servers. 899 * Returns as side-effect the servername removed. 900 * @return server removed from list of Region Servers 901 */ 902 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 903 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 904 ServerName server = iter.next(); 905 if ( 906 server.getAddress().getHostName().equalsIgnoreCase(hostname) 907 && server.getAddress().getPort() == port 908 ) { 909 iter.remove(); 910 return server; 911 } 912 } 913 return null; 914 } 915 916 @Override 917 protected void addOptions() { 918 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 919 this.addRequiredOptWithArg("o", "operation", 920 "Expected: load/unload/unload_from_rack/isolate_regions"); 921 this.addOptWithArg("m", "maxthreads", 922 "Define the maximum number of threads to use to unload and reload the regions"); 923 this.addOptWithArg("i", "isolateRegionIds", 924 "Comma separated list of Region IDs hash to isolate on a RegionServer and put region server" 925 + " in draining mode. This option should only be used with '-o isolate_regions'." 926 + " By putting region server in decommission/draining mode, master can't assign any" 927 + " new region on this server. If one or more regions are not found OR failed to isolate" 928 + " successfully, utility will exist without putting RS in draining/decommission mode." 929 + " Ex. --isolateRegionIds id1,id2,id3 OR -i id1,id2,id3"); 930 this.addOptWithArg("x", "excludefile", 931 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 932 + "target host; useful for rack decommisioning."); 933 this.addOptWithArg("d", "designatedfile", 934 "File with <hostname:port> per line as unload targets;" + "default is all online hosts"); 935 this.addOptWithArg("f", "filename", 936 "File to save regions list into unloading, or read from loading; " 937 + "default /tmp/<usernamehostname:port>"); 938 this.addOptNoArg("n", "noack", 939 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 940 + "RegionServer, hence best effort. This is more performant in unloading and loading " 941 + "but might lead to region being unavailable for some time till master reassigns it " 942 + "in case the move failed"); 943 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 944 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 945 } 946 947 @Override 948 protected void processOptions(CommandLine cmd) { 949 String hostname = cmd.getOptionValue("r"); 950 rmbuilder = new RegionMoverBuilder(hostname); 951 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 952 if (cmd.hasOption('m')) { 953 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 954 } 955 if (this.loadUnload.equals("isolate_regions") && cmd.hasOption("isolateRegionIds")) { 956 rmbuilder 957 .isolateRegionIdArray(Arrays.asList(cmd.getOptionValue("isolateRegionIds").split(","))); 958 } 959 if (cmd.hasOption('n')) { 960 rmbuilder.ack(false); 961 } 962 if (cmd.hasOption('f')) { 963 rmbuilder.filename(cmd.getOptionValue('f')); 964 } 965 if (cmd.hasOption('x')) { 966 rmbuilder.excludeFile(cmd.getOptionValue('x')); 967 } 968 if (cmd.hasOption('d')) { 969 rmbuilder.designatedFile(cmd.getOptionValue('d')); 970 } 971 if (cmd.hasOption('t')) { 972 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 973 } 974 } 975 976 @Override 977 protected int doWork() throws Exception { 978 boolean success; 979 try (RegionMover rm = rmbuilder.build()) { 980 if (loadUnload.equalsIgnoreCase("load")) { 981 success = rm.load(); 982 } else if (loadUnload.equalsIgnoreCase("unload")) { 983 success = rm.unload(); 984 } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) { 985 success = rm.unloadFromRack(); 986 } else if (loadUnload.equalsIgnoreCase("isolate_regions")) { 987 if (rm.isolateRegionIdArray != null && !rm.isolateRegionIdArray.isEmpty()) { 988 success = rm.isolateRegions(); 989 } else { 990 LOG.error("Missing -i/--isolate_regions option with '-o isolate_regions' option"); 991 LOG.error("Use -h or --help for usage instructions"); 992 printUsage(); 993 success = false; 994 } 995 } else { 996 printUsage(); 997 success = false; 998 } 999 } 1000 return (success ? 0 : 1); 1001 } 1002 1003 public static void main(String[] args) { 1004 try (RegionMover mover = new RegionMover()) { 1005 mover.doStaticMain(args); 1006 } 1007 } 1008}