001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.UUID;
026import org.apache.hadoop.conf.Configured;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
037import org.apache.hadoop.hbase.mapreduce.Import.Importer;
038import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
039import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.CommonFSUtils;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.util.Tool;
044import org.apache.hadoop.util.ToolRunner;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Tool used to copy a table to another one which can be on a different setup. It is also
051 * configurable with a start and time as well as a specification of the region server implementation
052 * if different from the local cluster.
053 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  /** Job-name prefix, also the name of the scratch directory used for bulkload/snapshot output. */
  final static String NAME = "copytable";
  // Inclusive lower bound of the scan time range, in epoch millis (Scan#setTimeRange).
  long startTime = 0;
  // Upper bound of the scan time range, in epoch millis (exclusive per Scan#setTimeRange).
  long endTime = HConstants.LATEST_TIMESTAMP;
  // Maximum number of cells per scan RPC batch (Scan#setBatch).
  int batch = Integer.MAX_VALUE;
  // Scanner caching (rows per RPC); <= 0 means use the configured client default.
  int cacheRow = -1;
  // Number of cell versions to copy; negative means use the scan default.
  int versions = -1;
  // Source table name; null when reading from a snapshot instead.
  String tableName = null;
  // Optional start row, in Bytes.toBytesBinary encoding.
  String startRow = null;
  // Optional stop row, in Bytes.toBytesBinary encoding.
  String stopRow = null;
  // Destination table name; defaults to the source table name when not supplied.
  String dstTableName = null;
  // Connection URI of the peer (destination) cluster for cross-cluster copies.
  URI peerUri = null;
  /**
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead.
   */
  @Deprecated
  String peerAddress = null;
  // Comma-separated families to copy; an entry may be "srcCf:dstCf" to rename on the fly.
  String families = null;
  // When true, scan in raw mode so delete markers and deleted cells are copied as well.
  boolean allCells = false;
  // When true, ask TableInputFormat to shuffle the map tasks.
  // NOTE(review): static while every other flag is per-instance — presumably historical; confirm.
  static boolean shuffle = false;

  // When true, write HFiles and bulk load them into the destination instead of using Puts.
  boolean bulkload = false;
  // Scratch directory the HFiles are written to before the bulk load step.
  Path bulkloadDir = null;

  // When true, the positional argument is a snapshot name rather than a table name.
  boolean readingSnapshot = false;
  // Source snapshot name when readingSnapshot is set.
  String snapshot = null;

  /** Configuration key that, when set, overrides the generated MapReduce job name. */
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
085
086  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
087    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
088    Path dir = new Path(fs.getWorkingDirectory(), NAME);
089    if (!fs.exists(dir)) {
090      fs.mkdirs(dir);
091    }
092    Path newDir = new Path(dir, UUID.randomUUID().toString());
093    if (withDirCreated) {
094      fs.mkdirs(newDir);
095    }
096    return newDir;
097  }
098
099  private void initCopyTableMapperJob(Job job, Scan scan) throws IOException {
100    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
101    if (readingSnapshot) {
102      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
103        generateUniqTempDir(true));
104    } else {
105      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
106    }
107  }
108
109  /**
110   * Sets up the actual job.
111   * @param args The command line parameters.
112   * @return The newly created job.
113   * @throws IOException When setting up the job fails.
114   */
115  public Job createSubmittableJob(String[] args) throws IOException {
116    if (!doCommandLine(args)) {
117      return null;
118    }
119
120    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
121    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
122    job.setJarByClass(CopyTable.class);
123    Scan scan = new Scan();
124
125    scan.setBatch(batch);
126    scan.setCacheBlocks(false);
127
128    if (cacheRow > 0) {
129      scan.setCaching(cacheRow);
130    } else {
131      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
132    }
133
134    scan.setTimeRange(startTime, endTime);
135
136    if (allCells) {
137      scan.setRaw(true);
138    }
139    if (shuffle) {
140      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
141    }
142    if (versions >= 0) {
143      scan.readVersions(versions);
144    }
145
146    if (startRow != null) {
147      scan.withStartRow(Bytes.toBytesBinary(startRow));
148    }
149
150    if (stopRow != null) {
151      scan.withStopRow(Bytes.toBytesBinary(stopRow));
152    }
153
154    if (families != null) {
155      String[] fams = families.split(",");
156      Map<String, String> cfRenameMap = new HashMap<>();
157      for (String fam : fams) {
158        String sourceCf;
159        if (fam.contains(":")) {
160          // fam looks like "sourceCfName:destCfName"
161          String[] srcAndDest = fam.split(":", 2);
162          sourceCf = srcAndDest[0];
163          String destCf = srcAndDest[1];
164          cfRenameMap.put(sourceCf, destCf);
165        } else {
166          // fam is just "sourceCf"
167          sourceCf = fam;
168        }
169        scan.addFamily(Bytes.toBytes(sourceCf));
170      }
171      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
172    }
173    job.setNumReduceTasks(0);
174
175    if (bulkload) {
176      initCopyTableMapperJob(job, scan);
177
178      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
179      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
180
181      bulkloadDir = generateUniqTempDir(false);
182      LOG.info("HFiles will be stored at " + this.bulkloadDir);
183      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
184      try (Connection conn = ConnectionFactory.createConnection(getConf());
185        Admin admin = conn.getAdmin()) {
186        HFileOutputFormat2.configureIncrementalLoadMap(job,
187          admin.getDescriptor((TableName.valueOf(dstTableName))));
188      }
189    } else {
190      initCopyTableMapperJob(job, scan);
191      if (peerUri != null) {
192        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri);
193      } else if (peerAddress != null) {
194        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
195      } else {
196        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job);
197      }
198
199    }
200
201    return job;
202  }
203
204  /*
205   * @param errorMsg Error message. Can be null.
206   */
207  private static void printUsage(final String errorMsg) {
208    if (errorMsg != null && errorMsg.length() > 0) {
209      System.err.println("ERROR: " + errorMsg);
210    }
211    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
212      + "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>");
213    System.err.println();
214    System.err.println("Options:");
215    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
216    System.err.println("              specify if different from current cluster");
217    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
218    System.err.println(" startrow     the start row");
219    System.err.println(" stoprow      the stop row");
220    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
221    System.err.println("              without endtime means from starttime to forever");
222    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
223    System.err.println(" versions     number of cell versions to copy");
224    System.err.println(" new.name     new table's name");
225    System.err.println(" peer.uri     The URI of the peer cluster");
226    System.err.println(" peer.adr     Address of the peer cluster given in the format");
227    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
228      + ".port:zookeeper.znode.parent");
229    System.err.println("              Do not take effect if peer.uri is specified");
230    System.err.println("              Deprecated, please use peer.uri instead");
231    System.err.println(" families     comma-separated list of families to copy");
232    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
233    System.err.println("              To keep the same name, just give \"cfName\"");
234    System.err.println(" all.cells    also copy delete markers and deleted cells");
235    System.err
236      .println(" bulkload     Write input into HFiles and bulk load to the destination " + "table");
237    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
238    System.err.println();
239    System.err.println("Args:");
240    System.err.println(" tablename    Name of the table to copy");
241    System.err.println();
242    System.err.println("Examples:");
243    System.err
244      .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
245    System.err.println(" $ hbase "
246      + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 "
247      + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
248    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
249    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
250      + "--snapshot --new.name=destTable sourceTableSnapshot");
251    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
252    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
253      + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
254    System.err.println();
255    System.err.println(
256      " To copy the data of 'TestTable' from the secured local cluster to a non-secured peer"
257        + " cluster (cluster-b)");
258    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
259      + "-Dhbase.mapred.output.hbase.security.authentication=simple "
260      + "--peer.adr=cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
261      + "2181:/cluster-b" + " TestTable");
262    System.err.println();
263    System.err.println(
264      " To copy the data of 'TestTable' from the local secured cluster to a peer secured cluster "
265        + "in a different Kerberos realm.");
266    System.err.println(" Assume cluster-b uses a different Kerberos principal "
267      + "(cluster-b/_HOST@EXAMPLE.COM) for its master and regionserver.");
268    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
269      + "-Dhbase.mapred.output.hbase.regionserver.kerberos.principal="
270      + "cluster-b/_HOST@EXAMPLE.COM "
271      + "-Dhbase.mapred.output.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM "
272      + "--peer.adr=cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
273      + "2181:/cluster-b" + " TestTable");
274    System.err.println();
275    System.err.println(
276      " To copy the data of 'TestTable' from a non-secured local cluster to a secured peer cluster"
277        + " (cluster-b)");
278    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
279      + "-Dhbase.mapred.output.hbase.security.authentication=kerberos "
280      + "-Dhbase.mapred.output.hbase.regionserver.kerberos.principal="
281      + "cluster-b/_HOST@EXAMPLE.COM "
282      + "-Dhbase.mapred.output.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM "
283      + "--peer.adr=cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
284      + "2181:/cluster-b" + " TestTable");
285    System.err.println();
286    System.err.println("For performance consider the following general option:\n"
287      + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
288      + "  decreases the round trip time to the server and may increase performance.\n"
289      + "    -Dhbase.client.scanner.caching=100\n"
290      + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
291      + "  inaccurate results.\n" + "    -Dmapreduce.map.speculative=false");
292  }
293
294  private boolean doCommandLine(final String[] args) {
295    if (args.length < 1) {
296      printUsage(null);
297      return false;
298    }
299    for (int i = 0; i < args.length; i++) {
300      String cmd = args[i];
301      if (cmd.equals("-h") || cmd.startsWith("--h")) {
302        printUsage(null);
303        return false;
304      }
305
306      final String startRowArgKey = "--startrow=";
307      if (cmd.startsWith(startRowArgKey)) {
308        startRow = cmd.substring(startRowArgKey.length());
309        continue;
310      }
311
312      final String stopRowArgKey = "--stoprow=";
313      if (cmd.startsWith(stopRowArgKey)) {
314        stopRow = cmd.substring(stopRowArgKey.length());
315        continue;
316      }
317
318      final String startTimeArgKey = "--starttime=";
319      if (cmd.startsWith(startTimeArgKey)) {
320        startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
321        continue;
322      }
323
324      final String endTimeArgKey = "--endtime=";
325      if (cmd.startsWith(endTimeArgKey)) {
326        endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
327        continue;
328      }
329
330      final String batchArgKey = "--batch=";
331      if (cmd.startsWith(batchArgKey)) {
332        batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
333        continue;
334      }
335
336      final String cacheRowArgKey = "--cacheRow=";
337      if (cmd.startsWith(cacheRowArgKey)) {
338        cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
339        continue;
340      }
341
342      final String versionsArgKey = "--versions=";
343      if (cmd.startsWith(versionsArgKey)) {
344        versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
345        continue;
346      }
347
348      final String newNameArgKey = "--new.name=";
349      if (cmd.startsWith(newNameArgKey)) {
350        dstTableName = cmd.substring(newNameArgKey.length());
351        continue;
352      }
353
354      final String peerUriArgKey = "--peer.uri=";
355      if (cmd.startsWith(peerUriArgKey)) {
356        try {
357          peerUri = new URI(cmd.substring(peerUriArgKey.length()));
358        } catch (URISyntaxException e) {
359          LOG.error("Malformed peer uri specified: {}", cmd, e);
360          return false;
361        }
362        continue;
363      }
364
365      final String peerAdrArgKey = "--peer.adr=";
366      if (cmd.startsWith(peerAdrArgKey)) {
367        peerAddress = cmd.substring(peerAdrArgKey.length());
368        continue;
369      }
370
371      final String familiesArgKey = "--families=";
372      if (cmd.startsWith(familiesArgKey)) {
373        families = cmd.substring(familiesArgKey.length());
374        continue;
375      }
376
377      if (cmd.startsWith("--all.cells")) {
378        allCells = true;
379        continue;
380      }
381
382      if (cmd.startsWith("--bulkload")) {
383        bulkload = true;
384        continue;
385      }
386
387      if (cmd.startsWith("--shuffle")) {
388        shuffle = true;
389        continue;
390      }
391
392      if (cmd.startsWith("--snapshot")) {
393        readingSnapshot = true;
394        continue;
395      }
396
397      if (i == args.length - 1) {
398        if (readingSnapshot) {
399          snapshot = cmd;
400        } else {
401          tableName = cmd;
402        }
403      } else {
404        printUsage("Invalid argument '" + cmd + "'");
405        return false;
406      }
407    }
408    if (dstTableName == null && peerAddress == null) {
409      printUsage("At least a new table name or a peer address must be specified");
410      return false;
411    }
412    if ((endTime != 0) && (startTime > endTime)) {
413      printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
414      return false;
415    }
416
417    if (bulkload && (peerUri != null || peerAddress != null)) {
418      printUsage("Remote bulkload is not supported!");
419      return false;
420    }
421
422    if (readingSnapshot && (peerUri != null || peerAddress != null)) {
423      printUsage("Loading data from snapshot to remote peer cluster is not supported.");
424      return false;
425    }
426
427    if (readingSnapshot && dstTableName == null) {
428      printUsage("The --new.name=<table> for destination table should be "
429        + "provided when copying data from snapshot .");
430      return false;
431    }
432
433    if (readingSnapshot && snapshot == null) {
434      printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
435      return false;
436    }
437
438    // set dstTableName if necessary
439    if (dstTableName == null) {
440      dstTableName = tableName;
441    }
442    return true;
443  }
444
445  /**
446   * Main entry point.
447   * @param args The command line parameters.
448   * @throws Exception When running the job fails.
449   */
450  public static void main(String[] args) throws Exception {
451    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
452    System.exit(ret);
453  }
454
  @Override
  public int run(String[] args) throws Exception {
    // Build the job; null means the arguments were invalid (usage has already been printed).
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      // Log the equivalent manual command so the load can be replayed by hand if needed.
      LOG.info("command: ./bin/hbase {} {} {}", BulkLoadHFilesTool.NAME,
        this.bulkloadDir.toString(), this.dstTableName);
      if (
        !BulkLoadHFiles.create(getConf()).bulkLoad(TableName.valueOf(dstTableName), bulkloadDir)
          .isEmpty()
      ) {
        // bulkloadDir is deleted only when BulkLoadHFiles succeeded (returned a non-empty map of
        // loaded families), so that on failure one can rerun BulkLoadHFiles manually.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
488}