001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.io.BufferedInputStream;
023import java.io.BufferedOutputStream;
024import java.io.Closeable;
025import java.io.DataInputStream;
026import java.io.DataOutputStream;
027import java.io.File;
028import java.io.FileInputStream;
029import java.io.FileOutputStream;
030import java.io.IOException;
031import java.nio.file.Files;
032import java.nio.file.Paths;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.EnumSet;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Locale;
040import java.util.Set;
041import java.util.concurrent.Callable;
042import java.util.concurrent.CancellationException;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.TimeUnit;
048import java.util.concurrent.TimeoutException;
049import java.util.function.Predicate;
050import org.apache.commons.io.IOUtils;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.hbase.ClusterMetrics.Option;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.HRegionLocation;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.client.Admin;
058import org.apache.hadoop.hbase.client.Connection;
059import org.apache.hadoop.hbase.client.ConnectionFactory;
060import org.apache.hadoop.hbase.client.RegionInfo;
061import org.apache.hadoop.hbase.client.ResultScanner;
062import org.apache.hadoop.hbase.client.Scan;
063import org.apache.hadoop.hbase.client.Table;
064import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
065import org.apache.yetus.audience.InterfaceAudience;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
070import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
071import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
072
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges if regions are online after movement, while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown where, even if a Region is stuck, upon shutdown the
 * master will move it anyway. This can also be used by constructing an Object using the builder
 * and then calling the {@link #load()} or {@link #unload()} methods for the desired operations.
 */
082@InterfaceAudience.Public
083public class RegionMover extends AbstractHBaseTool implements Closeable {
084  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
085  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
086  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
087  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
088  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
089  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
090
091  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
092
093  private RegionMoverBuilder rmbuilder;
094  private boolean ack = true;
095  private int maxthreads = 1;
096  private int timeout;
097  private String loadUnload;
098  private String hostname;
099  private String filename;
100  private String excludeFile;
101  private int port;
102  private Connection conn;
103  private Admin admin;
104
105  private RegionMover(RegionMoverBuilder builder) throws IOException {
106    this.hostname = builder.hostname;
107    this.filename = builder.filename;
108    this.excludeFile = builder.excludeFile;
109    this.maxthreads = builder.maxthreads;
110    this.ack = builder.ack;
111    this.port = builder.port;
112    this.timeout = builder.timeout;
113    setConf(builder.conf);
114    this.conn = ConnectionFactory.createConnection(conf);
115    this.admin = conn.getAdmin();
116  }
117
118  private RegionMover() {
119  }
120
121  @Override
122  public void close() {
123    IOUtils.closeQuietly(this.admin);
124    IOUtils.closeQuietly(this.conn);
125  }
126
127  /**
128   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
129   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
130   * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options
131   */
132  public static class RegionMoverBuilder {
133    private boolean ack = true;
134    private int maxthreads = 1;
135    private int timeout = Integer.MAX_VALUE;
136    private String hostname;
137    private String filename;
138    private String excludeFile = null;
139    private String defaultDir = System.getProperty("java.io.tmpdir");
140    @VisibleForTesting
141    final int port;
142    private final Configuration conf;
143
144    public RegionMoverBuilder(String hostname) {
145      this(hostname, createConf());
146    }
147
148    /**
149     * Creates a new configuration and sets region mover specific overrides
150     */
151    private static Configuration createConf() {
152      Configuration conf = HBaseConfiguration.create();
153      conf.setInt("hbase.client.prefetch.limit", 1);
154      conf.setInt("hbase.client.pause", 500);
155      conf.setInt("hbase.client.retries.number", 100);
156      return conf;
157    }
158
159    /**
160     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
161     *     or hostname:port.
162     * @param conf Configuration object
163     */
164    public RegionMoverBuilder(String hostname, Configuration conf) {
165      String[] splitHostname = hostname.toLowerCase().split(":");
166      this.hostname = splitHostname[0];
167      if (splitHostname.length == 2) {
168        this.port = Integer.parseInt(splitHostname[1]);
169      } else {
170        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
171      }
172      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
173        + ":" + Integer.toString(this.port);
174      this.conf = conf;
175    }
176
177    /**
178     * Path of file where regions will be written to during unloading/read from during loading
179     * @param filename
180     * @return RegionMoverBuilder object
181     */
182    public RegionMoverBuilder filename(String filename) {
183      this.filename = filename;
184      return this;
185    }
186
187    /**
188     * Set the max number of threads that will be used to move regions
189     */
190    public RegionMoverBuilder maxthreads(int threads) {
191      this.maxthreads = threads;
192      return this;
193    }
194
195    /**
196     * Path of file containing hostnames to be excluded during region movement. Exclude file should
197     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
198     * host.
199     */
200    public RegionMoverBuilder excludeFile(String excludefile) {
201      this.excludeFile = excludefile;
202      return this;
203    }
204
205    /**
206     * Set ack/noAck mode.
207     * <p>
208     * In ack mode regions are acknowledged before and after moving and the move is retried
209     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
210     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
211     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
212     * <p>
213     * @param ack
214     * @return RegionMoverBuilder object
215     */
216    public RegionMoverBuilder ack(boolean ack) {
217      this.ack = ack;
218      return this;
219    }
220
221    /**
222     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
223     * movers also have a separate time which is hbase.move.wait.max * number of regions to
224     * load/unload
225     * @param timeout in seconds
226     * @return RegionMoverBuilder object
227     */
228    public RegionMoverBuilder timeout(int timeout) {
229      this.timeout = timeout;
230      return this;
231    }
232
233    /**
234     * This method builds the appropriate RegionMover object which can then be used to load/unload
235     * using load and unload methods
236     * @return RegionMover object
237     */
238    public RegionMover build() throws IOException {
239      return new RegionMover(this);
240    }
241  }
242
243  /**
244   * Move Regions and make sure that they are up on the target server.If a region movement fails we
245   * exit as failure
246   */
247  private class MoveWithAck implements Callable<Boolean> {
248    private RegionInfo region;
249    private ServerName targetServer;
250    private List<RegionInfo> movedRegions;
251    private ServerName sourceServer;
252
253    public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
254        ServerName targetServer, List<RegionInfo> movedRegions) {
255      this.region = regionInfo;
256      this.targetServer = targetServer;
257      this.movedRegions = movedRegions;
258      this.sourceServer = sourceServer;
259    }
260
261    @Override
262    public Boolean call() throws IOException, InterruptedException {
263      boolean moved = false;
264      int count = 0;
265      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
266      int maxWaitInSeconds =
267          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
268      long startTime = EnvironmentEdgeManager.currentTime();
269      boolean sameServer = true;
270      // Assert we can scan the region in its current location
271      isSuccessfulScan(region);
272      LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
273          + targetServer);
274      while (count < retries && sameServer) {
275        if (count > 0) {
276          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
277        }
278        count = count + 1;
279        admin.move(region.getEncodedNameAsBytes(), targetServer);
280        long maxWait = startTime + (maxWaitInSeconds * 1000);
281        while (EnvironmentEdgeManager.currentTime() < maxWait) {
282          sameServer = isSameServer(region, sourceServer);
283          if (!sameServer) {
284            break;
285          }
286          Thread.sleep(100);
287        }
288      }
289      if (sameServer) {
290        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
291            + ",newServer=" + this.targetServer);
292      } else {
293        isSuccessfulScan(region);
294        LOG.info("Moved Region "
295            + region.getRegionNameAsString()
296            + " cost:"
297            + String.format("%.3f",
298            (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
299        moved = true;
300        movedRegions.add(region);
301      }
302      return moved;
303    }
304  }
305
306  /**
307   * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
308   * RS down anyways and not abort on a stuck region. Improves movement performance
309   */
310  private class MoveWithoutAck implements Callable<Boolean> {
311    private RegionInfo region;
312    private ServerName targetServer;
313    private List<RegionInfo> movedRegions;
314    private ServerName sourceServer;
315
316    public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
317        ServerName targetServer, List<RegionInfo> movedRegions) {
318      this.region = regionInfo;
319      this.targetServer = targetServer;
320      this.movedRegions = movedRegions;
321      this.sourceServer = sourceServer;
322    }
323
324    @Override
325    public Boolean call() {
326      try {
327        LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
328            + targetServer);
329        admin.move(region.getEncodedNameAsBytes(), targetServer);
330        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
331            + targetServer);
332      } catch (Exception e) {
333        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
334      } finally {
335        // we add region to the moved regions list in No Ack Mode since this is best effort
336        movedRegions.add(region);
337      }
338      return true;
339    }
340  }
341
342  /**
343   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
344   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
345   * @return true if loading succeeded, false otherwise
346   */
347  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
348    ExecutorService loadPool = Executors.newFixedThreadPool(1);
349    Future<Boolean> loadTask = loadPool.submit(() -> {
350      try {
351        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
352        if (regionsToMove.isEmpty()) {
353          LOG.info("No regions to load.Exiting");
354          return true;
355        }
356        loadRegions(regionsToMove);
357      } catch (Exception e) {
358        LOG.error("Error while loading regions to " + hostname, e);
359        return false;
360      }
361      return true;
362    });
363    return waitTaskToFinish(loadPool, loadTask, "loading");
364  }
365
366  private void loadRegions(List<RegionInfo> regionsToMove)
367      throws Exception {
368    ServerName server = getTargetServer();
369    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
370    LOG.info(
371        "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
372            + " threads.Ack mode:" + this.ack);
373
374    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
375    List<Future<Boolean>> taskList = new ArrayList<>();
376    int counter = 0;
377    while (counter < regionsToMove.size()) {
378      RegionInfo region = regionsToMove.get(counter);
379      ServerName currentServer = getServerNameForRegion(region);
380      if (currentServer == null) {
381        LOG.warn(
382            "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
383        counter++;
384        continue;
385      } else if (server.equals(currentServer)) {
386        LOG.info(
387            "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
388        counter++;
389        continue;
390      }
391      if (ack) {
392        Future<Boolean> task =
393            moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
394        taskList.add(task);
395      } else {
396        Future<Boolean> task =
397            moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
398        taskList.add(task);
399      }
400      counter++;
401    }
402
403    moveRegionsPool.shutdown();
404    long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
405        .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
406    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
407  }
408
409  /**
410   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
411   * noAck mode we do not make sure that region is successfully online on the target region
412   * server,hence it is best effort.We do not unload regions to hostnames given in
413   * {@link #excludeFile}.
414   * @return true if unloading succeeded, false otherwise
415   */
416  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
417    deleteFile(this.filename);
418    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
419    Future<Boolean> unloadTask = unloadPool.submit(() -> {
420      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
421      try {
422        // Get Online RegionServers
423        List<ServerName> regionServers = new ArrayList<>();
424        regionServers.addAll(admin.getRegionServers());
425        // Remove the host Region server from target Region Servers list
426        ServerName server = stripServer(regionServers, hostname, port);
427        if (server == null) {
428          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
429              hostname, port);
430          LOG.debug("List of region servers: {}", regionServers);
431          return false;
432        }
433        // Remove RS present in the exclude file
434        stripExcludes(regionServers);
435
436        // Remove decommissioned RS
437        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
438        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
439          regionServers.removeIf(decommissionedRS::contains);
440          LOG.debug("Excluded RegionServers from unloading regions to because they " +
441            "are marked as decommissioned. Servers: {}", decommissionedRS);
442        }
443
444        stripMaster(regionServers);
445        if (regionServers.isEmpty()) {
446          LOG.warn("No Regions were moved - no servers available");
447          return false;
448        }
449        unloadRegions(server, regionServers, movedRegions);
450      } catch (Exception e) {
451        LOG.error("Error while unloading regions ", e);
452        return false;
453      } finally {
454        if (movedRegions != null) {
455          writeFile(filename, movedRegions);
456        }
457      }
458      return true;
459    });
460    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
461  }
462
463  private void unloadRegions(ServerName server, List<ServerName> regionServers,
464      List<RegionInfo> movedRegions) throws Exception {
465    while (true) {
466      List<RegionInfo> regionsToMove = admin.getRegions(server);
467      regionsToMove.removeAll(movedRegions);
468      if (regionsToMove.isEmpty()) {
469        LOG.info("No Regions to move....Quitting now");
470        break;
471      }
472      int counter = 0;
473      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
474          + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
475          + ack);
476      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
477      List<Future<Boolean>> taskList = new ArrayList<>();
478      int serverIndex = 0;
479      while (counter < regionsToMove.size()) {
480        if (ack) {
481          Future<Boolean> task = moveRegionsPool.submit(
482              new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
483                  movedRegions));
484          taskList.add(task);
485        } else {
486          Future<Boolean> task = moveRegionsPool.submit(
487              new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
488                  movedRegions));
489          taskList.add(task);
490        }
491        counter++;
492        serverIndex = (serverIndex + 1) % regionServers.size();
493      }
494      moveRegionsPool.shutdown();
495      long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
496          .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
497      waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
498    }
499  }
500
501  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
502      throws TimeoutException, InterruptedException, ExecutionException {
503    pool.shutdown();
504    try {
505      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
506        LOG.warn(
507            "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
508                + "sec");
509        pool.shutdownNow();
510      }
511    } catch (InterruptedException e) {
512      pool.shutdownNow();
513      Thread.currentThread().interrupt();
514    }
515    try {
516      return task.get(5, TimeUnit.SECONDS);
517    } catch (InterruptedException e) {
518      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
519      throw e;
520    } catch (ExecutionException e) {
521      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
522      throw e;
523    }
524  }
525
526  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
527      List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
528    try {
529      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
530        moveRegionsPool.shutdownNow();
531      }
532    } catch (InterruptedException e) {
533      moveRegionsPool.shutdownNow();
534      Thread.currentThread().interrupt();
535    }
536    for (Future<Boolean> future : taskList) {
537      try {
538        // if even after shutdownNow threads are stuck we wait for 5 secs max
539        if (!future.get(5, TimeUnit.SECONDS)) {
540          LOG.error("Was Not able to move region....Exiting Now");
541          throw new Exception("Could not move region Exception");
542        }
543      } catch (InterruptedException e) {
544        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
545        throw e;
546      } catch (ExecutionException e) {
547        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
548        throw e;
549      } catch (CancellationException e) {
550        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
551            + "secs", e);
552        throw e;
553      }
554    }
555  }
556
557  private ServerName getTargetServer() throws Exception {
558    ServerName server = null;
559    int maxWaitInSeconds =
560        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
561    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
562    while (EnvironmentEdgeManager.currentTime() < maxWait) {
563      try {
564        List<ServerName> regionServers = new ArrayList<>();
565        regionServers.addAll(admin.getRegionServers());
566        // Remove the host Region server from target Region Servers list
567        server = stripServer(regionServers, hostname, port);
568        if (server != null) {
569          break;
570        } else {
571          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
572        }
573      } catch (IOException e) {
574        LOG.warn("Could not get list of region servers", e);
575      }
576      Thread.sleep(500);
577    }
578    if (server == null) {
579      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
580      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
581    }
582    return server;
583  }
584
585  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
586    List<RegionInfo> regions = new ArrayList<>();
587    File f = new File(filename);
588    if (!f.exists()) {
589      return regions;
590    }
591    try (DataInputStream dis = new DataInputStream(
592        new BufferedInputStream(new FileInputStream(f)))) {
593      int numRegions = dis.readInt();
594      int index = 0;
595      while (index < numRegions) {
596        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
597        index++;
598      }
599    } catch (IOException e) {
600      LOG.error("Error while reading regions from file:" + filename, e);
601      throw e;
602    }
603    return regions;
604  }
605
606  /**
607   * Write the number of regions moved in the first line followed by regions moved in subsequent
608   * lines
609   */
610  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
611    try (DataOutputStream dos = new DataOutputStream(
612        new BufferedOutputStream(new FileOutputStream(filename)))) {
613      dos.writeInt(movedRegions.size());
614      for (RegionInfo region : movedRegions) {
615        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
616      }
617    } catch (IOException e) {
618      LOG.error(
619          "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
620              .size() + " regions", e);
621      throw e;
622    }
623  }
624
625  private void deleteFile(String filename) {
626    File f = new File(filename);
627    if (f.exists()) {
628      f.delete();
629    }
630  }
631
632  /**
633   * @return List of servers from the exclude file in format 'hostname:port'.
634   */
635  private List<String> readExcludes(String excludeFile) throws IOException {
636    List<String> excludeServers = new ArrayList<>();
637    if (excludeFile == null) {
638      return excludeServers;
639    } else {
640      try {
641        Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim)
642            .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
643            .forEach(excludeServers::add);
644      } catch (IOException e) {
645        LOG.warn("Exception while reading excludes file, continuing anyways", e);
646      }
647      return excludeServers;
648    }
649  }
650
651  /**
652   * Excludes the servername whose hostname and port portion matches the list given in exclude file
653   */
654  private void stripExcludes(List<ServerName> regionServers) throws IOException {
655    if (excludeFile != null) {
656      List<String> excludes = readExcludes(excludeFile);
657      Iterator<ServerName> i = regionServers.iterator();
658      while (i.hasNext()) {
659        String rs = i.next().getServerName();
660        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
661            .split(ServerName.SERVERNAME_SEPARATOR)[1];
662        if (excludes.contains(rsPort)) {
663          i.remove();
664        }
665      }
666      LOG.info("Valid Region server targets are:" + regionServers.toString());
667      LOG.info("Excluded Servers are" + excludes.toString());
668    }
669  }
670
671  /**
672   * Exclude master from list of RSs to move regions to
673   */
674  private void stripMaster(List<ServerName> regionServers) throws IOException {
675    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
676    stripServer(regionServers, master.getHostname(), master.getPort());
677  }
678
679  /**
680   * Remove the servername whose hostname and port portion matches from the passed array of servers.
681   * Returns as side-effect the servername removed.
682   * @return server removed from list of Region Servers
683   */
684  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
685    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
686      ServerName server = iter.next();
687      if (server.getAddress().getHostname().equalsIgnoreCase(hostname) &&
688        server.getAddress().getPort() == port) {
689        iter.remove();
690        return server;
691      }
692    }
693    return null;
694  }
695
696  /**
697   * Tries to scan a row from passed region
698   */
699  private void isSuccessfulScan(RegionInfo region) throws IOException {
700    Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
701        .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
702        .setCacheBlocks(false);
703    try (Table table = conn.getTable(region.getTable());
704        ResultScanner scanner = table.getScanner(scan)) {
705      scanner.next();
706    } catch (IOException e) {
707      LOG.error("Could not scan region:" + region.getEncodedName(), e);
708      throw e;
709    }
710  }
711
712  /**
713   * Returns true if passed region is still on serverName when we look at hbase:meta.
714   * @return true if region is hosted on serverName otherwise false
715   */
716  private boolean isSameServer(RegionInfo region, ServerName serverName)
717      throws IOException {
718    ServerName serverForRegion = getServerNameForRegion(region);
719    if (serverForRegion != null && serverForRegion.equals(serverName)) {
720      return true;
721    }
722    return false;
723  }
724
725  /**
726   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
727   * startcode comma-delimited. Can return null
728   * @return regionServer hosting the given region
729   */
730  private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
731    if (!admin.isTableEnabled(region.getTable())) {
732      return null;
733    }
734    HRegionLocation loc =
735      conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(),
736        region.getReplicaId(),true);
737    if (loc != null) {
738      return loc.getServerName();
739    } else {
740      return null;
741    }
742  }
743
744  @Override
745  protected void addOptions() {
746    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
747    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
748    this.addOptWithArg("m", "maxthreads",
749        "Define the maximum number of threads to use to unload and reload the regions");
750    this.addOptWithArg("x", "excludefile",
751        "File with <hostname:port> per line to exclude as unload targets; default excludes only "
752            + "target host; useful for rack decommisioning.");
753    this.addOptWithArg("f", "filename",
754        "File to save regions list into unloading, or read from loading; "
755            + "default /tmp/<usernamehostname:port>");
756    this.addOptNoArg("n", "noack",
757        "Turn on No-Ack mode(default: false) which won't check if region is online on target "
758            + "RegionServer, hence best effort. This is more performant in unloading and loading "
759            + "but might lead to region being unavailable for some time till master reassigns it "
760            + "in case the move failed");
761    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
762        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
763  }
764
765  @Override
766  protected void processOptions(CommandLine cmd) {
767    String hostname = cmd.getOptionValue("r");
768    rmbuilder = new RegionMoverBuilder(hostname);
769    if (cmd.hasOption('m')) {
770      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
771    }
772    if (cmd.hasOption('n')) {
773      rmbuilder.ack(false);
774    }
775    if (cmd.hasOption('f')) {
776      rmbuilder.filename(cmd.getOptionValue('f'));
777    }
778    if (cmd.hasOption('x')) {
779      rmbuilder.excludeFile(cmd.getOptionValue('x'));
780    }
781    if (cmd.hasOption('t')) {
782      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
783    }
784    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
785  }
786
787  @Override
788  protected int doWork() throws Exception {
789    boolean success;
790    try (RegionMover rm = rmbuilder.build()) {
791      if (loadUnload.equalsIgnoreCase("load")) {
792        success = rm.load();
793      } else if (loadUnload.equalsIgnoreCase("unload")) {
794        success = rm.unload();
795      } else {
796        printUsage();
797        success = false;
798      }
799    }
800    return (success ? 0 : 1);
801  }
802
803  public static void main(String[] args) {
804    try (RegionMover mover = new RegionMover()) {
805      mover.doStaticMain(args);
806    }
807  }
808}