001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.Closeable;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.net.InetAddress;
030import java.nio.file.Files;
031import java.nio.file.Paths;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.EnumSet;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.List;
040import java.util.Locale;
041import java.util.Optional;
042import java.util.Set;
043import java.util.concurrent.Callable;
044import java.util.concurrent.CancellationException;
045import java.util.concurrent.ExecutionException;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.Executors;
048import java.util.concurrent.Future;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.TimeoutException;
051import java.util.function.Predicate;
052import org.apache.commons.io.IOUtils;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.MetaTableAccessor;
059import org.apache.hadoop.hbase.ServerName;
060import org.apache.hadoop.hbase.UnknownRegionException;
061import org.apache.hadoop.hbase.client.Admin;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.ConnectionFactory;
064import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
065import org.apache.hadoop.hbase.client.RegionInfo;
066import org.apache.hadoop.hbase.client.RegionInfoBuilder;
067import org.apache.hadoop.hbase.client.Result;
068import org.apache.hadoop.hbase.master.RackManager;
069import org.apache.hadoop.hbase.master.RegionState;
070import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
071import org.apache.hadoop.hbase.net.Address;
072import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
073import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
074import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
075import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
081import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
082import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
083
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges if regions are online after movement, while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a region is stuck, upon shutdown the
 * master will move it anyway. This can also be used by constructing an object using the builder
 * and then calling the {@link #load()} or {@link #unload()} methods for the desired operations.
 */
093@InterfaceAudience.Public
094public class RegionMover extends AbstractHBaseTool implements Closeable {
  // Configuration key for the maximum number of move retries (per the builder documentation,
  // a move is retried this many times in ack mode).
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  // Configuration key for the per-region wait, in seconds; it is multiplied by the number of
  // regions to obtain the timeout for a batch of move tasks.
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  // NOTE(review): presumably the maximum wait for the target server to start; its use is not
  // visible in this part of the file -- confirm.
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  // NOTE(review): rmbuilder and loadUnload are not used in the visible part of the file;
  // presumably they back the command-line entry point -- confirm.
  private RegionMoverBuilder rmbuilder;
  // true: moves are submitted as MoveWithAck; false: as MoveWithoutAck (best effort).
  private boolean ack = true;
  // Size of the thread pools used to move regions.
  private int maxthreads = 1;
  // Global timeout, in seconds, for a whole load/unload operation.
  private int timeout;
  // Encoded names of the regions to isolate on the target server.
  private List<String> isolateRegionIdArray;
  private String loadUnload;
  // Host (and port, below) of the region server to load regions to / unload regions from.
  private String hostname;
  // File the moved regions are written to while unloading and read from while loading.
  private String filename;
  // File listing servers that must not receive regions while unloading.
  private String excludeFile;
  // File listing the only servers allowed to receive regions while unloading.
  private String designatedFile;
  private int port;
  // Connection and Admin are opened in the constructor and released in close().
  private Connection conn;
  private Admin admin;
  // Resolves racks of servers; consulted when unloading to servers on a different rack.
  private RackManager rackManager;
118
119  private RegionMover(RegionMoverBuilder builder) throws IOException {
120    this.hostname = builder.hostname;
121    this.filename = builder.filename;
122    this.excludeFile = builder.excludeFile;
123    this.designatedFile = builder.designatedFile;
124    this.maxthreads = builder.maxthreads;
125    this.isolateRegionIdArray = builder.isolateRegionIdArray;
126    this.ack = builder.ack;
127    this.port = builder.port;
128    this.timeout = builder.timeout;
129    setConf(builder.conf);
130    this.conn = ConnectionFactory.createConnection(conf);
131    this.admin = conn.getAdmin();
132
133    // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need
134    // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed
135    // normally, see HBASE-27304 for details.
136    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
137    if (InetAddresses.isInetAddress(master.getHostname())) {
138      if (!InetAddresses.isInetAddress(this.hostname)) {
139        this.hostname = InetAddress.getByName(this.hostname).getHostAddress();
140      }
141    }
142
143    // Only while running unit tests, builder.rackManager will not be null for the convenience of
144    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
145    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
146    // provided as @InterfaceAudience.Private and it is commented that this is just
147    // to be used by unit test.
148    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
149  }
150
151  private RegionMover() {
152  }
153
154  @Override
155  public void close() {
156    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
157    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
158  }
159
160  /**
161   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
162   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
163   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
164   * the corresponding options.
165   */
166  public static class RegionMoverBuilder {
167    private boolean ack = true;
168    private int maxthreads = 1;
169    private int timeout = Integer.MAX_VALUE;
170    private List<String> isolateRegionIdArray = new ArrayList<>();
171    private String hostname;
172    private String filename;
173    private String excludeFile = null;
174    private String designatedFile = null;
175    private String defaultDir = System.getProperty("java.io.tmpdir");
176    @InterfaceAudience.Private
177    final int port;
178    private final Configuration conf;
179    private RackManager rackManager;
180
181    public RegionMoverBuilder(String hostname) {
182      this(hostname, createConf());
183    }
184
185    /**
186     * Creates a new configuration and sets region mover specific overrides
187     */
188    private static Configuration createConf() {
189      Configuration conf = HBaseConfiguration.create();
190      conf.setInt("hbase.client.prefetch.limit", 1);
191      conf.setInt("hbase.client.pause", 500);
192      conf.setInt("hbase.client.retries.number", 100);
193      return conf;
194    }
195
196    /**
197     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
198     *                 hostname:port.
199     * @param conf     Configuration object
200     */
201    public RegionMoverBuilder(String hostname, Configuration conf) {
202      String[] splitHostname = hostname.toLowerCase().split(":");
203      this.hostname = splitHostname[0];
204      if (splitHostname.length == 2) {
205        this.port = Integer.parseInt(splitHostname[1]);
206      } else {
207        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
208      }
209      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
210        + ":" + Integer.toString(this.port);
211      this.conf = conf;
212    }
213
214    /**
215     * Path of file where regions will be written to during unloading/read from during loading
216     * @return RegionMoverBuilder object
217     */
218    public RegionMoverBuilder filename(String filename) {
219      this.filename = filename;
220      return this;
221    }
222
223    /**
224     * Set the max number of threads that will be used to move regions
225     */
226    public RegionMoverBuilder maxthreads(int threads) {
227      this.maxthreads = threads;
228      return this;
229    }
230
231    /**
232     * Set the region ID to isolate on the region server.
233     */
234    public RegionMoverBuilder isolateRegionIdArray(List<String> isolateRegionIdArray) {
235      this.isolateRegionIdArray = isolateRegionIdArray;
236      return this;
237    }
238
239    /**
240     * Path of file containing hostnames to be excluded during region movement. Exclude file should
241     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
242     * host.
243     */
244    public RegionMoverBuilder excludeFile(String excludefile) {
245      this.excludeFile = excludefile;
246      return this;
247    }
248
249    /**
250     * Set the designated file. Designated file contains hostnames where region moves. Designated
251     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
252     * on a single host.
253     * @param designatedFile The designated file
254     * @return RegionMoverBuilder object
255     */
256    public RegionMoverBuilder designatedFile(String designatedFile) {
257      this.designatedFile = designatedFile;
258      return this;
259    }
260
261    /**
262     * Set ack/noAck mode.
263     * <p>
264     * In ack mode regions are acknowledged before and after moving and the move is retried
265     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
266     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
267     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
268     * <p>
269     * @return RegionMoverBuilder object
270     */
271    public RegionMoverBuilder ack(boolean ack) {
272      this.ack = ack;
273      return this;
274    }
275
276    /**
277     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
278     * movers also have a separate time which is hbase.move.wait.max * number of regions to
279     * load/unload
280     * @param timeout in seconds
281     * @return RegionMoverBuilder object
282     */
283    public RegionMoverBuilder timeout(int timeout) {
284      this.timeout = timeout;
285      return this;
286    }
287
288    /**
289     * Set specific rackManager implementation. This setter method is for testing purpose only.
290     * @param rackManager rackManager impl
291     * @return RegionMoverBuilder object
292     */
293    @InterfaceAudience.Private
294    public RegionMoverBuilder rackManager(RackManager rackManager) {
295      this.rackManager = rackManager;
296      return this;
297    }
298
299    /**
300     * This method builds the appropriate RegionMover object which can then be used to load/unload
301     * using load and unload methods
302     * @return RegionMover object
303     */
304    public RegionMover build() throws IOException {
305      return new RegionMover(this);
306    }
307  }
308
309  /**
310   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
311   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
312   * @return true if loading succeeded, false otherwise
313   */
314  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
315    ExecutorService loadPool = Executors.newFixedThreadPool(1);
316    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
317    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
318    if (!isMetaMoved) {
319      return false;
320    }
321    loadPool = Executors.newFixedThreadPool(1);
322    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
323    return waitTaskToFinish(loadPool, loadTask, "loading");
324  }
325
326  private Callable<Boolean> getMetaRegionMovePlan() {
327    return getRegionsMovePlan(true);
328  }
329
330  private Callable<Boolean> getNonMetaRegionsMovePlan() {
331    return getRegionsMovePlan(false);
332  }
333
334  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
335    return () -> {
336      try {
337        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
338        if (regionsToMove.isEmpty()) {
339          LOG.info("No regions to load.Exiting");
340          return true;
341        }
342        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
343        if (moveMetaRegion) {
344          if (metaRegion.isPresent()) {
345            loadRegions(Collections.singletonList(metaRegion.get()));
346          }
347        } else {
348          metaRegion.ifPresent(regionsToMove::remove);
349          loadRegions(regionsToMove);
350        }
351      } catch (Exception e) {
352        LOG.error("Error while loading regions to " + hostname, e);
353        return false;
354      }
355      return true;
356    };
357  }
358
359  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
360    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
361  }
362
363  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
364    ServerName server = getTargetServer();
365    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
366    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
367      + this.maxthreads + " threads.Ack mode:" + this.ack);
368
369    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
370    List<Future<Boolean>> taskList = new ArrayList<>();
371    int counter = 0;
372    while (counter < regionsToMove.size()) {
373      RegionInfo region = regionsToMove.get(counter);
374      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
375      if (currentServer == null) {
376        LOG
377          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
378        counter++;
379        continue;
380      } else if (server.equals(currentServer)) {
381        LOG.info(
382          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
383        counter++;
384        continue;
385      }
386      if (ack) {
387        Future<Boolean> task = moveRegionsPool
388          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
389        taskList.add(task);
390      } else {
391        Future<Boolean> task = moveRegionsPool
392          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
393        taskList.add(task);
394      }
395      counter++;
396    }
397
398    moveRegionsPool.shutdown();
399    long timeoutInSeconds = regionsToMove.size()
400      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
401    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
402  }
403
404  /**
405   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
406   * noAck mode we do not make sure that region is successfully online on the target region
407   * server,hence it is best effort.We do not unload regions to hostnames given in
408   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
409   * to hostnames provided in {@link #designatedFile}
410   * @return true if unloading succeeded, false otherwise
411   */
412  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
413    return unloadRegions(false);
414  }
415
416  /**
417   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
418   * noAck mode we do not make sure that region is successfully online on the target region
419   * server,hence it is best effort.We do not unload regions to hostnames given in
420   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
421   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
422   * RegionServers are selected from different rack i.e regions should not move to any RegionServers
423   * that belong to same rack as source RegionServer.
424   * @return true if unloading succeeded, false otherwise
425   */
426  public boolean unloadFromRack()
427    throws InterruptedException, ExecutionException, TimeoutException {
428    return unloadRegions(true);
429  }
430
431  private boolean unloadRegions(boolean unloadFromRack)
432    throws ExecutionException, InterruptedException, TimeoutException {
433    return unloadRegions(unloadFromRack, null);
434  }
435
436  /**
437   * Isolated regions specified in {@link #isolateRegionIdArray} on {@link #hostname} in ack Mode
438   * and Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.
439   * In noAck mode we do not make sure that region is successfully online on the target region
440   * server,hence it is the best effort. We do not unload regions to hostnames given in
441   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
442   * to hostnames provided in {@link #designatedFile}
443   * @return true if region isolation succeeded, false otherwise
444   */
445  public boolean isolateRegions()
446    throws ExecutionException, InterruptedException, TimeoutException {
447    return unloadRegions(false, isolateRegionIdArray);
448  }
449
450  private boolean unloadRegions(boolean unloadFromRack, List<String> isolateRegionIdArray)
451    throws InterruptedException, ExecutionException, TimeoutException {
452    deleteFile(this.filename);
453    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
454    Future<Boolean> unloadTask = unloadPool.submit(() -> {
455      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
456      try {
457        // Get Online RegionServers
458        List<ServerName> regionServers = new ArrayList<>();
459        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
460        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
461        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
462        // Remove the host Region server from target Region Servers list
463        ServerName server = stripServer(regionServers, hostname, port);
464        if (server == null) {
465          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
466            hostname, port);
467          LOG.debug("List of region servers: {}", regionServers);
468          return false;
469        }
470        // Remove RS not present in the designated file
471        includeExcludeRegionServers(designatedFile, regionServers, true);
472
473        // Remove RS present in the exclude file
474        includeExcludeRegionServers(excludeFile, regionServers, false);
475
476        if (unloadFromRack) {
477          // remove regionServers that belong to same rack (as source host) since the goal is to
478          // unload regions from source regionServer to destination regionServers
479          // that belong to different rack only.
480          String sourceRack = rackManager.getRack(server);
481          List<String> racks = rackManager.getRack(regionServers);
482          Iterator<ServerName> iterator = regionServers.iterator();
483          int i = 0;
484          while (iterator.hasNext()) {
485            iterator.next();
486            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
487              iterator.remove();
488            }
489            i++;
490          }
491        }
492
493        // Remove decommissioned RS
494        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
495        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
496          regionServers.removeIf(decommissionedRS::contains);
497          LOG.debug("Excluded RegionServers from unloading regions to because they "
498            + "are marked as decommissioned. Servers: {}", decommissionedRS);
499        }
500
501        stripMaster(regionServers);
502        if (regionServers.isEmpty()) {
503          LOG.warn("No Regions were moved - no servers available");
504          return false;
505        } else {
506          LOG.info("Available servers {}", regionServers);
507        }
508        unloadRegions(server, regionServers, movedRegions, isolateRegionIdArray);
509      } catch (Exception e) {
510        LOG.error("Error while unloading regions ", e);
511        return false;
512      } finally {
513        if (movedRegions != null) {
514          writeFile(filename, movedRegions);
515        }
516      }
517      return true;
518    });
519    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
520  }
521
522  @InterfaceAudience.Private
523  Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
524    Collection<ServerName> onlineServers) {
525    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
526      return onlineServers;
527    }
528    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
529    for (ServerName server : onlineServers) {
530      Address address = Address.fromParts(server.getHostname(), server.getPort());
531      if (rsgroup.containsServer(address)) {
532        serverLists.add(server);
533      }
534    }
535    return serverLists;
536  }
537
538  private void unloadRegions(ServerName server, List<ServerName> regionServers,
539    List<RegionInfo> movedRegions, List<String> isolateRegionIdArray) throws Exception {
540    while (true) {
541      List<RegionInfo> isolateRegionInfoList = Collections.synchronizedList(new ArrayList<>());
542      RegionInfo isolateRegionInfo = null;
543      if (isolateRegionIdArray != null && !isolateRegionIdArray.isEmpty()) {
544        // Region will be moved to target region server with Ack mode.
545        final ExecutorService isolateRegionPool = Executors.newFixedThreadPool(maxthreads);
546        List<Future<Boolean>> isolateRegionTaskList = new ArrayList<>();
547        List<RegionInfo> recentlyIsolatedRegion = Collections.synchronizedList(new ArrayList<>());
548        boolean allRegionOpsSuccessful = true;
549        boolean isMetaIsolated = false;
550        RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO;
551        List<HRegionLocation> hRegionLocationRegionIsolation =
552          Collections.synchronizedList(new ArrayList<>());
553        for (String isolateRegionId : isolateRegionIdArray) {
554          if (isolateRegionId.equalsIgnoreCase(metaRegionInfo.getEncodedName())) {
555            isMetaIsolated = true;
556            continue;
557          }
558          Result result = MetaTableAccessor.scanByRegionEncodedName(conn, isolateRegionId);
559          HRegionLocation hRegionLocation =
560            MetaTableAccessor.getRegionLocation(conn, result.getRow());
561          if (hRegionLocation != null) {
562            hRegionLocationRegionIsolation.add(hRegionLocation);
563          } else {
564            LOG.error("Region " + isolateRegionId + " doesn't exists/can't fetch from"
565              + " meta...Quitting now");
566            // We only move the regions if all the regions were found.
567            allRegionOpsSuccessful = false;
568            break;
569          }
570        }
571
572        if (!allRegionOpsSuccessful) {
573          break;
574        }
575        // If hbase:meta region was isolated, then it needs to be part of isolateRegionInfoList.
576        if (isMetaIsolated) {
577          ZKWatcher zkWatcher = new ZKWatcher(conf, null, null);
578          List<HRegionLocation> result = new ArrayList<>();
579          for (String znode : zkWatcher.getMetaReplicaNodes()) {
580            String path = ZNodePaths.joinZNode(zkWatcher.getZNodePaths().baseZNode, znode);
581            int replicaId = zkWatcher.getZNodePaths().getMetaReplicaIdFromPath(path);
582            RegionState state = MetaTableLocator.getMetaRegionState(zkWatcher, replicaId);
583            result.add(new HRegionLocation(state.getRegion(), state.getServerName()));
584          }
585          ServerName metaSeverName = result.get(0).getServerName();
586          // For isolating hbase:meta, it should move explicitly in Ack mode,
587          // hence the forceMoveRegionByAck = true.
588          if (!metaSeverName.equals(server)) {
589            LOG.info("Region of hbase:meta " + metaRegionInfo.getEncodedName() + " is on server "
590              + metaSeverName + " moving to " + server);
591            submitRegionMovesWhileUnloading(metaSeverName, Collections.singletonList(server),
592              movedRegions, Collections.singletonList(metaRegionInfo), true);
593          } else {
594            LOG.info("Region of hbase:meta " + metaRegionInfo.getEncodedName() + " already exists"
595              + " on server : " + server);
596          }
597          isolateRegionInfoList.add(RegionInfoBuilder.FIRST_META_REGIONINFO);
598        }
599
600        if (!hRegionLocationRegionIsolation.isEmpty()) {
601          for (HRegionLocation hRegionLocation : hRegionLocationRegionIsolation) {
602            isolateRegionInfo = hRegionLocation.getRegion();
603            isolateRegionInfoList.add(isolateRegionInfo);
604            if (hRegionLocation.getServerName() == server) {
605              LOG.info("Region " + hRegionLocation.getRegion().getEncodedName() + " already exists"
606                + " on server : " + server.getHostname());
607            } else {
608              Future<Boolean> isolateRegionTask =
609                isolateRegionPool.submit(new MoveWithAck(conn, isolateRegionInfo,
610                  hRegionLocation.getServerName(), server, recentlyIsolatedRegion));
611              isolateRegionTaskList.add(isolateRegionTask);
612            }
613          }
614        }
615
616        if (!isolateRegionTaskList.isEmpty()) {
617          isolateRegionPool.shutdown();
618          // Now that we have fetched all the region's regionInfo, we can move them.
619          waitMoveTasksToFinish(isolateRegionPool, isolateRegionTaskList,
620            admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX));
621
622          Set<RegionInfo> currentRegionsOnTheServer = new HashSet<>(admin.getRegions(server));
623          if (!currentRegionsOnTheServer.containsAll(isolateRegionInfoList)) {
624            // If all the regions are not online on the target server,
625            // we don't put RS in decommission mode and exit from here.
626            LOG.error("One of the Region move failed OR stuck in transition...Quitting now");
627            break;
628          }
629        } else {
630          LOG.info("All regions already exists on server : " + server.getHostname());
631        }
632        // Once region has been moved to target RS, put the target RS into decommission mode,
633        // so master doesn't assign new region to the target RS while we unload the target RS.
634        // Also pass 'offload' flag as false since we don't want master to offload the target RS.
635        List<ServerName> listOfServer = new ArrayList<>();
636        listOfServer.add(server);
637        LOG.info("Putting server : " + server.getHostname() + " in decommission/draining mode");
638        admin.decommissionRegionServers(listOfServer, false);
639      }
640      List<RegionInfo> regionsToMove = admin.getRegions(server);
641      // Remove all the regions from the online Region list, that we just isolated.
642      // This will also include hbase:meta if it was isolated.
643      regionsToMove.removeAll(isolateRegionInfoList);
644      regionsToMove.removeAll(movedRegions);
645      if (regionsToMove.isEmpty()) {
646        LOG.info("No Regions to move....Quitting now");
647        break;
648      }
649      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
650        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
651
652      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
653      if (metaRegion.isPresent()) {
654        RegionInfo meta = metaRegion.get();
655        // hbase:meta should move explicitly in Ack mode.
656        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
657          Collections.singletonList(meta), true);
658        regionsToMove.remove(meta);
659      }
660      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove, false);
661    }
662  }
663
664  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
665    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove, boolean forceMoveRegionByAck)
666    throws Exception {
667    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
668    List<Future<Boolean>> taskList = new ArrayList<>();
669    int serverIndex = 0;
670    for (RegionInfo regionToMove : regionsToMove) {
671      // To move/isolate hbase:meta on a server, it should happen explicitly by Ack mode, hence the
672      // forceMoveRegionByAck = true.
673      if (ack || forceMoveRegionByAck) {
674        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
675          regionServers.get(serverIndex), movedRegions));
676        taskList.add(task);
677      } else {
678        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
679          server, regionServers.get(serverIndex), movedRegions));
680        taskList.add(task);
681      }
682      serverIndex = (serverIndex + 1) % regionServers.size();
683    }
684    moveRegionsPool.shutdown();
685    long timeoutInSeconds = regionsToMove.size()
686      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
687    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
688  }
689
690  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
691    throws TimeoutException, InterruptedException, ExecutionException {
692    pool.shutdown();
693    try {
694      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
695        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
696          + this.timeout + "sec");
697        pool.shutdownNow();
698      }
699    } catch (InterruptedException e) {
700      pool.shutdownNow();
701      Thread.currentThread().interrupt();
702    }
703    try {
704      return task.get(5, TimeUnit.SECONDS);
705    } catch (InterruptedException e) {
706      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
707      throw e;
708    } catch (ExecutionException e) {
709      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
710      throw e;
711    }
712  }
713
714  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
715    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
716    try {
717      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
718        moveRegionsPool.shutdownNow();
719      }
720    } catch (InterruptedException e) {
721      moveRegionsPool.shutdownNow();
722      Thread.currentThread().interrupt();
723    }
724    for (Future<Boolean> future : taskList) {
725      try {
726        // if even after shutdownNow threads are stuck we wait for 5 secs max
727        if (!future.get(5, TimeUnit.SECONDS)) {
728          LOG.error("Was Not able to move region....Exiting Now");
729          throw new Exception("Could not move region Exception");
730        }
731      } catch (InterruptedException e) {
732        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
733        throw e;
734      } catch (ExecutionException e) {
735        boolean ignoreFailure = ignoreRegionMoveFailure(e);
736        if (ignoreFailure) {
737          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
738        } else {
739          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
740          throw e;
741        }
742      } catch (CancellationException e) {
743        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
744          + "secs", e);
745        throw e;
746      }
747    }
748  }
749
750  private boolean ignoreRegionMoveFailure(ExecutionException e) {
751    boolean ignoreFailure = false;
752    if (e.getCause() instanceof UnknownRegionException) {
753      // region does not exist anymore
754      ignoreFailure = true;
755    } else if (
756      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
757        && e.getCause().getMessage()
758          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
759    ) {
760      // region is recently split
761      ignoreFailure = true;
762    }
763    return ignoreFailure;
764  }
765
766  private ServerName getTargetServer() throws Exception {
767    ServerName server = null;
768    int maxWaitInSeconds =
769      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
770    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
771    while (EnvironmentEdgeManager.currentTime() < maxWait) {
772      try {
773        List<ServerName> regionServers = new ArrayList<>();
774        regionServers.addAll(admin.getRegionServers());
775        // Remove the host Region server from target Region Servers list
776        server = stripServer(regionServers, hostname, port);
777        if (server != null) {
778          break;
779        } else {
780          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
781        }
782      } catch (IOException e) {
783        LOG.warn("Could not get list of region servers", e);
784      }
785      Thread.sleep(500);
786    }
787    if (server == null) {
788      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
789      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
790    }
791    return server;
792  }
793
794  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
795    List<RegionInfo> regions = new ArrayList<>();
796    File f = new File(filename);
797    if (!f.exists()) {
798      return regions;
799    }
800    try (
801      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
802      int numRegions = dis.readInt();
803      int index = 0;
804      while (index < numRegions) {
805        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
806        index++;
807      }
808    } catch (IOException e) {
809      LOG.error("Error while reading regions from file:" + filename, e);
810      throw e;
811    }
812    return regions;
813  }
814
815  /**
816   * Write the number of regions moved in the first line followed by regions moved in subsequent
817   * lines
818   */
819  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
820    try (DataOutputStream dos =
821      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
822      dos.writeInt(movedRegions.size());
823      for (RegionInfo region : movedRegions) {
824        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
825      }
826    } catch (IOException e) {
827      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
828        + movedRegions.size() + " regions", e);
829      throw e;
830    }
831  }
832
833  private void deleteFile(String filename) {
834    File f = new File(filename);
835    if (f.exists()) {
836      f.delete();
837    }
838  }
839
840  /**
841   * @param filename The file should have 'host:port' per line
842   * @return List of servers from the file in format 'hostname:port'.
843   */
844  private List<String> readServersFromFile(String filename) throws IOException {
845    List<String> servers = new ArrayList<>();
846    if (filename != null) {
847      try {
848        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
849          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
850          .forEach(servers::add);
851      } catch (IOException e) {
852        LOG.error("Exception while reading servers from file,", e);
853        throw e;
854      }
855    }
856    return servers;
857  }
858
859  /**
860   * Designates or excludes the servername whose hostname and port portion matches the list given in
861   * the file. Example:<br>
862   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
863   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
864   * are removed from regionServers list so that regions can move to only RS1. If you want to
865   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
866   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
867   * list so that regions can move to only RS2 and RS3.
868   */
869  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
870    boolean isInclude) throws IOException {
871    if (fileName != null) {
872      List<String> servers = readServersFromFile(fileName);
873      if (servers.isEmpty()) {
874        LOG.warn("No servers provided in the file: {}." + fileName);
875        return;
876      }
877      Iterator<ServerName> i = regionServers.iterator();
878      while (i.hasNext()) {
879        String rs = i.next().getServerName();
880        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":"
881          + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
882        if (isInclude != servers.contains(rsPort)) {
883          i.remove();
884        }
885      }
886    }
887  }
888
889  /**
890   * Exclude master from list of RSs to move regions to
891   */
892  private void stripMaster(List<ServerName> regionServers) throws IOException {
893    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
894    stripServer(regionServers, master.getHostname(), master.getPort());
895  }
896
897  /**
898   * Remove the servername whose hostname and port portion matches from the passed array of servers.
899   * Returns as side-effect the servername removed.
900   * @return server removed from list of Region Servers
901   */
902  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
903    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
904      ServerName server = iter.next();
905      if (
906        server.getAddress().getHostName().equalsIgnoreCase(hostname)
907          && server.getAddress().getPort() == port
908      ) {
909        iter.remove();
910        return server;
911      }
912    }
913    return null;
914  }
915
916  @Override
917  protected void addOptions() {
918    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
919    this.addRequiredOptWithArg("o", "operation",
920      "Expected: load/unload/unload_from_rack/isolate_regions");
921    this.addOptWithArg("m", "maxthreads",
922      "Define the maximum number of threads to use to unload and reload the regions");
923    this.addOptWithArg("i", "isolateRegionIds",
924      "Comma separated list of Region IDs hash to isolate on a RegionServer and put region server"
925        + " in draining mode. This option should only be used with '-o isolate_regions'."
926        + " By putting region server in decommission/draining mode, master can't assign any"
927        + " new region on this server. If one or more regions are not found OR failed to isolate"
928        + " successfully, utility will exist without putting RS in draining/decommission mode."
929        + " Ex. --isolateRegionIds id1,id2,id3 OR -i id1,id2,id3");
930    this.addOptWithArg("x", "excludefile",
931      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
932        + "target host; useful for rack decommisioning.");
933    this.addOptWithArg("d", "designatedfile",
934      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
935    this.addOptWithArg("f", "filename",
936      "File to save regions list into unloading, or read from loading; "
937        + "default /tmp/<usernamehostname:port>");
938    this.addOptNoArg("n", "noack",
939      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
940        + "RegionServer, hence best effort. This is more performant in unloading and loading "
941        + "but might lead to region being unavailable for some time till master reassigns it "
942        + "in case the move failed");
943    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
944      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
945  }
946
947  @Override
948  protected void processOptions(CommandLine cmd) {
949    String hostname = cmd.getOptionValue("r");
950    rmbuilder = new RegionMoverBuilder(hostname);
951    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
952    if (cmd.hasOption('m')) {
953      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
954    }
955    if (this.loadUnload.equals("isolate_regions") && cmd.hasOption("isolateRegionIds")) {
956      rmbuilder
957        .isolateRegionIdArray(Arrays.asList(cmd.getOptionValue("isolateRegionIds").split(",")));
958    }
959    if (cmd.hasOption('n')) {
960      rmbuilder.ack(false);
961    }
962    if (cmd.hasOption('f')) {
963      rmbuilder.filename(cmd.getOptionValue('f'));
964    }
965    if (cmd.hasOption('x')) {
966      rmbuilder.excludeFile(cmd.getOptionValue('x'));
967    }
968    if (cmd.hasOption('d')) {
969      rmbuilder.designatedFile(cmd.getOptionValue('d'));
970    }
971    if (cmd.hasOption('t')) {
972      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
973    }
974  }
975
976  @Override
977  protected int doWork() throws Exception {
978    boolean success;
979    try (RegionMover rm = rmbuilder.build()) {
980      if (loadUnload.equalsIgnoreCase("load")) {
981        success = rm.load();
982      } else if (loadUnload.equalsIgnoreCase("unload")) {
983        success = rm.unload();
984      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
985        success = rm.unloadFromRack();
986      } else if (loadUnload.equalsIgnoreCase("isolate_regions")) {
987        if (rm.isolateRegionIdArray != null && !rm.isolateRegionIdArray.isEmpty()) {
988          success = rm.isolateRegions();
989        } else {
990          LOG.error("Missing -i/--isolate_regions option with '-o isolate_regions' option");
991          LOG.error("Use -h or --help for usage instructions");
992          printUsage();
993          success = false;
994        }
995      } else {
996        printUsage();
997        success = false;
998      }
999    }
1000    return (success ? 0 : 1);
1001  }
1002
1003  public static void main(String[] args) {
1004    try (RegionMover mover = new RegionMover()) {
1005      mover.doStaticMain(args);
1006    }
1007  }
1008}