001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.Closeable;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.net.InetAddress;
030import java.nio.file.Files;
031import java.nio.file.Paths;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.EnumSet;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.List;
040import java.util.Locale;
041import java.util.Optional;
042import java.util.Set;
043import java.util.concurrent.Callable;
044import java.util.concurrent.CancellationException;
045import java.util.concurrent.ExecutionException;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.Executors;
048import java.util.concurrent.Future;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.TimeoutException;
051import java.util.function.Predicate;
052import org.apache.commons.io.IOUtils;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.MetaTableAccessor;
059import org.apache.hadoop.hbase.ServerName;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.UnknownRegionException;
062import org.apache.hadoop.hbase.client.Admin;
063import org.apache.hadoop.hbase.client.Connection;
064import org.apache.hadoop.hbase.client.ConnectionFactory;
065import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.client.RegionInfoBuilder;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.master.RackManager;
070import org.apache.hadoop.hbase.master.RegionState;
071import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
072import org.apache.hadoop.hbase.net.Address;
073import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
074import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
075import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
076import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
082import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
083import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
084
/**
 * Tool for loading/unloading regions to/from a given RegionServer. This tool can be run from the
 * command line directly as a utility. It supports Ack and No Ack modes for loading/unloading
 * operations. Ack mode confirms that regions are online after they have been moved, while noAck
 * mode is a best-effort mode that improves performance but moves on even if a region is stuck or
 * was not moved. The motivation behind noAck mode is RegionServer shutdown: even if a region is
 * stuck, the master will move it anyway once the server shuts down. The tool can also be used by
 * constructing an object with the builder and then calling the {@link #load()} or
 * {@link #unload()} methods for the desired operations.
 */
