001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
069
070/**
071 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command
072 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode
073 * acknowledges if regions are online after movement while noAck mode is best effort mode that
074 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck
075 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it
076 * anyways. This can also be used by constructiong an Object using the builder and then calling
077 * {@link #load()} or {@link #unload()} methods for the desired operations.
078 */
079@InterfaceAudience.Public
080public class RegionMover extends AbstractHBaseTool implements Closeable {
081  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
082  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
083  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
084  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
085  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
086  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
087  static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
088  private RegionMoverBuilder rmbuilder;
089  private boolean ack = true;
090  private int maxthreads = 1;
091  private int timeout;
092  private String loadUnload;
093  private String hostname;
094  private String filename;
095  private String excludeFile;
096  private int port;
097  private Connection conn;
098  private Admin admin;
099
100  private RegionMover(RegionMoverBuilder builder) throws IOException {
101    this.hostname = builder.hostname;
102    this.filename = builder.filename;
103    this.excludeFile = builder.excludeFile;
104    this.maxthreads = builder.maxthreads;
105    this.ack = builder.ack;
106    this.port = builder.port;
107    this.timeout = builder.timeout;
108    setConf(builder.conf);
109    this.conn = ConnectionFactory.createConnection(conf);
110    this.admin = conn.getAdmin();
111  }
112
113  private RegionMover() {
114  }
115
116  @Override
117  public void close() {
118    IOUtils.closeQuietly(this.admin);
119    IOUtils.closeQuietly(this.conn);
120  }
121
122  /**
123   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
124   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
125   * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options
126   */
127  public static class RegionMoverBuilder {
128    private boolean ack = true;
129    private int maxthreads = 1;
130    private int timeout = Integer.MAX_VALUE;
131    private String hostname;
132    private String filename;
133    private String excludeFile = null;
134    private String defaultDir = System.getProperty("java.io.tmpdir");
135    @VisibleForTesting
136    final int port;
137    private final Configuration conf;
138
139    public RegionMoverBuilder(String hostname) {
140      this(hostname, createConf());
141    }
142
143    /**
144     * Creates a new configuration and sets region mover specific overrides
145     */
146    private static Configuration createConf() {
147      Configuration conf = HBaseConfiguration.create();
148      conf.setInt("hbase.client.prefetch.limit", 1);
149      conf.setInt("hbase.client.pause", 500);
150      conf.setInt("hbase.client.retries.number", 100);
151      return conf;
152    }
153
154    /**
155     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
156     *     or hostname:port.
157     * @param conf Configuration object
158     */
159    public RegionMoverBuilder(String hostname, Configuration conf) {
160      String[] splitHostname = hostname.toLowerCase().split(":");
161      this.hostname = splitHostname[0];
162      if (splitHostname.length == 2) {
163        this.port = Integer.parseInt(splitHostname[1]);
164      } else {
165        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
166      }
167      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
168        + ":" + Integer.toString(this.port);
169      this.conf = conf;
170    }
171
172    /**
173     * Path of file where regions will be written to during unloading/read from during loading
174     * @param filename
175     * @return RegionMoverBuilder object
176     */
177    public RegionMoverBuilder filename(String filename) {
178      this.filename = filename;
179      return this;
180    }
181
182    /**
183     * Set the max number of threads that will be used to move regions
184     */
185    public RegionMoverBuilder maxthreads(int threads) {
186      this.maxthreads = threads;
187      return this;
188    }
189
190    /**
191     * Path of file containing hostnames to be excluded during region movement. Exclude file should
192     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
193     * host.
194     */
195    public RegionMoverBuilder excludeFile(String excludefile) {
196      this.excludeFile = excludefile;
197      return this;
198    }
199
200    /**
201     * Set ack/noAck mode.
202     * <p>
203     * In ack mode regions are acknowledged before and after moving and the move is retried
204     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
205     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
206     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
207     * <p>
208     * @param ack
209     * @return RegionMoverBuilder object
210     */
211    public RegionMoverBuilder ack(boolean ack) {
212      this.ack = ack;
213      return this;
214    }
215
216    /**
217     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
218     * movers also have a separate time which is hbase.move.wait.max * number of regions to
219     * load/unload
220     * @param timeout in seconds
221     * @return RegionMoverBuilder object
222     */
223    public RegionMoverBuilder timeout(int timeout) {
224      this.timeout = timeout;
225      return this;
226    }
227
228    /**
229     * This method builds the appropriate RegionMover object which can then be used to load/unload
230     * using load and unload methods
231     * @return RegionMover object
232     */
233    public RegionMover build() throws IOException {
234      return new RegionMover(this);
235    }
236  }
237
238  /**
239   * Move Regions and make sure that they are up on the target server.If a region movement fails we
240   * exit as failure
241   */
242  private class MoveWithAck implements Callable<Boolean> {
243    private RegionInfo region;
244    private ServerName targetServer;
245    private List<RegionInfo> movedRegions;
246    private ServerName sourceServer;
247
248    public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
249        ServerName targetServer, List<RegionInfo> movedRegions) {
250      this.region = regionInfo;
251      this.targetServer = targetServer;
252      this.movedRegions = movedRegions;
253      this.sourceServer = sourceServer;
254    }
255
256    @Override
257    public Boolean call() throws IOException, InterruptedException {
258      boolean moved = false;
259      int count = 0;
260      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
261      int maxWaitInSeconds =
262          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
263      long startTime = EnvironmentEdgeManager.currentTime();
264      boolean sameServer = true;
265      // Assert we can scan the region in its current location
266      isSuccessfulScan(region);
267      LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
268          + targetServer);
269      while (count < retries && sameServer) {
270        if (count > 0) {
271          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
272        }
273        count = count + 1;
274        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName()));
275        long maxWait = startTime + (maxWaitInSeconds * 1000);
276        while (EnvironmentEdgeManager.currentTime() < maxWait) {
277          sameServer = isSameServer(region, sourceServer);
278          if (!sameServer) {
279            break;
280          }
281          Thread.sleep(100);
282        }
283      }
284      if (sameServer) {
285        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
286            + ",newServer=" + this.targetServer);
287      } else {
288        isSuccessfulScan(region);
289        LOG.info("Moved Region "
290            + region.getRegionNameAsString()
291            + " cost:"
292            + String.format("%.3f",
293            (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
294        moved = true;
295        movedRegions.add(region);
296      }
297      return moved;
298    }
299  }
300
301  /**
302   * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
303   * RS down anyways and not abort on a stuck region. Improves movement performance
304   */
305  private class MoveWithoutAck implements Callable<Boolean> {
306    private RegionInfo region;
307    private ServerName targetServer;
308    private List<RegionInfo> movedRegions;
309    private ServerName sourceServer;
310
311    public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
312        ServerName targetServer, List<RegionInfo> movedRegions) {
313      this.region = regionInfo;
314      this.targetServer = targetServer;
315      this.movedRegions = movedRegions;
316      this.sourceServer = sourceServer;
317    }
318
319    @Override
320    public Boolean call() {
321      try {
322        LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
323            + targetServer);
324        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName()));
325        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
326            + targetServer);
327      } catch (Exception e) {
328        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
329      } finally {
330        // we add region to the moved regions list in No Ack Mode since this is best effort
331        movedRegions.add(region);
332      }
333      return true;
334    }
335  }
336
337  /**
338   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
339   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
340   * @return true if loading succeeded, false otherwise
341   */
342  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
343    ExecutorService loadPool = Executors.newFixedThreadPool(1);
344    Future<Boolean> loadTask = loadPool.submit(() -> {
345      try {
346        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
347        if (regionsToMove.isEmpty()) {
348          LOG.info("No regions to load.Exiting");
349          return true;
350        }
351        loadRegions(regionsToMove);
352      } catch (Exception e) {
353        LOG.error("Error while loading regions to " + hostname, e);
354        return false;
355      }
356      return true;
357    });
358    return waitTaskToFinish(loadPool, loadTask, "loading");
359  }
360
361  private void loadRegions(List<RegionInfo> regionsToMove)
362      throws Exception {
363    ServerName server = getTargetServer();
364    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
365    LOG.info(
366        "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
367            + " threads.Ack mode:" + this.ack);
368
369    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
370    List<Future<Boolean>> taskList = new ArrayList<>();
371    int counter = 0;
372    while (counter < regionsToMove.size()) {
373      RegionInfo region = regionsToMove.get(counter);
374      ServerName currentServer = getServerNameForRegion(region);
375      if (currentServer == null) {
376        LOG.warn(
377            "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
378        counter++;
379        continue;
380      } else if (server.equals(currentServer)) {
381        LOG.info(
382            "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
383        counter++;
384        continue;
385      }
386      if (ack) {
387        Future<Boolean> task =
388            moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
389        taskList.add(task);
390      } else {
391        Future<Boolean> task =
392            moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
393        taskList.add(task);
394      }
395      counter++;
396    }
397
398    moveRegionsPool.shutdown();
399    long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
400        .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
401    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
402  }
403
404  /**
405   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
406   * noAck mode we do not make sure that region is successfully online on the target region
407   * server,hence it is best effort.We do not unload regions to hostnames given in
408   * {@link #excludeFile}.
409   * @return true if unloading succeeded, false otherwise
410   */
411  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
412    deleteFile(this.filename);
413    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
414    Future<Boolean> unloadTask = unloadPool.submit(() -> {
415      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
416      try {
417        // Get Online RegionServers
418        List<ServerName> regionServers = new ArrayList<>();
419        regionServers.addAll(
420            admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
421                .keySet());
422        // Remove the host Region server from target Region Servers list
423        ServerName server = stripServer(regionServers, hostname, port);
424        // Remove RS present in the exclude file
425        stripExcludes(regionServers);
426        stripMaster(regionServers);
427        if (regionServers.isEmpty()) {
428          LOG.warn("No Regions were moved - no servers available");
429          return false;
430        }
431        unloadRegions(server, regionServers, movedRegions);
432      } catch (Exception e) {
433        LOG.error("Error while unloading regions ", e);
434        return false;
435      } finally {
436        if (movedRegions != null) {
437          writeFile(filename, movedRegions);
438        }
439      }
440      return true;
441    });
442    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
443  }
444
445  private void unloadRegions(ServerName server, List<ServerName> regionServers,
446      List<RegionInfo> movedRegions) throws Exception {
447    while (true) {
448      List<RegionInfo> regionsToMove = admin.getRegions(server);
449      regionsToMove.removeAll(movedRegions);
450      if (regionsToMove.isEmpty()) {
451        LOG.info("No Regions to move....Quitting now");
452        break;
453      }
454      int counter = 0;
455      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
456          + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
457          + ack);
458      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
459      List<Future<Boolean>> taskList = new ArrayList<>();
460      int serverIndex = 0;
461      while (counter < regionsToMove.size()) {
462        if (ack) {
463          Future<Boolean> task = moveRegionsPool.submit(
464              new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
465                  movedRegions));
466          taskList.add(task);
467        } else {
468          Future<Boolean> task = moveRegionsPool.submit(
469              new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
470                  movedRegions));
471          taskList.add(task);
472        }
473        counter++;
474        serverIndex = (serverIndex + 1) % regionServers.size();
475      }
476      moveRegionsPool.shutdown();
477      long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
478          .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
479      waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
480    }
481  }
482
483  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
484      throws TimeoutException, InterruptedException, ExecutionException {
485    pool.shutdown();
486    try {
487      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
488        LOG.warn(
489            "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
490                + "sec");
491        pool.shutdownNow();
492      }
493    } catch (InterruptedException e) {
494      pool.shutdownNow();
495      Thread.currentThread().interrupt();
496    }
497    try {
498      return task.get(5, TimeUnit.SECONDS);
499    } catch (InterruptedException e) {
500      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
501      throw e;
502    } catch (ExecutionException e) {
503      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
504      throw e;
505    }
506  }
507
508  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
509      List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
510    try {
511      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
512        moveRegionsPool.shutdownNow();
513      }
514    } catch (InterruptedException e) {
515      moveRegionsPool.shutdownNow();
516      Thread.currentThread().interrupt();
517    }
518    for (Future<Boolean> future : taskList) {
519      try {
520        // if even after shutdownNow threads are stuck we wait for 5 secs max
521        if (!future.get(5, TimeUnit.SECONDS)) {
522          LOG.error("Was Not able to move region....Exiting Now");
523          throw new Exception("Could not move region Exception");
524        }
525      } catch (InterruptedException e) {
526        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
527        throw e;
528      } catch (ExecutionException e) {
529        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
530        throw e;
531      } catch (CancellationException e) {
532        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
533            + "secs", e);
534        throw e;
535      }
536    }
537  }
538
539  private ServerName getTargetServer() throws Exception {
540    ServerName server = null;
541    int maxWaitInSeconds =
542        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
543    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
544    while (EnvironmentEdgeManager.currentTime() < maxWait) {
545      try {
546        List<ServerName> regionServers = new ArrayList<>();
547        regionServers.addAll(
548            admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
549                .keySet());
550        // Remove the host Region server from target Region Servers list
551        server = stripServer(regionServers, hostname, port);
552        if (server != null) {
553          break;
554        } else {
555          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
556        }
557      } catch (IOException e) {
558        LOG.warn("Could not get list of region servers", e);
559      }
560      Thread.sleep(500);
561    }
562    if (server == null) {
563      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
564      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
565    }
566    return server;
567  }
568
569  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
570    List<RegionInfo> regions = new ArrayList<>();
571    File f = new File(filename);
572    if (!f.exists()) {
573      return regions;
574    }
575    try (DataInputStream dis = new DataInputStream(
576        new BufferedInputStream(new FileInputStream(f)))) {
577      int numRegions = dis.readInt();
578      int index = 0;
579      while (index < numRegions) {
580        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
581        index++;
582      }
583    } catch (IOException e) {
584      LOG.error("Error while reading regions from file:" + filename, e);
585      throw e;
586    }
587    return regions;
588  }
589
590  /**
591   * Write the number of regions moved in the first line followed by regions moved in subsequent
592   * lines
593   */
594  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
595    try (DataOutputStream dos = new DataOutputStream(
596        new BufferedOutputStream(new FileOutputStream(filename)))) {
597      dos.writeInt(movedRegions.size());
598      for (RegionInfo region : movedRegions) {
599        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
600      }
601    } catch (IOException e) {
602      LOG.error(
603          "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
604              .size() + " regions", e);
605      throw e;
606    }
607  }
608
609  private void deleteFile(String filename) {
610    File f = new File(filename);
611    if (f.exists()) {
612      f.delete();
613    }
614  }
615
616  /**
617   * @return List of servers from the exclude file in format 'hostname:port'.
618   */
619  private List<String> readExcludes(String excludeFile) throws IOException {
620    List<String> excludeServers = new ArrayList<>();
621    if (excludeFile == null) {
622      return excludeServers;
623    } else {
624      try {
625        Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim)
626            .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
627            .forEach(excludeServers::add);
628      } catch (IOException e) {
629        LOG.warn("Exception while reading excludes file, continuing anyways", e);
630      }
631      return excludeServers;
632    }
633  }
634
635  /**
636   * Excludes the servername whose hostname and port portion matches the list given in exclude file
637   */
638  private void stripExcludes(List<ServerName> regionServers) throws IOException {
639    if (excludeFile != null) {
640      List<String> excludes = readExcludes(excludeFile);
641      Iterator<ServerName> i = regionServers.iterator();
642      while (i.hasNext()) {
643        String rs = i.next().getServerName();
644        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
645            .split(ServerName.SERVERNAME_SEPARATOR)[1];
646        if (excludes.contains(rsPort)) {
647          i.remove();
648        }
649      }
650      LOG.info("Valid Region server targets are:" + regionServers.toString());
651      LOG.info("Excluded Servers are" + excludes.toString());
652    }
653  }
654
655  /**
656   * Exclude master from list of RSs to move regions to
657   */
658  private void stripMaster(List<ServerName> regionServers) throws IOException {
659    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
660    stripServer(regionServers, master.getHostname(), master.getPort());
661  }
662
663  /**
664   * Remove the servername whose hostname and port portion matches from the passed array of servers.
665   * Returns as side-effect the servername removed.
666   * @return server removed from list of Region Servers
667   */
668  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
669    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
670      ServerName server = iter.next();
671      if (server.getAddress().getHostname().equalsIgnoreCase(hostname) &&
672        server.getAddress().getPort() == port) {
673        iter.remove();
674        return server;
675      }
676    }
677    return null;
678  }
679
680  /**
681   * Tries to scan a row from passed region
682   */
683  private void isSuccessfulScan(RegionInfo region) throws IOException {
684    Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
685        .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
686        .setCacheBlocks(false);
687    try (Table table = conn.getTable(region.getTable());
688        ResultScanner scanner = table.getScanner(scan)) {
689      scanner.next();
690    } catch (IOException e) {
691      LOG.error("Could not scan region:" + region.getEncodedName(), e);
692      throw e;
693    }
694  }
695
696  /**
697   * Returns true if passed region is still on serverName when we look at hbase:meta.
698   * @return true if region is hosted on serverName otherwise false
699   */
700  private boolean isSameServer(RegionInfo region, ServerName serverName)
701      throws IOException {
702    ServerName serverForRegion = getServerNameForRegion(region);
703    if (serverForRegion != null && serverForRegion.equals(serverName)) {
704      return true;
705    }
706    return false;
707  }
708
709  /**
710   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
711   * startcode comma-delimited. Can return null
712   * @return regionServer hosting the given region
713   */
714  private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
715    if (!admin.isTableEnabled(region.getTable())) {
716      return null;
717    }
718    HRegionLocation loc =
719      conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(), true);
720    if (loc != null) {
721      return loc.getServerName();
722    } else {
723      return null;
724    }
725  }
726
727  @Override
728  protected void addOptions() {
729    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
730    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
731    this.addOptWithArg("m", "maxthreads",
732        "Define the maximum number of threads to use to unload and reload the regions");
733    this.addOptWithArg("x", "excludefile",
734        "File with <hostname:port> per line to exclude as unload targets; default excludes only "
735            + "target host; useful for rack decommisioning.");
736    this.addOptWithArg("f", "filename",
737        "File to save regions list into unloading, or read from loading; "
738            + "default /tmp/<usernamehostname:port>");
739    this.addOptNoArg("n", "noack",
740        "Turn on No-Ack mode(default: false) which won't check if region is online on target "
741            + "RegionServer, hence best effort. This is more performant in unloading and loading "
742            + "but might lead to region being unavailable for some time till master reassigns it "
743            + "in case the move failed");
744    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
745        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
746  }
747
748  @Override
749  protected void processOptions(CommandLine cmd) {
750    String hostname = cmd.getOptionValue("r");
751    rmbuilder = new RegionMoverBuilder(hostname);
752    if (cmd.hasOption('m')) {
753      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
754    }
755    if (cmd.hasOption('n')) {
756      rmbuilder.ack(false);
757    }
758    if (cmd.hasOption('f')) {
759      rmbuilder.filename(cmd.getOptionValue('f'));
760    }
761    if (cmd.hasOption('x')) {
762      rmbuilder.excludeFile(cmd.getOptionValue('x'));
763    }
764    if (cmd.hasOption('t')) {
765      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
766    }
767    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
768  }
769
770  @Override
771  protected int doWork() throws Exception {
772    boolean success;
773    try (RegionMover rm = rmbuilder.build()) {
774      if (loadUnload.equalsIgnoreCase("load")) {
775        success = rm.load();
776      } else if (loadUnload.equalsIgnoreCase("unload")) {
777        success = rm.unload();
778      } else {
779        printUsage();
780        success = false;
781      }
782    }
783    return (success ? 0 : 1);
784  }
785
786  public static void main(String[] args) {
787    try (RegionMover mover = new RegionMover()) {
788      mover.doStaticMain(args);
789    }
790  }
791}