001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.io.BufferedInputStream;
023import java.io.BufferedOutputStream;
024import java.io.Closeable;
025import java.io.DataInputStream;
026import java.io.DataOutputStream;
027import java.io.File;
028import java.io.FileInputStream;
029import java.io.FileOutputStream;
030import java.io.IOException;
031import java.nio.file.Files;
032import java.nio.file.Paths;
033import java.util.ArrayList;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.EnumSet;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Locale;
040import java.util.concurrent.Callable;
041import java.util.concurrent.CancellationException;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Executors;
045import java.util.concurrent.Future;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.TimeoutException;
048import java.util.function.Predicate;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.ClusterMetrics.Option;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionLocation;
055import org.apache.hadoop.hbase.ServerName;
056import org.apache.hadoop.hbase.client.Admin;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.RegionInfo;
060import org.apache.hadoop.hbase.client.ResultScanner;
061import org.apache.hadoop.hbase.client.Scan;
062import org.apache.hadoop.hbase.client.Table;
063import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
064import org.apache.hadoop.hbase.net.Address;
065import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
071import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
072
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges if regions are online after movement, while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown where, even if a Region is stuck, upon shutdown the
 * master will move it anyways. This can also be used by constructing an Object using the builder
 * and then calling {@link #load()} or {@link #unload()} methods for the desired operations.
 */
082@InterfaceAudience.Public
083public class RegionMover extends AbstractHBaseTool implements Closeable {
  /** Configuration key: upper bound on move attempts per region in ack mode. */
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  /** Configuration key: time budget, in seconds, for a single region move. */
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  /** Configuration key: how long, in seconds, load waits for the target server to appear. */
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  /** Default for {@link #MOVE_RETRIES_MAX_KEY}. */
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  /** Default for {@link #MOVE_WAIT_MAX_KEY}, in seconds. */
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  /** Default for {@link #SERVERSTART_WAIT_MAX_KEY}, in seconds. */
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
  static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
  // Builder populated by processOptions() and consumed by doWork() when run from the command line
  private RegionMoverBuilder rmbuilder;
  // true: verify each move (MoveWithAck); false: best effort (MoveWithoutAck)
  private boolean ack = true;
  // Size of the thread pool used to move regions
  private int maxthreads = 1;
  // Overall load/unload timeout in seconds
  private int timeout;
  // Requested operation from the command line ("load" or "unload")
  private String loadUnload;
  // Host (and port below) of the region server being loaded/unloaded
  private String hostname;
  // File the moved regions are written to on unload and read from on load
  private String filename;
  // Optional file listing host:port entries that must not receive regions
  private String excludeFile;
  private int port;
  // Cluster connection and admin handle; released in close()
  private Connection conn;
  private Admin admin;
102
103  private RegionMover(RegionMoverBuilder builder) throws IOException {
104    this.hostname = builder.hostname;
105    this.filename = builder.filename;
106    this.excludeFile = builder.excludeFile;
107    this.maxthreads = builder.maxthreads;
108    this.ack = builder.ack;
109    this.port = builder.port;
110    this.timeout = builder.timeout;
111    setConf(builder.conf);
112    this.conn = ConnectionFactory.createConnection(conf);
113    this.admin = conn.getAdmin();
114  }
115
  /**
   * Creates an unconfigured instance without opening a connection; {@link #main(String[])} uses it
   * to drive the command line tool.
   */
  private RegionMover() {
  }
118
119  @Override
120  public void close() {
121    IOUtils.closeQuietly(this.admin);
122    IOUtils.closeQuietly(this.conn);
123  }
124
125  /**
126   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
127   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
128   * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options
129   */
130  public static class RegionMoverBuilder {
131    private boolean ack = true;
132    private int maxthreads = 1;
133    private int timeout = Integer.MAX_VALUE;
134    private String hostname;
135    private String filename;
136    private String excludeFile = null;
137    private String defaultDir = System.getProperty("java.io.tmpdir");
138    @VisibleForTesting
139    final int port;
140    private final Configuration conf;
141
142    public RegionMoverBuilder(String hostname) {
143      this(hostname, createConf());
144    }
145
146    /**
147     * Creates a new configuration and sets region mover specific overrides
148     */
149    private static Configuration createConf() {
150      Configuration conf = HBaseConfiguration.create();
151      conf.setInt("hbase.client.prefetch.limit", 1);
152      conf.setInt("hbase.client.pause", 500);
153      conf.setInt("hbase.client.retries.number", 100);
154      return conf;
155    }
156
157    /**
158     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
159     *     or hostname:port.
160     * @param conf Configuration object
161     */
162    public RegionMoverBuilder(String hostname, Configuration conf) {
163      String[] splitHostname = hostname.toLowerCase().split(":");
164      this.hostname = splitHostname[0];
165      if (splitHostname.length == 2) {
166        this.port = Integer.parseInt(splitHostname[1]);
167      } else {
168        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
169      }
170      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
171        + ":" + Integer.toString(this.port);
172      this.conf = conf;
173    }
174
175    /**
176     * Path of file where regions will be written to during unloading/read from during loading
177     * @param filename
178     * @return RegionMoverBuilder object
179     */
180    public RegionMoverBuilder filename(String filename) {
181      this.filename = filename;
182      return this;
183    }
184
185    /**
186     * Set the max number of threads that will be used to move regions
187     */
188    public RegionMoverBuilder maxthreads(int threads) {
189      this.maxthreads = threads;
190      return this;
191    }
192
193    /**
194     * Path of file containing hostnames to be excluded during region movement. Exclude file should
195     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
196     * host.
197     */
198    public RegionMoverBuilder excludeFile(String excludefile) {
199      this.excludeFile = excludefile;
200      return this;
201    }
202
203    /**
204     * Set ack/noAck mode.
205     * <p>
206     * In ack mode regions are acknowledged before and after moving and the move is retried
207     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
208     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
209     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
210     * <p>
211     * @param ack
212     * @return RegionMoverBuilder object
213     */
214    public RegionMoverBuilder ack(boolean ack) {
215      this.ack = ack;
216      return this;
217    }
218
219    /**
220     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
221     * movers also have a separate time which is hbase.move.wait.max * number of regions to
222     * load/unload
223     * @param timeout in seconds
224     * @return RegionMoverBuilder object
225     */
226    public RegionMoverBuilder timeout(int timeout) {
227      this.timeout = timeout;
228      return this;
229    }
230
231    /**
232     * This method builds the appropriate RegionMover object which can then be used to load/unload
233     * using load and unload methods
234     * @return RegionMover object
235     */
236    public RegionMover build() throws IOException {
237      return new RegionMover(this);
238    }
239  }
240
241  /**
242   * Move Regions and make sure that they are up on the target server.If a region movement fails we
243   * exit as failure
244   */
245  private class MoveWithAck implements Callable<Boolean> {
246    private RegionInfo region;
247    private ServerName targetServer;
248    private List<RegionInfo> movedRegions;
249    private ServerName sourceServer;
250
251    public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
252        ServerName targetServer, List<RegionInfo> movedRegions) {
253      this.region = regionInfo;
254      this.targetServer = targetServer;
255      this.movedRegions = movedRegions;
256      this.sourceServer = sourceServer;
257    }
258
259    @Override
260    public Boolean call() throws IOException, InterruptedException {
261      boolean moved = false;
262      int count = 0;
263      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
264      int maxWaitInSeconds =
265          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
266      long startTime = EnvironmentEdgeManager.currentTime();
267      boolean sameServer = true;
268      // Assert we can scan the region in its current location
269      isSuccessfulScan(region);
270      LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
271          + targetServer);
272      while (count < retries && sameServer) {
273        if (count > 0) {
274          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
275        }
276        count = count + 1;
277        admin.move(region.getEncodedNameAsBytes(), targetServer);
278        long maxWait = startTime + (maxWaitInSeconds * 1000);
279        while (EnvironmentEdgeManager.currentTime() < maxWait) {
280          sameServer = isSameServer(region, sourceServer);
281          if (!sameServer) {
282            break;
283          }
284          Thread.sleep(100);
285        }
286      }
287      if (sameServer) {
288        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
289            + ",newServer=" + this.targetServer);
290      } else {
291        isSuccessfulScan(region);
292        LOG.info("Moved Region "
293            + region.getRegionNameAsString()
294            + " cost:"
295            + String.format("%.3f",
296            (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
297        moved = true;
298        movedRegions.add(region);
299      }
300      return moved;
301    }
302  }
303
304  /**
305   * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
306   * RS down anyways and not abort on a stuck region. Improves movement performance
307   */
308  private class MoveWithoutAck implements Callable<Boolean> {
309    private RegionInfo region;
310    private ServerName targetServer;
311    private List<RegionInfo> movedRegions;
312    private ServerName sourceServer;
313
314    public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
315        ServerName targetServer, List<RegionInfo> movedRegions) {
316      this.region = regionInfo;
317      this.targetServer = targetServer;
318      this.movedRegions = movedRegions;
319      this.sourceServer = sourceServer;
320    }
321
322    @Override
323    public Boolean call() {
324      try {
325        LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
326            + targetServer);
327        admin.move(region.getEncodedNameAsBytes(), targetServer);
328        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
329            + targetServer);
330      } catch (Exception e) {
331        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
332      } finally {
333        // we add region to the moved regions list in No Ack Mode since this is best effort
334        movedRegions.add(region);
335      }
336      return true;
337    }
338  }
339
340  /**
341   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
342   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
343   * @return true if loading succeeded, false otherwise
344   */
345  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
346    ExecutorService loadPool = Executors.newFixedThreadPool(1);
347    Future<Boolean> loadTask = loadPool.submit(() -> {
348      try {
349        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
350        if (regionsToMove.isEmpty()) {
351          LOG.info("No regions to load.Exiting");
352          return true;
353        }
354        loadRegions(regionsToMove);
355      } catch (Exception e) {
356        LOG.error("Error while loading regions to " + hostname, e);
357        return false;
358      }
359      return true;
360    });
361    return waitTaskToFinish(loadPool, loadTask, "loading");
362  }
363
364  private void loadRegions(List<RegionInfo> regionsToMove)
365      throws Exception {
366    ServerName server = getTargetServer();
367    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
368    LOG.info(
369        "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
370            + " threads.Ack mode:" + this.ack);
371
372    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
373    List<Future<Boolean>> taskList = new ArrayList<>();
374    int counter = 0;
375    while (counter < regionsToMove.size()) {
376      RegionInfo region = regionsToMove.get(counter);
377      ServerName currentServer = getServerNameForRegion(region);
378      if (currentServer == null) {
379        LOG.warn(
380            "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
381        counter++;
382        continue;
383      } else if (server.equals(currentServer)) {
384        LOG.info(
385            "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
386        counter++;
387        continue;
388      }
389      if (ack) {
390        Future<Boolean> task =
391            moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
392        taskList.add(task);
393      } else {
394        Future<Boolean> task =
395            moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
396        taskList.add(task);
397      }
398      counter++;
399    }
400
401    moveRegionsPool.shutdown();
402    long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
403        .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
404    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
405  }
406
407  /**
408   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
409   * noAck mode we do not make sure that region is successfully online on the target region
410   * server,hence it is best effort.We do not unload regions to hostnames given in
411   * {@link #excludeFile}.
412   * @return true if unloading succeeded, false otherwise
413   */
414  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
415    deleteFile(this.filename);
416    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
417    Future<Boolean> unloadTask = unloadPool.submit(() -> {
418      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
419      try {
420        // Get Online RegionServers
421        List<ServerName> regionServers = new ArrayList<>();
422        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
423        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
424        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
425        // Remove the host Region server from target Region Servers list
426        ServerName server = stripServer(regionServers, hostname, port);
427        if (server == null) {
428          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
429              hostname, port);
430          LOG.debug("List of region servers: {}", regionServers);
431          return false;
432        }
433        // Remove RS present in the exclude file
434        stripExcludes(regionServers);
435        stripMaster(regionServers);
436        if (regionServers.isEmpty()) {
437          LOG.warn("No Regions were moved - no servers available");
438          return false;
439        } else {
440          LOG.info("Available servers {}", regionServers);
441        }
442        unloadRegions(server, regionServers, movedRegions);
443      } catch (Exception e) {
444        LOG.error("Error while unloading regions ", e);
445        return false;
446      } finally {
447        if (movedRegions != null) {
448          writeFile(filename, movedRegions);
449        }
450      }
451      return true;
452    });
453    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
454  }
455
456  @VisibleForTesting
457   Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
458      Collection<ServerName> onlineServers) {
459    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
460      return onlineServers;
461    }
462    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
463    for (ServerName server : onlineServers) {
464      Address address = Address.fromParts(server.getHostname(), server.getPort());
465      if (rsgroup.containsServer(address)) {
466        serverLists.add(server);
467      }
468    }
469    return serverLists;
470  }
471
472  private void unloadRegions(ServerName server, List<ServerName> regionServers,
473      List<RegionInfo> movedRegions) throws Exception {
474    while (true) {
475      List<RegionInfo> regionsToMove = admin.getRegions(server);
476      regionsToMove.removeAll(movedRegions);
477      if (regionsToMove.isEmpty()) {
478        LOG.info("No Regions to move....Quitting now");
479        break;
480      }
481      int counter = 0;
482      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
483          + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
484          + ack);
485      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
486      List<Future<Boolean>> taskList = new ArrayList<>();
487      int serverIndex = 0;
488      while (counter < regionsToMove.size()) {
489        if (ack) {
490          Future<Boolean> task = moveRegionsPool.submit(
491              new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
492                  movedRegions));
493          taskList.add(task);
494        } else {
495          Future<Boolean> task = moveRegionsPool.submit(
496              new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
497                  movedRegions));
498          taskList.add(task);
499        }
500        counter++;
501        serverIndex = (serverIndex + 1) % regionServers.size();
502      }
503      moveRegionsPool.shutdown();
504      long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
505          .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
506      waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
507    }
508  }
509
510  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
511      throws TimeoutException, InterruptedException, ExecutionException {
512    pool.shutdown();
513    try {
514      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
515        LOG.warn(
516            "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
517                + "sec");
518        pool.shutdownNow();
519      }
520    } catch (InterruptedException e) {
521      pool.shutdownNow();
522      Thread.currentThread().interrupt();
523    }
524    try {
525      return task.get(5, TimeUnit.SECONDS);
526    } catch (InterruptedException e) {
527      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
528      throw e;
529    } catch (ExecutionException e) {
530      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
531      throw e;
532    }
533  }
534
535  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
536      List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
537    try {
538      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
539        moveRegionsPool.shutdownNow();
540      }
541    } catch (InterruptedException e) {
542      moveRegionsPool.shutdownNow();
543      Thread.currentThread().interrupt();
544    }
545    for (Future<Boolean> future : taskList) {
546      try {
547        // if even after shutdownNow threads are stuck we wait for 5 secs max
548        if (!future.get(5, TimeUnit.SECONDS)) {
549          LOG.error("Was Not able to move region....Exiting Now");
550          throw new Exception("Could not move region Exception");
551        }
552      } catch (InterruptedException e) {
553        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
554        throw e;
555      } catch (ExecutionException e) {
556        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
557        throw e;
558      } catch (CancellationException e) {
559        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
560            + "secs", e);
561        throw e;
562      }
563    }
564  }
565
566  private ServerName getTargetServer() throws Exception {
567    ServerName server = null;
568    int maxWaitInSeconds =
569        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
570    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
571    while (EnvironmentEdgeManager.currentTime() < maxWait) {
572      try {
573        List<ServerName> regionServers = new ArrayList<>();
574        regionServers.addAll(admin.getRegionServers());
575        // Remove the host Region server from target Region Servers list
576        server = stripServer(regionServers, hostname, port);
577        if (server != null) {
578          break;
579        } else {
580          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
581        }
582      } catch (IOException e) {
583        LOG.warn("Could not get list of region servers", e);
584      }
585      Thread.sleep(500);
586    }
587    if (server == null) {
588      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
589      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
590    }
591    return server;
592  }
593
594  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
595    List<RegionInfo> regions = new ArrayList<>();
596    File f = new File(filename);
597    if (!f.exists()) {
598      return regions;
599    }
600    try (DataInputStream dis = new DataInputStream(
601        new BufferedInputStream(new FileInputStream(f)))) {
602      int numRegions = dis.readInt();
603      int index = 0;
604      while (index < numRegions) {
605        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
606        index++;
607      }
608    } catch (IOException e) {
609      LOG.error("Error while reading regions from file:" + filename, e);
610      throw e;
611    }
612    return regions;
613  }
614
615  /**
616   * Write the number of regions moved in the first line followed by regions moved in subsequent
617   * lines
618   */
619  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
620    try (DataOutputStream dos = new DataOutputStream(
621        new BufferedOutputStream(new FileOutputStream(filename)))) {
622      dos.writeInt(movedRegions.size());
623      for (RegionInfo region : movedRegions) {
624        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
625      }
626    } catch (IOException e) {
627      LOG.error(
628          "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
629              .size() + " regions", e);
630      throw e;
631    }
632  }
633
634  private void deleteFile(String filename) {
635    File f = new File(filename);
636    if (f.exists()) {
637      f.delete();
638    }
639  }
640
641  /**
642   * @return List of servers from the exclude file in format 'hostname:port'.
643   */
644  private List<String> readExcludes(String excludeFile) throws IOException {
645    List<String> excludeServers = new ArrayList<>();
646    if (excludeFile == null) {
647      return excludeServers;
648    } else {
649      try {
650        Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim)
651            .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
652            .forEach(excludeServers::add);
653      } catch (IOException e) {
654        LOG.warn("Exception while reading excludes file, continuing anyways", e);
655      }
656      return excludeServers;
657    }
658  }
659
660  /**
661   * Excludes the servername whose hostname and port portion matches the list given in exclude file
662   */
663  private void stripExcludes(List<ServerName> regionServers) throws IOException {
664    if (excludeFile != null) {
665      List<String> excludes = readExcludes(excludeFile);
666      Iterator<ServerName> i = regionServers.iterator();
667      while (i.hasNext()) {
668        String rs = i.next().getServerName();
669        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
670            .split(ServerName.SERVERNAME_SEPARATOR)[1];
671        if (excludes.contains(rsPort)) {
672          i.remove();
673        }
674      }
675      LOG.info("Valid Region server targets are:" + regionServers.toString());
676      LOG.info("Excluded Servers are" + excludes.toString());
677    }
678  }
679
680  /**
681   * Exclude master from list of RSs to move regions to
682   */
683  private void stripMaster(List<ServerName> regionServers) throws IOException {
684    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
685    stripServer(regionServers, master.getHostname(), master.getPort());
686  }
687
688  /**
689   * Remove the servername whose hostname and port portion matches from the passed array of servers.
690   * Returns as side-effect the servername removed.
691   * @return server removed from list of Region Servers
692   */
693  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
694    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
695      ServerName server = iter.next();
696      if (server.getAddress().getHostname().equalsIgnoreCase(hostname) &&
697        server.getAddress().getPort() == port) {
698        iter.remove();
699        return server;
700      }
701    }
702    return null;
703  }
704
705  /**
706   * Tries to scan a row from passed region
707   */
708  private void isSuccessfulScan(RegionInfo region) throws IOException {
709    Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
710        .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
711        .setCacheBlocks(false);
712    try (Table table = conn.getTable(region.getTable());
713        ResultScanner scanner = table.getScanner(scan)) {
714      scanner.next();
715    } catch (IOException e) {
716      LOG.error("Could not scan region:" + region.getEncodedName(), e);
717      throw e;
718    }
719  }
720
721  /**
722   * Returns true if passed region is still on serverName when we look at hbase:meta.
723   * @return true if region is hosted on serverName otherwise false
724   */
725  private boolean isSameServer(RegionInfo region, ServerName serverName)
726      throws IOException {
727    ServerName serverForRegion = getServerNameForRegion(region);
728    if (serverForRegion != null && serverForRegion.equals(serverName)) {
729      return true;
730    }
731    return false;
732  }
733
734  /**
735   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
736   * startcode comma-delimited. Can return null
737   * @return regionServer hosting the given region
738   */
739  private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
740    if (!admin.isTableEnabled(region.getTable())) {
741      return null;
742    }
743    HRegionLocation loc =
744      conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(), true);
745    if (loc != null) {
746      return loc.getServerName();
747    } else {
748      return null;
749    }
750  }
751
752  @Override
753  protected void addOptions() {
754    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
755    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
756    this.addOptWithArg("m", "maxthreads",
757        "Define the maximum number of threads to use to unload and reload the regions");
758    this.addOptWithArg("x", "excludefile",
759        "File with <hostname:port> per line to exclude as unload targets; default excludes only "
760            + "target host; useful for rack decommisioning.");
761    this.addOptWithArg("f", "filename",
762        "File to save regions list into unloading, or read from loading; "
763            + "default /tmp/<usernamehostname:port>");
764    this.addOptNoArg("n", "noack",
765        "Turn on No-Ack mode(default: false) which won't check if region is online on target "
766            + "RegionServer, hence best effort. This is more performant in unloading and loading "
767            + "but might lead to region being unavailable for some time till master reassigns it "
768            + "in case the move failed");
769    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
770        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
771  }
772
773  @Override
774  protected void processOptions(CommandLine cmd) {
775    String hostname = cmd.getOptionValue("r");
776    rmbuilder = new RegionMoverBuilder(hostname);
777    if (cmd.hasOption('m')) {
778      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
779    }
780    if (cmd.hasOption('n')) {
781      rmbuilder.ack(false);
782    }
783    if (cmd.hasOption('f')) {
784      rmbuilder.filename(cmd.getOptionValue('f'));
785    }
786    if (cmd.hasOption('x')) {
787      rmbuilder.excludeFile(cmd.getOptionValue('x'));
788    }
789    if (cmd.hasOption('t')) {
790      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
791    }
792    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
793  }
794
795  @Override
796  protected int doWork() throws Exception {
797    boolean success;
798    try (RegionMover rm = rmbuilder.build()) {
799      if (loadUnload.equalsIgnoreCase("load")) {
800        success = rm.load();
801      } else if (loadUnload.equalsIgnoreCase("unload")) {
802        success = rm.unload();
803      } else {
804        printUsage();
805        success = false;
806      }
807    }
808    return (success ? 0 : 1);
809  }
810
811  public static void main(String[] args) {
812    try (RegionMover mover = new RegionMover()) {
813      mover.doStaticMain(args);
814    }
815  }
816}