094@InterfaceAudience.Public
095public class RegionMover extends AbstractHBaseTool implements Closeable {
  /** Configuration key: how many times a region move is retried in ack mode. */
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  /**
   * Configuration key: per-region wait in seconds; move pools wait this value multiplied by the
   * number of regions being moved.
   */
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  /**
   * Configuration key; not read in this part of the file. NOTE(review): presumably bounds the wait
   * for a server to start - verify against its users.
   */
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  /** Default for {@link #MOVE_RETRIES_MAX_KEY}. */
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  /** Default for {@link #MOVE_WAIT_MAX_KEY}, in seconds. */
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  /** Default for {@link #SERVERSTART_WAIT_MAX_KEY}. */
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  // NOTE(review): not referenced in the visible part of this file; verify it is still needed.
  private RegionMoverBuilder rmbuilder;
  // true = ack mode (moves are confirmed), false = best-effort noAck mode.
  private boolean ack = true;
  // Number of threads used to move regions concurrently.
  private int maxthreads = 1;
  // Global timeout, in seconds, for a whole load/unload task.
  private int timeout;
  // Encoded names of the regions to keep on hostname when isolateRegions() is called.
  private List<String> isolateRegionIdArray;
  // NOTE(review): not referenced in the visible part of this file; presumably the CLI operation.
  private String loadUnload;
  // Host to load regions to / unload regions from (resolved to an IP when the master uses IPs).
  private String hostname;
  // File listing unloaded regions: written by unload, read back by load.
  private String filename;
  // File of host:port entries that must never receive regions.
  private String excludeFile;
  // File of host:port entries; when it has entries, only these servers receive unloaded regions.
  private String designatedFile;
  // Port of the RegionServer on hostname.
  private int port;
  // Cluster connection and admin opened by the builder-based constructor; released by close().
  private Connection conn;
  private Admin admin;
  // Resolves server racks, used by unloadFromRack().
  private RackManager rackManager;
119
120  private RegionMover(RegionMoverBuilder builder) throws IOException {
121    this.hostname = builder.hostname;
122    this.filename = builder.filename;
123    this.excludeFile = builder.excludeFile;
124    this.designatedFile = builder.designatedFile;
125    this.maxthreads = builder.maxthreads;
126    this.isolateRegionIdArray = builder.isolateRegionIdArray;
127    this.ack = builder.ack;
128    this.port = builder.port;
129    this.timeout = builder.timeout;
130    setConf(builder.conf);
131    this.conn = ConnectionFactory.createConnection(conf);
132    this.admin = conn.getAdmin();
133
134    // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need
135    // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed
136    // normally, see HBASE-27304 for details.
137    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
138    if (InetAddresses.isInetAddress(master.getHostname())) {
139      if (!InetAddresses.isInetAddress(this.hostname)) {
140        this.hostname = InetAddress.getByName(this.hostname).getHostAddress();
141      }
142    }
143
144    // Only while running unit tests, builder.rackManager will not be null for the convenience of
145    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
146    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
147    // provided as @InterfaceAudience.Private and it is commented that this is just
148    // to be used by unit test.
149    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
150  }
151
  /**
   * No-arg constructor: leaves every field at its declared default and opens no connection.
   * NOTE(review): not used in the visible part of this file; presumably it serves command-line
   * execution through the tool runner. Verify before relying on it.
   */
  private RegionMover() {
  }
154
155  @Override
156  public void close() {
157    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
158    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
159  }
160
161  /**
162   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
163   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
164   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
165   * the corresponding options.
166   */
167  public static class RegionMoverBuilder {
168    private boolean ack = true;
169    private int maxthreads = 1;
170    private int timeout = Integer.MAX_VALUE;
171    private List<String> isolateRegionIdArray = new ArrayList<>();
172    private String hostname;
173    private String filename;
174    private String excludeFile = null;
175    private String designatedFile = null;
176    private String defaultDir = System.getProperty("java.io.tmpdir");
177    @InterfaceAudience.Private
178    final int port;
179    private final Configuration conf;
180    private RackManager rackManager;
181
182    public RegionMoverBuilder(String hostname) {
183      this(hostname, createConf());
184    }
185
186    /**
187     * Creates a new configuration and sets region mover specific overrides
188     */
189    private static Configuration createConf() {
190      final Configuration conf = HBaseConfiguration.create();
191      return overrideConf(conf);
192    }
193
194    private static Configuration overrideConf(Configuration conf) {
195      conf.setInt("hbase.client.prefetch.limit", 1);
196      conf.setInt("hbase.client.pause", 500);
197      conf.setInt("hbase.client.retries.number", 100);
198      return conf;
199    }
200
201    /**
202     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
203     *                 hostname:port.
204     * @param conf     Configuration object
205     */
206    public RegionMoverBuilder(String hostname, Configuration conf) {
207      String[] splitHostname = hostname.toLowerCase().split(":");
208      this.hostname = splitHostname[0];
209      if (splitHostname.length == 2) {
210        this.port = Integer.parseInt(splitHostname[1]);
211      } else {
212        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
213      }
214      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
215        + ":" + Integer.toString(this.port);
216      this.conf = overrideConf(new Configuration(conf));
217    }
218
219    /**
220     * Path of file where regions will be written to during unloading/read from during loading
221     * @return RegionMoverBuilder object
222     */
223    public RegionMoverBuilder filename(String filename) {
224      this.filename = filename;
225      return this;
226    }
227
228    /**
229     * Set the max number of threads that will be used to move regions
230     */
231    public RegionMoverBuilder maxthreads(int threads) {
232      this.maxthreads = threads;
233      return this;
234    }
235
236    /**
237     * Set the region ID to isolate on the region server.
238     */
239    public RegionMoverBuilder isolateRegionIdArray(List<String> isolateRegionIdArray) {
240      this.isolateRegionIdArray = isolateRegionIdArray;
241      return this;
242    }
243
244    /**
245     * Path of file containing hostnames to be excluded during region movement. Exclude file should
246     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
247     * host.
248     */
249    public RegionMoverBuilder excludeFile(String excludefile) {
250      this.excludeFile = excludefile;
251      return this;
252    }
253
254    /**
255     * Set the designated file. Designated file contains hostnames where region moves. Designated
256     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
257     * on a single host.
258     * @param designatedFile The designated file
259     * @return RegionMoverBuilder object
260     */
261    public RegionMoverBuilder designatedFile(String designatedFile) {
262      this.designatedFile = designatedFile;
263      return this;
264    }
265
266    /**
267     * Set ack/noAck mode.
268     * <p>
269     * In ack mode regions are acknowledged before and after moving and the move is retried
270     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
271     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
272     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
273     * <p>
274     * @return RegionMoverBuilder object
275     */
276    public RegionMoverBuilder ack(boolean ack) {
277      this.ack = ack;
278      return this;
279    }
280
281    /**
282     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
283     * movers also have a separate time which is hbase.move.wait.max * number of regions to
284     * load/unload
285     * @param timeout in seconds
286     * @return RegionMoverBuilder object
287     */
288    public RegionMoverBuilder timeout(int timeout) {
289      this.timeout = timeout;
290      return this;
291    }
292
293    /**
294     * Set specific rackManager implementation. This setter method is for testing purpose only.
295     * @param rackManager rackManager impl
296     * @return RegionMoverBuilder object
297     */
298    @InterfaceAudience.Private
299    public RegionMoverBuilder rackManager(RackManager rackManager) {
300      this.rackManager = rackManager;
301      return this;
302    }
303
304    /**
305     * This method builds the appropriate RegionMover object which can then be used to load/unload
306     * using load and unload methods
307     * @return RegionMover object
308     */
309    public RegionMover build() throws IOException {
310      return new RegionMover(this);
311    }
312  }
313
314  /**
315   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
316   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
317   * @return true if loading succeeded, false otherwise
318   */
319  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
320    ExecutorService loadPool = Executors.newFixedThreadPool(1);
321    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
322    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
323    if (!isMetaMoved) {
324      return false;
325    }
326    loadPool = Executors.newFixedThreadPool(1);
327    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
328    return waitTaskToFinish(loadPool, loadTask, "loading");
329  }
330
331  private Callable<Boolean> getMetaRegionMovePlan() {
332    return getRegionsMovePlan(true);
333  }
334
335  private Callable<Boolean> getNonMetaRegionsMovePlan() {
336    return getRegionsMovePlan(false);
337  }
338
339  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
340    return () -> {
341      try {
342        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
343        if (regionsToMove.isEmpty()) {
344          LOG.info("No regions to load.Exiting");
345          return true;
346        }
347        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
348        if (moveMetaRegion) {
349          if (metaRegion.isPresent()) {
350            loadRegions(Collections.singletonList(metaRegion.get()));
351          }
352        } else {
353          metaRegion.ifPresent(regionsToMove::remove);
354          loadRegions(regionsToMove);
355        }
356      } catch (Exception e) {
357        LOG.error("Error while loading regions to " + hostname, e);
358        return false;
359      }
360      return true;
361    };
362  }
363
364  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
365    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
366  }
367
368  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
369    ServerName server = getTargetServer();
370    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
371    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
372      + this.maxthreads + " threads.Ack mode:" + this.ack);
373
374    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
375    List<Future<Boolean>> taskList = new ArrayList<>();
376    int counter = 0;
377    while (counter < regionsToMove.size()) {
378      RegionInfo region = regionsToMove.get(counter);
379      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
380      if (currentServer == null) {
381        LOG
382          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
383        counter++;
384        continue;
385      } else if (server.equals(currentServer)) {
386        LOG.info(
387          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
388        counter++;
389        continue;
390      }
391      if (ack) {
392        Future<Boolean> task = moveRegionsPool
393          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
394        taskList.add(task);
395      } else {
396        Future<Boolean> task = moveRegionsPool
397          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
398        taskList.add(task);
399      }
400      counter++;
401    }
402
403    moveRegionsPool.shutdown();
404    long timeoutInSeconds = regionsToMove.size()
405      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
406    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
407  }
408
409  /**
410   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
411   * noAck mode we do not make sure that region is successfully online on the target region
412   * server,hence it is best effort.We do not unload regions to hostnames given in
413   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
414   * to hostnames provided in {@link #designatedFile}
415   * @return true if unloading succeeded, false otherwise
416   */
417  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
418    return unloadRegions(false);
419  }
420
421  /**
422   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
423   * noAck mode we do not make sure that region is successfully online on the target region
424   * server,hence it is best effort.We do not unload regions to hostnames given in
425   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
426   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
427   * RegionServers are selected from different rack i.e regions should not move to any RegionServers
428   * that belong to same rack as source RegionServer.
429   * @return true if unloading succeeded, false otherwise
430   */
431  public boolean unloadFromRack()
432    throws InterruptedException, ExecutionException, TimeoutException {
433    return unloadRegions(true);
434  }
435
436  private boolean unloadRegions(boolean unloadFromRack)
437    throws ExecutionException, InterruptedException, TimeoutException {
438    return unloadRegions(unloadFromRack, null);
439  }
440
441  /**
442   * Isolated regions specified in {@link #isolateRegionIdArray} on {@link #hostname} in ack Mode
443   * and Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.
444   * In noAck mode we do not make sure that region is successfully online on the target region
445   * server,hence it is the best effort. We do not unload regions to hostnames given in
446   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
447   * to hostnames provided in {@link #designatedFile}
448   * @return true if region isolation succeeded, false otherwise
449   */
450  public boolean isolateRegions()
451    throws ExecutionException, InterruptedException, TimeoutException {
452    return unloadRegions(false, isolateRegionIdArray);
453  }
454
455  private boolean unloadRegions(boolean unloadFromRack, List<String> isolateRegionIdArray)
456    throws InterruptedException, ExecutionException, TimeoutException {
457    deleteFile(this.filename);
458    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
459    Future<Boolean> unloadTask = unloadPool.submit(() -> {
460      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
461      try {
462        // Get Online RegionServers
463        List<ServerName> regionServers = new ArrayList<>();
464        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
465        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
466        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
467        // Remove the host Region server from target Region Servers list
468        ServerName server = stripServer(regionServers, hostname, port);
469        if (server == null) {
470          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
471            hostname, port);
472          LOG.debug("List of region servers: {}", regionServers);
473          return false;
474        }
475        // Remove RS not present in the designated file
476        includeExcludeRegionServers(designatedFile, regionServers, true);
477
478        // Remove RS present in the exclude file
479        includeExcludeRegionServers(excludeFile, regionServers, false);
480
481        if (unloadFromRack) {
482          // remove regionServers that belong to same rack (as source host) since the goal is to
483          // unload regions from source regionServer to destination regionServers
484          // that belong to different rack only.
485          String sourceRack = rackManager.getRack(server);
486          List<String> racks = rackManager.getRack(regionServers);
487          Iterator<ServerName> iterator = regionServers.iterator();
488          int i = 0;
489          while (iterator.hasNext()) {
490            iterator.next();
491            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
492              iterator.remove();
493            }
494            i++;
495          }
496        }
497
498        // Remove decommissioned RS
499        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
500        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
501          regionServers.removeIf(decommissionedRS::contains);
502          LOG.debug("Excluded RegionServers from unloading regions to because they "
503            + "are marked as decommissioned. Servers: {}", decommissionedRS);
504        }
505
506        stripMaster(regionServers);
507        if (regionServers.isEmpty()) {
508          LOG.warn("No Regions were moved - no servers available");
509          return false;
510        } else {
511          LOG.info("Available servers {}", regionServers);
512        }
513        unloadRegions(server, regionServers, movedRegions, isolateRegionIdArray);
514      } catch (Exception e) {
515        LOG.error("Error while unloading regions ", e);
516        return false;
517      } finally {
518        if (movedRegions != null) {
519          writeFile(filename, movedRegions);
520        }
521      }
522      return true;
523    });
524    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
525  }
526
527  @InterfaceAudience.Private
528  Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
529    Collection<ServerName> onlineServers) {
530    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
531      return onlineServers;
532    }
533    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
534    for (ServerName server : onlineServers) {
535      Address address = Address.fromParts(server.getHostname(), server.getPort());
536      if (rsgroup.containsServer(address)) {
537        serverLists.add(server);
538      }
539    }
540    return serverLists;
541  }
542
543  private void unloadRegions(ServerName server, List<ServerName> regionServers,
544    List<RegionInfo> movedRegions, List<String> isolateRegionIdArray) throws Exception {
545    while (true) {
546      List<RegionInfo> isolateRegionInfoList = Collections.synchronizedList(new ArrayList<>());
547      RegionInfo isolateRegionInfo = null;
548      if (isolateRegionIdArray != null && !isolateRegionIdArray.isEmpty()) {
549        // Region will be moved to target region server with Ack mode.
550        final ExecutorService isolateRegionPool = Executors.newFixedThreadPool(maxthreads);
551        List<Future<Boolean>> isolateRegionTaskList = new ArrayList<>();
552        List<RegionInfo> recentlyIsolatedRegion = Collections.synchronizedList(new ArrayList<>());
553        boolean allRegionOpsSuccessful = true;
554        boolean isMetaIsolated = false;
555        RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO;
556        List<HRegionLocation> hRegionLocationRegionIsolation =
557          Collections.synchronizedList(new ArrayList<>());
558        for (String isolateRegionId : isolateRegionIdArray) {
559          if (isolateRegionId.equalsIgnoreCase(metaRegionInfo.getEncodedName())) {
560            isMetaIsolated = true;
561            continue;
562          }
563          Result result = MetaTableAccessor.scanByRegionEncodedName(conn, isolateRegionId);
564          HRegionLocation hRegionLocation =
565            MetaTableAccessor.getRegionLocation(conn, result.getRow());
566          if (hRegionLocation != null) {
567            hRegionLocationRegionIsolation.add(hRegionLocation);
568          } else {
569            LOG.error("Region " + isolateRegionId + " doesn't exists/can't fetch from"
570              + " meta...Quitting now");
571            // We only move the regions if all the regions were found.
572            allRegionOpsSuccessful = false;
573            break;
574          }
575        }
576
577        if (!allRegionOpsSuccessful) {
578          break;
579        }
580        // If hbase:meta region was isolated, then it needs to be part of isolateRegionInfoList.
581        if (isMetaIsolated) {
582          ZKWatcher zkWatcher = new ZKWatcher(conf, null, null);
583          List<HRegionLocation> result = new ArrayList<>();
584          for (String znode : zkWatcher.getMetaReplicaNodes()) {
585            String path = ZNodePaths.joinZNode(zkWatcher.getZNodePaths().baseZNode, znode);
586            int replicaId = zkWatcher.getZNodePaths().getMetaReplicaIdFromPath(path);
587            RegionState state = MetaTableLocator.getMetaRegionState(zkWatcher, replicaId);
588            result.add(new HRegionLocation(state.getRegion(), state.getServerName()));
589          }
590          ServerName metaSeverName = result.get(0).getServerName();
591          // For isolating hbase:meta, it should move explicitly in Ack mode,
592          // hence the forceMoveRegionByAck = true.
593          if (!metaSeverName.equals(server)) {
594            LOG.info("Region of {} {} is on server {} moving to {}", TableName.META_TABLE_NAME,
595              metaRegionInfo.getEncodedName(), metaSeverName, server);
596            submitRegionMovesWhileUnloading(metaSeverName, Collections.singletonList(server),
597              movedRegions, Collections.singletonList(metaRegionInfo), true);
598          } else {
599            LOG.info("Region of {} {} already exists on server: {}", TableName.META_TABLE_NAME,
600              metaRegionInfo.getEncodedName(), server);
601          }
602          isolateRegionInfoList.add(RegionInfoBuilder.FIRST_META_REGIONINFO);
603        }
604
605        if (!hRegionLocationRegionIsolation.isEmpty()) {
606          for (HRegionLocation hRegionLocation : hRegionLocationRegionIsolation) {
607            isolateRegionInfo = hRegionLocation.getRegion();
608            isolateRegionInfoList.add(isolateRegionInfo);
609            if (hRegionLocation.getServerName() == server) {
610              LOG.info("Region " + hRegionLocation.getRegion().getEncodedName() + " already exists"
611                + " on server : " + server.getHostname());
612            } else {
613              Future<Boolean> isolateRegionTask =
614                isolateRegionPool.submit(new MoveWithAck(conn, isolateRegionInfo,
615                  hRegionLocation.getServerName(), server, recentlyIsolatedRegion));
616              isolateRegionTaskList.add(isolateRegionTask);
617            }
618          }
619        }
620
621        if (!isolateRegionTaskList.isEmpty()) {
622          isolateRegionPool.shutdown();
623          // Now that we have fetched all the region's regionInfo, we can move them.
624          waitMoveTasksToFinish(isolateRegionPool, isolateRegionTaskList,
625            admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX));
626
627          Set<RegionInfo> currentRegionsOnTheServer = new HashSet<>(admin.getRegions(server));
628          if (!currentRegionsOnTheServer.containsAll(isolateRegionInfoList)) {
629            // If all the regions are not online on the target server,
630            // we don't put RS in decommission mode and exit from here.
631            LOG.error("One of the Region move failed OR stuck in transition...Quitting now");
632            break;
633          }
634        } else {
635          LOG.info("All regions already exists on server : " + server.getHostname());
636        }
637        // Once region has been moved to target RS, put the target RS into decommission mode,
638        // so master doesn't assign new region to the target RS while we unload the target RS.
639        // Also pass 'offload' flag as false since we don't want master to offload the target RS.
640        List<ServerName> listOfServer = new ArrayList<>();
641        listOfServer.add(server);
642        LOG.info("Putting server : " + server.getHostname() + " in decommission/draining mode");
643        admin.decommissionRegionServers(listOfServer, false);
644      }
645      List<RegionInfo> regionsToMove = admin.getRegions(server);
646      // Remove all the regions from the online Region list, that we just isolated.
647      // This will also include hbase:meta if it was isolated.
648      regionsToMove.removeAll(isolateRegionInfoList);
649      regionsToMove.removeAll(movedRegions);
650      if (regionsToMove.isEmpty()) {
651        LOG.info("No Regions to move....Quitting now");
652        break;
653      }
654      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
655        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
656
657      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
658      if (metaRegion.isPresent()) {
659        RegionInfo meta = metaRegion.get();
660        // hbase:meta should move explicitly in Ack mode.
661        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
662          Collections.singletonList(meta), true);
663        regionsToMove.remove(meta);
664      }
665      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove, false);
666    }
667  }
668
669  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
670    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove, boolean forceMoveRegionByAck)
671    throws Exception {
672    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
673    List<Future<Boolean>> taskList = new ArrayList<>();
674    int serverIndex = 0;
675    for (RegionInfo regionToMove : regionsToMove) {
676      // To move/isolate hbase:meta on a server, it should happen explicitly by Ack mode, hence the
677      // forceMoveRegionByAck = true.
678      if (ack || forceMoveRegionByAck) {
679        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
680          regionServers.get(serverIndex), movedRegions));
681        taskList.add(task);
682      } else {
683        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
684          server, regionServers.get(serverIndex), movedRegions));
685        taskList.add(task);
686      }
687      serverIndex = (serverIndex + 1) % regionServers.size();
688    }
689    moveRegionsPool.shutdown();
690    long timeoutInSeconds = regionsToMove.size()
691      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
692    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
693  }
694
695  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
696    throws TimeoutException, InterruptedException, ExecutionException {
697    pool.shutdown();
698    try {
699      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
700        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
701          + this.timeout + "sec");
702        pool.shutdownNow();
703      }
704    } catch (InterruptedException e) {
705      pool.shutdownNow();
706      Thread.currentThread().interrupt();
707    }
708    try {
709      return task.get(5, TimeUnit.SECONDS);
710    } catch (InterruptedException e) {
711      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
712      throw e;
713    } catch (ExecutionException e) {
714      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
715      throw e;
716    }
717  }
718
719  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
720    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
721    try {
722      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
723        moveRegionsPool.shutdownNow();
724      }
725    } catch (InterruptedException e) {
726      moveRegionsPool.shutdownNow();
727      Thread.currentThread().interrupt();
728    }
729    for (Future<Boolean> future : taskList) {
730      try {
731        // if even after shutdownNow threads are stuck we wait for 5 secs max
732        if (!future.get(5, TimeUnit.SECONDS)) {
733          LOG.error("Was Not able to move region....Exiting Now");
734          throw new Exception("Could not move region Exception");
735        }
736      } catch (InterruptedException e) {
737        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
738        throw e;
739      } catch (ExecutionException e) {
740        boolean ignoreFailure = ignoreRegionMoveFailure(e);
741        if (ignoreFailure) {
742          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
743        } else {
744          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
745          throw e;
746        }
747      } catch (CancellationException e) {
748        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
749          + "secs", e);
750        throw e;
751      }
752    }
753  }
754
755  private boolean ignoreRegionMoveFailure(ExecutionException e) {
756    boolean ignoreFailure = false;
757    if (e.getCause() instanceof UnknownRegionException) {
758      // region does not exist anymore
759      ignoreFailure = true;
760    } else if (
761      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
762        && e.getCause().getMessage()
763          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
764    ) {
765      // region is recently split
766      ignoreFailure = true;
767    }
768    return ignoreFailure;
769  }
770
771  private ServerName getTargetServer() throws Exception {
772    ServerName server = null;
773    int maxWaitInSeconds =
774      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
775    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
776    while (EnvironmentEdgeManager.currentTime() < maxWait) {
777      try {
778        List<ServerName> regionServers = new ArrayList<>();
779        regionServers.addAll(admin.getRegionServers());
780        // Remove the host Region server from target Region Servers list
781        server = stripServer(regionServers, hostname, port);
782        if (server != null) {
783          break;
784        } else {
785          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
786        }
787      } catch (IOException e) {
788        LOG.warn("Could not get list of region servers", e);
789      }
790      Thread.sleep(500);
791    }
792    if (server == null) {
793      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
794      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
795    }
796    return server;
797  }
798
799  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
800    List<RegionInfo> regions = new ArrayList<>();
801    File f = new File(filename);
802    if (!f.exists()) {
803      return regions;
804    }
805    try (
806      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
807      int numRegions = dis.readInt();
808      int index = 0;
809      while (index < numRegions) {
810        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
811        index++;
812      }
813    } catch (IOException e) {
814      LOG.error("Error while reading regions from file:" + filename, e);
815      throw e;
816    }
817    return regions;
818  }
819
820  /**
821   * Write the number of regions moved in the first line followed by regions moved in subsequent
822   * lines
823   */
824  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
825    try (DataOutputStream dos =
826      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
827      dos.writeInt(movedRegions.size());
828      for (RegionInfo region : movedRegions) {
829        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
830      }
831    } catch (IOException e) {
832      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
833        + movedRegions.size() + " regions", e);
834      throw e;
835    }
836  }
837
838  private void deleteFile(String filename) {
839    File f = new File(filename);
840    if (f.exists()) {
841      f.delete();
842    }
843  }
844
845  /**
846   * @param filename The file should have 'host:port' per line
847   * @return List of servers from the file in format 'hostname:port'.
848   */
849  private List<String> readServersFromFile(String filename) throws IOException {
850    List<String> servers = new ArrayList<>();
851    if (filename != null) {
852      try {
853        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
854          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
855          .forEach(servers::add);
856      } catch (IOException e) {
857        LOG.error("Exception while reading servers from file,", e);
858        throw e;
859      }
860    }
861    return servers;
862  }
863
864  /**
865   * Designates or excludes the servername whose hostname and port portion matches the list given in
866   * the file. Example:<br>
867   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
868   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
869   * are removed from regionServers list so that regions can move to only RS1. If you want to
870   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
871   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
872   * list so that regions can move to only RS2 and RS3.
873   */
874  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
875    boolean isInclude) throws IOException {
876    if (fileName != null) {
877      List<String> servers = readServersFromFile(fileName);
878      if (servers.isEmpty()) {
879        LOG.warn("No servers provided in the file: {}." + fileName);
880        return;
881      }
882      Iterator<ServerName> i = regionServers.iterator();
883      while (i.hasNext()) {
884        String rs = i.next().getServerName();
885        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":"
886          + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
887        if (isInclude != servers.contains(rsPort)) {
888          i.remove();
889        }
890      }
891    }
892  }
893
894  /**
895   * Exclude master from list of RSs to move regions to
896   */
897  private void stripMaster(List<ServerName> regionServers) throws IOException {
898    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
899    stripServer(regionServers, master.getHostname(), master.getPort());
900  }
901
902  /**
903   * Remove the servername whose hostname and port portion matches from the passed array of servers.
904   * Returns as side-effect the servername removed.
905   * @return server removed from list of Region Servers
906   */
907  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
908    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
909      ServerName server = iter.next();
910      if (
911        server.getAddress().getHostName().equalsIgnoreCase(hostname)
912          && server.getAddress().getPort() == port
913      ) {
914        iter.remove();
915        return server;
916      }
917    }
918    return null;
919  }
920
921  @Override
922  protected void addOptions() {
923    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
924    this.addRequiredOptWithArg("o", "operation",
925      "Expected: load/unload/unload_from_rack/isolate_regions");
926    this.addOptWithArg("m", "maxthreads",
927      "Define the maximum number of threads to use to unload and reload the regions");
928    this.addOptWithArg("i", "isolateRegionIds",
929      "Comma separated list of Region IDs hash to isolate on a RegionServer and put region server"
930        + " in draining mode. This option should only be used with '-o isolate_regions'."
931        + " By putting region server in decommission/draining mode, master can't assign any"
932        + " new region on this server. If one or more regions are not found OR failed to isolate"
933        + " successfully, utility will exist without putting RS in draining/decommission mode."
934        + " Ex. --isolateRegionIds id1,id2,id3 OR -i id1,id2,id3");
935    this.addOptWithArg("x", "excludefile",
936      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
937        + "target host; useful for rack decommisioning.");
938    this.addOptWithArg("d", "designatedfile",
939      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
940    this.addOptWithArg("f", "filename",
941      "File to save regions list into unloading, or read from loading; "
942        + "default /tmp/<usernamehostname:port>");
943    this.addOptNoArg("n", "noack",
944      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
945        + "RegionServer, hence best effort. This is more performant in unloading and loading "
946        + "but might lead to region being unavailable for some time till master reassigns it "
947        + "in case the move failed");
948    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
949      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
950  }
951
952  @Override
953  protected void processOptions(CommandLine cmd) {
954    String hostname = cmd.getOptionValue("r");
955    rmbuilder = new RegionMoverBuilder(hostname, getConf());
956    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
957    if (cmd.hasOption('m')) {
958      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
959    }
960    if (this.loadUnload.equals("isolate_regions") && cmd.hasOption("isolateRegionIds")) {
961      rmbuilder
962        .isolateRegionIdArray(Arrays.asList(cmd.getOptionValue("isolateRegionIds").split(",")));
963    }
964    if (cmd.hasOption('n')) {
965      rmbuilder.ack(false);
966    }
967    if (cmd.hasOption('f')) {
968      rmbuilder.filename(cmd.getOptionValue('f'));
969    }
970    if (cmd.hasOption('x')) {
971      rmbuilder.excludeFile(cmd.getOptionValue('x'));
972    }
973    if (cmd.hasOption('d')) {
974      rmbuilder.designatedFile(cmd.getOptionValue('d'));
975    }
976    if (cmd.hasOption('t')) {
977      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
978    }
979  }
980
981  @Override
982  protected int doWork() throws Exception {
983    boolean success;
984    try (RegionMover rm = rmbuilder.build()) {
985      if (loadUnload.equalsIgnoreCase("load")) {
986        success = rm.load();
987      } else if (loadUnload.equalsIgnoreCase("unload")) {
988        success = rm.unload();
989      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
990        success = rm.unloadFromRack();
991      } else if (loadUnload.equalsIgnoreCase("isolate_regions")) {
992        if (rm.isolateRegionIdArray != null && !rm.isolateRegionIdArray.isEmpty()) {
993          success = rm.isolateRegions();
994        } else {
995          LOG.error("Missing -i/--isolate_regions option with '-o isolate_regions' option");
996          LOG.error("Use -h or --help for usage instructions");
997          printUsage();
998          success = false;
999        }
1000      } else {
1001        printUsage();
1002        success = false;
1003      }
1004    }
1005    return (success ? 0 : 1);
1006  }
1007
1008  public static void main(String[] args) {
1009    try (RegionMover mover = new RegionMover()) {
1010      mover.doStaticMain(args);
1011    }
1012  }
1013}