001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.Closeable;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.net.InetAddress;
030import java.nio.file.Files;
031import java.nio.file.Paths;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.EnumSet;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.List;
040import java.util.Locale;
041import java.util.Optional;
042import java.util.Set;
043import java.util.concurrent.Callable;
044import java.util.concurrent.CancellationException;
045import java.util.concurrent.ExecutionException;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.Executors;
048import java.util.concurrent.Future;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.TimeoutException;
051import java.util.function.Predicate;
052import org.apache.commons.io.IOUtils;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.MetaTableAccessor;
059import org.apache.hadoop.hbase.ServerName;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.UnknownRegionException;
062import org.apache.hadoop.hbase.client.Admin;
063import org.apache.hadoop.hbase.client.Connection;
064import org.apache.hadoop.hbase.client.ConnectionFactory;
065import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.client.RegionInfoBuilder;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.master.RackManager;
070import org.apache.hadoop.hbase.master.RegionState;
071import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
072import org.apache.hadoop.hbase.net.Address;
073import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
074import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
075import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
076import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
082import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
083import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
084
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges that regions are online after movement, while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a region is stuck, upon shutdown the
 * master will move it anyway. This can also be used by constructing an object using the builder
 * and then calling the {@link #load()} or {@link #unload()} methods for the desired operations.
 */
094@InterfaceAudience.Public
095public class RegionMover extends AbstractHBaseTool implements Closeable {
096  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
097  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
098  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
099  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
100  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
101  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
102
103  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
104
105  private RegionMoverBuilder rmbuilder;
106  private boolean ack = true;
107  private int maxthreads = 1;
108  private int timeout;
109  private List<String> isolateRegionIdArray;
110  private String loadUnload;
111  private String hostname;
112  private String filename;
113  private String excludeFile;
114  private String designatedFile;
115  private int port;
116  private Connection conn;
117  private Admin admin;
118  private RackManager rackManager;
119
120  private RegionMover(RegionMoverBuilder builder) throws IOException {
121    this.hostname = builder.hostname;
122    this.filename = builder.filename;
123    this.excludeFile = builder.excludeFile;
124    this.designatedFile = builder.designatedFile;
125    this.maxthreads = builder.maxthreads;
126    this.isolateRegionIdArray = builder.isolateRegionIdArray;
127    this.ack = builder.ack;
128    this.port = builder.port;
129    this.timeout = builder.timeout;
130    setConf(builder.conf);
131    this.conn = ConnectionFactory.createConnection(conf);
132    this.admin = conn.getAdmin();
133
134    // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need
135    // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed
136    // normally, see HBASE-27304 for details.
137    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
138    if (InetAddresses.isInetAddress(master.getHostname())) {
139      if (!InetAddresses.isInetAddress(this.hostname)) {
140        this.hostname = InetAddress.getByName(this.hostname).getHostAddress();
141      }
142    }
143
144    // Only while running unit tests, builder.rackManager will not be null for the convenience of
145    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
146    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
147    // provided as @InterfaceAudience.Private and it is commented that this is just
148    // to be used by unit test.
149    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
150  }
151
152  private RegionMover() {
153  }
154
155  @Override
156  public void close() {
157    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
158    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
159  }
160
161  /**
162   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
163   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
164   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
165   * the corresponding options.
166   */
167  public static class RegionMoverBuilder {
168    private boolean ack = true;
169    private int maxthreads = 1;
170    private int timeout = Integer.MAX_VALUE;
171    private List<String> isolateRegionIdArray = new ArrayList<>();
172    private String hostname;
173    private String filename;
174    private String excludeFile = null;
175    private String designatedFile = null;
176    private String defaultDir = System.getProperty("java.io.tmpdir");
177    @InterfaceAudience.Private
178    final int port;
179    private final Configuration conf;
180    private RackManager rackManager;
181
182    public RegionMoverBuilder(String hostname) {
183      this(hostname, createConf());
184    }
185
186    /**
187     * Creates a new configuration and sets region mover specific overrides
188     */
189    private static Configuration createConf() {
190      Configuration conf = HBaseConfiguration.create();
191      conf.setInt("hbase.client.prefetch.limit", 1);
192      conf.setInt("hbase.client.pause", 500);
193      conf.setInt("hbase.client.retries.number", 100);
194      return conf;
195    }
196
197    /**
198     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
199     *                 hostname:port.
200     * @param conf     Configuration object
201     */
202    public RegionMoverBuilder(String hostname, Configuration conf) {
203      String[] splitHostname = hostname.toLowerCase().split(":");
204      this.hostname = splitHostname[0];
205      if (splitHostname.length == 2) {
206        this.port = Integer.parseInt(splitHostname[1]);
207      } else {
208        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
209      }
210      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
211        + ":" + Integer.toString(this.port);
212      this.conf = conf;
213    }
214
215    /**
216     * Path of file where regions will be written to during unloading/read from during loading
217     * @return RegionMoverBuilder object
218     */
219    public RegionMoverBuilder filename(String filename) {
220      this.filename = filename;
221      return this;
222    }
223
224    /**
225     * Set the max number of threads that will be used to move regions
226     */
227    public RegionMoverBuilder maxthreads(int threads) {
228      this.maxthreads = threads;
229      return this;
230    }
231
232    /**
233     * Set the region ID to isolate on the region server.
234     */
235    public RegionMoverBuilder isolateRegionIdArray(List<String> isolateRegionIdArray) {
236      this.isolateRegionIdArray = isolateRegionIdArray;
237      return this;
238    }
239
240    /**
241     * Path of file containing hostnames to be excluded during region movement. Exclude file should
242     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
243     * host.
244     */
245    public RegionMoverBuilder excludeFile(String excludefile) {
246      this.excludeFile = excludefile;
247      return this;
248    }
249
250    /**
251     * Set the designated file. Designated file contains hostnames where region moves. Designated
252     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
253     * on a single host.
254     * @param designatedFile The designated file
255     * @return RegionMoverBuilder object
256     */
257    public RegionMoverBuilder designatedFile(String designatedFile) {
258      this.designatedFile = designatedFile;
259      return this;
260    }
261
262    /**
263     * Set ack/noAck mode.
264     * <p>
265     * In ack mode regions are acknowledged before and after moving and the move is retried
266     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
267     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
268     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
269     * <p>
270     * @return RegionMoverBuilder object
271     */
272    public RegionMoverBuilder ack(boolean ack) {
273      this.ack = ack;
274      return this;
275    }
276
277    /**
278     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
279     * movers also have a separate time which is hbase.move.wait.max * number of regions to
280     * load/unload
281     * @param timeout in seconds
282     * @return RegionMoverBuilder object
283     */
284    public RegionMoverBuilder timeout(int timeout) {
285      this.timeout = timeout;
286      return this;
287    }
288
289    /**
290     * Set specific rackManager implementation. This setter method is for testing purpose only.
291     * @param rackManager rackManager impl
292     * @return RegionMoverBuilder object
293     */
294    @InterfaceAudience.Private
295    public RegionMoverBuilder rackManager(RackManager rackManager) {
296      this.rackManager = rackManager;
297      return this;
298    }
299
300    /**
301     * This method builds the appropriate RegionMover object which can then be used to load/unload
302     * using load and unload methods
303     * @return RegionMover object
304     */
305    public RegionMover build() throws IOException {
306      return new RegionMover(this);
307    }
308  }
309
310  /**
311   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
312   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
313   * @return true if loading succeeded, false otherwise
314   */
315  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
316    ExecutorService loadPool = Executors.newFixedThreadPool(1);
317    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
318    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
319    if (!isMetaMoved) {
320      return false;
321    }
322    loadPool = Executors.newFixedThreadPool(1);
323    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
324    return waitTaskToFinish(loadPool, loadTask, "loading");
325  }
326
327  private Callable<Boolean> getMetaRegionMovePlan() {
328    return getRegionsMovePlan(true);
329  }
330
331  private Callable<Boolean> getNonMetaRegionsMovePlan() {
332    return getRegionsMovePlan(false);
333  }
334
335  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
336    return () -> {
337      try {
338        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
339        if (regionsToMove.isEmpty()) {
340          LOG.info("No regions to load.Exiting");
341          return true;
342        }
343        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
344        if (moveMetaRegion) {
345          if (metaRegion.isPresent()) {
346            loadRegions(Collections.singletonList(metaRegion.get()));
347          }
348        } else {
349          metaRegion.ifPresent(regionsToMove::remove);
350          loadRegions(regionsToMove);
351        }
352      } catch (Exception e) {
353        LOG.error("Error while loading regions to " + hostname, e);
354        return false;
355      }
356      return true;
357    };
358  }
359
360  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
361    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
362  }
363
364  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
365    ServerName server = getTargetServer();
366    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
367    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
368      + this.maxthreads + " threads.Ack mode:" + this.ack);
369
370    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
371    List<Future<Boolean>> taskList = new ArrayList<>();
372    int counter = 0;
373    while (counter < regionsToMove.size()) {
374      RegionInfo region = regionsToMove.get(counter);
375      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
376      if (currentServer == null) {
377        LOG
378          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
379        counter++;
380        continue;
381      } else if (server.equals(currentServer)) {
382        LOG.info(
383          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
384        counter++;
385        continue;
386      }
387      if (ack) {
388        Future<Boolean> task = moveRegionsPool
389          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
390        taskList.add(task);
391      } else {
392        Future<Boolean> task = moveRegionsPool
393          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
394        taskList.add(task);
395      }
396      counter++;
397    }
398
399    moveRegionsPool.shutdown();
400    long timeoutInSeconds = regionsToMove.size()
401      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
402    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
403  }
404
405  /**
406   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
407   * noAck mode we do not make sure that region is successfully online on the target region
408   * server,hence it is best effort.We do not unload regions to hostnames given in
409   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
410   * to hostnames provided in {@link #designatedFile}
411   * @return true if unloading succeeded, false otherwise
412   */
413  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
414    return unloadRegions(false);
415  }
416
417  /**
418   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
419   * noAck mode we do not make sure that region is successfully online on the target region
420   * server,hence it is best effort.We do not unload regions to hostnames given in
421   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
422   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
423   * RegionServers are selected from different rack i.e regions should not move to any RegionServers
424   * that belong to same rack as source RegionServer.
425   * @return true if unloading succeeded, false otherwise
426   */
427  public boolean unloadFromRack()
428    throws InterruptedException, ExecutionException, TimeoutException {
429    return unloadRegions(true);
430  }
431
432  private boolean unloadRegions(boolean unloadFromRack)
433    throws ExecutionException, InterruptedException, TimeoutException {
434    return unloadRegions(unloadFromRack, null);
435  }
436
437  /**
438   * Isolated regions specified in {@link #isolateRegionIdArray} on {@link #hostname} in ack Mode
439   * and Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.
440   * In noAck mode we do not make sure that region is successfully online on the target region
441   * server,hence it is the best effort. We do not unload regions to hostnames given in
442   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
443   * to hostnames provided in {@link #designatedFile}
444   * @return true if region isolation succeeded, false otherwise
445   */
446  public boolean isolateRegions()
447    throws ExecutionException, InterruptedException, TimeoutException {
448    return unloadRegions(false, isolateRegionIdArray);
449  }
450
451  private boolean unloadRegions(boolean unloadFromRack, List<String> isolateRegionIdArray)
452    throws InterruptedException, ExecutionException, TimeoutException {
453    deleteFile(this.filename);
454    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
455    Future<Boolean> unloadTask = unloadPool.submit(() -> {
456      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
457      try {
458        // Get Online RegionServers
459        List<ServerName> regionServers = new ArrayList<>();
460        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
461        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
462        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
463        // Remove the host Region server from target Region Servers list
464        ServerName server = stripServer(regionServers, hostname, port);
465        if (server == null) {
466          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
467            hostname, port);
468          LOG.debug("List of region servers: {}", regionServers);
469          return false;
470        }
471        // Remove RS not present in the designated file
472        includeExcludeRegionServers(designatedFile, regionServers, true);
473
474        // Remove RS present in the exclude file
475        includeExcludeRegionServers(excludeFile, regionServers, false);
476
477        if (unloadFromRack) {
478          // remove regionServers that belong to same rack (as source host) since the goal is to
479          // unload regions from source regionServer to destination regionServers
480          // that belong to different rack only.
481          String sourceRack = rackManager.getRack(server);
482          List<String> racks = rackManager.getRack(regionServers);
483          Iterator<ServerName> iterator = regionServers.iterator();
484          int i = 0;
485          while (iterator.hasNext()) {
486            iterator.next();
487            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
488              iterator.remove();
489            }
490            i++;
491          }
492        }
493
494        // Remove decommissioned RS
495        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
496        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
497          regionServers.removeIf(decommissionedRS::contains);
498          LOG.debug("Excluded RegionServers from unloading regions to because they "
499            + "are marked as decommissioned. Servers: {}", decommissionedRS);
500        }
501
502        stripMaster(regionServers);
503        if (regionServers.isEmpty()) {
504          LOG.warn("No Regions were moved - no servers available");
505          return false;
506        } else {
507          LOG.info("Available servers {}", regionServers);
508        }
509        unloadRegions(server, regionServers, movedRegions, isolateRegionIdArray);
510      } catch (Exception e) {
511        LOG.error("Error while unloading regions ", e);
512        return false;
513      } finally {
514        if (movedRegions != null) {
515          writeFile(filename, movedRegions);
516        }
517      }
518      return true;
519    });
520    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
521  }
522
523  @InterfaceAudience.Private
524  Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
525    Collection<ServerName> onlineServers) {
526    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
527      return onlineServers;
528    }
529    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
530    for (ServerName server : onlineServers) {
531      Address address = Address.fromParts(server.getHostname(), server.getPort());
532      if (rsgroup.containsServer(address)) {
533        serverLists.add(server);
534      }
535    }
536    return serverLists;
537  }
538
539  private void unloadRegions(ServerName server, List<ServerName> regionServers,
540    List<RegionInfo> movedRegions, List<String> isolateRegionIdArray) throws Exception {
541    while (true) {
542      List<RegionInfo> isolateRegionInfoList = Collections.synchronizedList(new ArrayList<>());
543      RegionInfo isolateRegionInfo = null;
544      if (isolateRegionIdArray != null && !isolateRegionIdArray.isEmpty()) {
545        // Region will be moved to target region server with Ack mode.
546        final ExecutorService isolateRegionPool = Executors.newFixedThreadPool(maxthreads);
547        List<Future<Boolean>> isolateRegionTaskList = new ArrayList<>();
548        List<RegionInfo> recentlyIsolatedRegion = Collections.synchronizedList(new ArrayList<>());
549        boolean allRegionOpsSuccessful = true;
550        boolean isMetaIsolated = false;
551        RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO;
552        List<HRegionLocation> hRegionLocationRegionIsolation =
553          Collections.synchronizedList(new ArrayList<>());
554        for (String isolateRegionId : isolateRegionIdArray) {
555          if (isolateRegionId.equalsIgnoreCase(metaRegionInfo.getEncodedName())) {
556            isMetaIsolated = true;
557            continue;
558          }
559          Result result = MetaTableAccessor.scanByRegionEncodedName(conn, isolateRegionId);
560          HRegionLocation hRegionLocation =
561            MetaTableAccessor.getRegionLocation(conn, result.getRow());
562          if (hRegionLocation != null) {
563            hRegionLocationRegionIsolation.add(hRegionLocation);
564          } else {
565            LOG.error("Region " + isolateRegionId + " doesn't exists/can't fetch from"
566              + " meta...Quitting now");
567            // We only move the regions if all the regions were found.
568            allRegionOpsSuccessful = false;
569            break;
570          }
571        }
572
573        if (!allRegionOpsSuccessful) {
574          break;
575        }
576        // If hbase:meta region was isolated, then it needs to be part of isolateRegionInfoList.
577        if (isMetaIsolated) {
578          ZKWatcher zkWatcher = new ZKWatcher(conf, null, null);
579          List<HRegionLocation> result = new ArrayList<>();
580          for (String znode : zkWatcher.getMetaReplicaNodes()) {
581            String path = ZNodePaths.joinZNode(zkWatcher.getZNodePaths().baseZNode, znode);
582            int replicaId = zkWatcher.getZNodePaths().getMetaReplicaIdFromPath(path);
583            RegionState state = MetaTableLocator.getMetaRegionState(zkWatcher, replicaId);
584            result.add(new HRegionLocation(state.getRegion(), state.getServerName()));
585          }
586          ServerName metaSeverName = result.get(0).getServerName();
587          // For isolating hbase:meta, it should move explicitly in Ack mode,
588          // hence the forceMoveRegionByAck = true.
589          if (!metaSeverName.equals(server)) {
590            LOG.info("Region of {} {} is on server {} moving to {}", TableName.META_TABLE_NAME,
591              metaRegionInfo.getEncodedName(), metaSeverName, server);
592            submitRegionMovesWhileUnloading(metaSeverName, Collections.singletonList(server),
593              movedRegions, Collections.singletonList(metaRegionInfo), true);
594          } else {
595            LOG.info("Region of {} {} already exists on server: {}", TableName.META_TABLE_NAME,
596              metaRegionInfo.getEncodedName(), server);
597          }
598          isolateRegionInfoList.add(RegionInfoBuilder.FIRST_META_REGIONINFO);
599        }
600
601        if (!hRegionLocationRegionIsolation.isEmpty()) {
602          for (HRegionLocation hRegionLocation : hRegionLocationRegionIsolation) {
603            isolateRegionInfo = hRegionLocation.getRegion();
604            isolateRegionInfoList.add(isolateRegionInfo);
605            if (hRegionLocation.getServerName() == server) {
606              LOG.info("Region " + hRegionLocation.getRegion().getEncodedName() + " already exists"
607                + " on server : " + server.getHostname());
608            } else {
609              Future<Boolean> isolateRegionTask =
610                isolateRegionPool.submit(new MoveWithAck(conn, isolateRegionInfo,
611                  hRegionLocation.getServerName(), server, recentlyIsolatedRegion));
612              isolateRegionTaskList.add(isolateRegionTask);
613            }
614          }
615        }
616
617        if (!isolateRegionTaskList.isEmpty()) {
618          isolateRegionPool.shutdown();
619          // Now that we have fetched all the region's regionInfo, we can move them.
620          waitMoveTasksToFinish(isolateRegionPool, isolateRegionTaskList,
621            admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX));
622
623          Set<RegionInfo> currentRegionsOnTheServer = new HashSet<>(admin.getRegions(server));
624          if (!currentRegionsOnTheServer.containsAll(isolateRegionInfoList)) {
625            // If all the regions are not online on the target server,
626            // we don't put RS in decommission mode and exit from here.
627            LOG.error("One of the Region move failed OR stuck in transition...Quitting now");
628            break;
629          }
630        } else {
631          LOG.info("All regions already exists on server : " + server.getHostname());
632        }
633        // Once region has been moved to target RS, put the target RS into decommission mode,
634        // so master doesn't assign new region to the target RS while we unload the target RS.
635        // Also pass 'offload' flag as false since we don't want master to offload the target RS.
636        List<ServerName> listOfServer = new ArrayList<>();
637        listOfServer.add(server);
638        LOG.info("Putting server : " + server.getHostname() + " in decommission/draining mode");
639        admin.decommissionRegionServers(listOfServer, false);
640      }
641      List<RegionInfo> regionsToMove = admin.getRegions(server);
642      // Remove all the regions from the online Region list, that we just isolated.
643      // This will also include hbase:meta if it was isolated.
644      regionsToMove.removeAll(isolateRegionInfoList);
645      regionsToMove.removeAll(movedRegions);
646      if (regionsToMove.isEmpty()) {
647        LOG.info("No Regions to move....Quitting now");
648        break;
649      }
650      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
651        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
652
653      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
654      if (metaRegion.isPresent()) {
655        RegionInfo meta = metaRegion.get();
656        // hbase:meta should move explicitly in Ack mode.
657        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
658          Collections.singletonList(meta), true);
659        regionsToMove.remove(meta);
660      }
661      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove, false);
662    }
663  }
664
665  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
666    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove, boolean forceMoveRegionByAck)
667    throws Exception {
668    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
669    List<Future<Boolean>> taskList = new ArrayList<>();
670    int serverIndex = 0;
671    for (RegionInfo regionToMove : regionsToMove) {
672      // To move/isolate hbase:meta on a server, it should happen explicitly by Ack mode, hence the
673      // forceMoveRegionByAck = true.
674      if (ack || forceMoveRegionByAck) {
675        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
676          regionServers.get(serverIndex), movedRegions));
677        taskList.add(task);
678      } else {
679        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
680          server, regionServers.get(serverIndex), movedRegions));
681        taskList.add(task);
682      }
683      serverIndex = (serverIndex + 1) % regionServers.size();
684    }
685    moveRegionsPool.shutdown();
686    long timeoutInSeconds = regionsToMove.size()
687      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
688    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
689  }
690
691  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
692    throws TimeoutException, InterruptedException, ExecutionException {
693    pool.shutdown();
694    try {
695      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
696        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
697          + this.timeout + "sec");
698        pool.shutdownNow();
699      }
700    } catch (InterruptedException e) {
701      pool.shutdownNow();
702      Thread.currentThread().interrupt();
703    }
704    try {
705      return task.get(5, TimeUnit.SECONDS);
706    } catch (InterruptedException e) {
707      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
708      throw e;
709    } catch (ExecutionException e) {
710      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
711      throw e;
712    }
713  }
714
715  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
716    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
717    try {
718      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
719        moveRegionsPool.shutdownNow();
720      }
721    } catch (InterruptedException e) {
722      moveRegionsPool.shutdownNow();
723      Thread.currentThread().interrupt();
724    }
725    for (Future<Boolean> future : taskList) {
726      try {
727        // if even after shutdownNow threads are stuck we wait for 5 secs max
728        if (!future.get(5, TimeUnit.SECONDS)) {
729          LOG.error("Was Not able to move region....Exiting Now");
730          throw new Exception("Could not move region Exception");
731        }
732      } catch (InterruptedException e) {
733        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
734        throw e;
735      } catch (ExecutionException e) {
736        boolean ignoreFailure = ignoreRegionMoveFailure(e);
737        if (ignoreFailure) {
738          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
739        } else {
740          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
741          throw e;
742        }
743      } catch (CancellationException e) {
744        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
745          + "secs", e);
746        throw e;
747      }
748    }
749  }
750
751  private boolean ignoreRegionMoveFailure(ExecutionException e) {
752    boolean ignoreFailure = false;
753    if (e.getCause() instanceof UnknownRegionException) {
754      // region does not exist anymore
755      ignoreFailure = true;
756    } else if (
757      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
758        && e.getCause().getMessage()
759          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
760    ) {
761      // region is recently split
762      ignoreFailure = true;
763    }
764    return ignoreFailure;
765  }
766
767  private ServerName getTargetServer() throws Exception {
768    ServerName server = null;
769    int maxWaitInSeconds =
770      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
771    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
772    while (EnvironmentEdgeManager.currentTime() < maxWait) {
773      try {
774        List<ServerName> regionServers = new ArrayList<>();
775        regionServers.addAll(admin.getRegionServers());
776        // Remove the host Region server from target Region Servers list
777        server = stripServer(regionServers, hostname, port);
778        if (server != null) {
779          break;
780        } else {
781          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
782        }
783      } catch (IOException e) {
784        LOG.warn("Could not get list of region servers", e);
785      }
786      Thread.sleep(500);
787    }
788    if (server == null) {
789      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
790      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
791    }
792    return server;
793  }
794
795  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
796    List<RegionInfo> regions = new ArrayList<>();
797    File f = new File(filename);
798    if (!f.exists()) {
799      return regions;
800    }
801    try (
802      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
803      int numRegions = dis.readInt();
804      int index = 0;
805      while (index < numRegions) {
806        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
807        index++;
808      }
809    } catch (IOException e) {
810      LOG.error("Error while reading regions from file:" + filename, e);
811      throw e;
812    }
813    return regions;
814  }
815
816  /**
817   * Write the number of regions moved in the first line followed by regions moved in subsequent
818   * lines
819   */
820  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
821    try (DataOutputStream dos =
822      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
823      dos.writeInt(movedRegions.size());
824      for (RegionInfo region : movedRegions) {
825        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
826      }
827    } catch (IOException e) {
828      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
829        + movedRegions.size() + " regions", e);
830      throw e;
831    }
832  }
833
834  private void deleteFile(String filename) {
835    File f = new File(filename);
836    if (f.exists()) {
837      f.delete();
838    }
839  }
840
841  /**
842   * @param filename The file should have 'host:port' per line
843   * @return List of servers from the file in format 'hostname:port'.
844   */
845  private List<String> readServersFromFile(String filename) throws IOException {
846    List<String> servers = new ArrayList<>();
847    if (filename != null) {
848      try {
849        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
850          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
851          .forEach(servers::add);
852      } catch (IOException e) {
853        LOG.error("Exception while reading servers from file,", e);
854        throw e;
855      }
856    }
857    return servers;
858  }
859
860  /**
861   * Designates or excludes the servername whose hostname and port portion matches the list given in
862   * the file. Example:<br>
863   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
864   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
865   * are removed from regionServers list so that regions can move to only RS1. If you want to
866   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
867   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
868   * list so that regions can move to only RS2 and RS3.
869   */
870  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
871    boolean isInclude) throws IOException {
872    if (fileName != null) {
873      List<String> servers = readServersFromFile(fileName);
874      if (servers.isEmpty()) {
875        LOG.warn("No servers provided in the file: {}." + fileName);
876        return;
877      }
878      Iterator<ServerName> i = regionServers.iterator();
879      while (i.hasNext()) {
880        String rs = i.next().getServerName();
881        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":"
882          + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
883        if (isInclude != servers.contains(rsPort)) {
884          i.remove();
885        }
886      }
887    }
888  }
889
890  /**
891   * Exclude master from list of RSs to move regions to
892   */
893  private void stripMaster(List<ServerName> regionServers) throws IOException {
894    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
895    stripServer(regionServers, master.getHostname(), master.getPort());
896  }
897
898  /**
899   * Remove the servername whose hostname and port portion matches from the passed array of servers.
900   * Returns as side-effect the servername removed.
901   * @return server removed from list of Region Servers
902   */
903  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
904    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
905      ServerName server = iter.next();
906      if (
907        server.getAddress().getHostName().equalsIgnoreCase(hostname)
908          && server.getAddress().getPort() == port
909      ) {
910        iter.remove();
911        return server;
912      }
913    }
914    return null;
915  }
916
917  @Override
918  protected void addOptions() {
919    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
920    this.addRequiredOptWithArg("o", "operation",
921      "Expected: load/unload/unload_from_rack/isolate_regions");
922    this.addOptWithArg("m", "maxthreads",
923      "Define the maximum number of threads to use to unload and reload the regions");
924    this.addOptWithArg("i", "isolateRegionIds",
925      "Comma separated list of Region IDs hash to isolate on a RegionServer and put region server"
926        + " in draining mode. This option should only be used with '-o isolate_regions'."
927        + " By putting region server in decommission/draining mode, master can't assign any"
928        + " new region on this server. If one or more regions are not found OR failed to isolate"
929        + " successfully, utility will exist without putting RS in draining/decommission mode."
930        + " Ex. --isolateRegionIds id1,id2,id3 OR -i id1,id2,id3");
931    this.addOptWithArg("x", "excludefile",
932      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
933        + "target host; useful for rack decommisioning.");
934    this.addOptWithArg("d", "designatedfile",
935      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
936    this.addOptWithArg("f", "filename",
937      "File to save regions list into unloading, or read from loading; "
938        + "default /tmp/<usernamehostname:port>");
939    this.addOptNoArg("n", "noack",
940      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
941        + "RegionServer, hence best effort. This is more performant in unloading and loading "
942        + "but might lead to region being unavailable for some time till master reassigns it "
943        + "in case the move failed");
944    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
945      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
946  }
947
948  @Override
949  protected void processOptions(CommandLine cmd) {
950    String hostname = cmd.getOptionValue("r");
951    rmbuilder = new RegionMoverBuilder(hostname);
952    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
953    if (cmd.hasOption('m')) {
954      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
955    }
956    if (this.loadUnload.equals("isolate_regions") && cmd.hasOption("isolateRegionIds")) {
957      rmbuilder
958        .isolateRegionIdArray(Arrays.asList(cmd.getOptionValue("isolateRegionIds").split(",")));
959    }
960    if (cmd.hasOption('n')) {
961      rmbuilder.ack(false);
962    }
963    if (cmd.hasOption('f')) {
964      rmbuilder.filename(cmd.getOptionValue('f'));
965    }
966    if (cmd.hasOption('x')) {
967      rmbuilder.excludeFile(cmd.getOptionValue('x'));
968    }
969    if (cmd.hasOption('d')) {
970      rmbuilder.designatedFile(cmd.getOptionValue('d'));
971    }
972    if (cmd.hasOption('t')) {
973      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
974    }
975  }
976
977  @Override
978  protected int doWork() throws Exception {
979    boolean success;
980    try (RegionMover rm = rmbuilder.build()) {
981      if (loadUnload.equalsIgnoreCase("load")) {
982        success = rm.load();
983      } else if (loadUnload.equalsIgnoreCase("unload")) {
984        success = rm.unload();
985      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
986        success = rm.unloadFromRack();
987      } else if (loadUnload.equalsIgnoreCase("isolate_regions")) {
988        if (rm.isolateRegionIdArray != null && !rm.isolateRegionIdArray.isEmpty()) {
989          success = rm.isolateRegions();
990        } else {
991          LOG.error("Missing -i/--isolate_regions option with '-o isolate_regions' option");
992          LOG.error("Use -h or --help for usage instructions");
993          printUsage();
994          success = false;
995        }
996      } else {
997        printUsage();
998        success = false;
999      }
1000    }
1001    return (success ? 0 : 1);
1002  }
1003
1004  public static void main(String[] args) {
1005    try (RegionMover mover = new RegionMover()) {
1006      mover.doStaticMain(args);
1007    }
1008  }
1009}