/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one, which can be on a different cluster setup. It is also
 * configurable with a start and end time, as well as a specification of the region server
 * implementation if different from the local cluster.
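 * <p>
 * A typical invocation, mirroring the example printed by {@code --help} (the timestamps,
 * ZooKeeper quorum and table names below are placeholders):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 \
 *     --endtime=1265878794289 --peer.adr=server1,server2,server3:2181:/hbase \
 *     --families=myOldCf:myNewCf,cf2,cf3 TestTable
 * </pre>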
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  URI peerUri = null;
  /**
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead.
   */
  @Deprecated
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

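  /**
   * Builds a unique, per-invocation temporary path: a random UUID beneath a shared "copytable"
   * directory under the working directory of the current FileSystem. The leaf directory is only
   * created on the filesystem when {@code withDirCreated} is true; otherwise just the path is
   * returned.
   */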
  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

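  /**
   * Wires up the map phase: reads from a table snapshot when {@code --snapshot} was given,
   * otherwise scans the live source table. In bulkload mode the mapper is {@link CellImporter},
   * whose output feeds HFile generation; otherwise it is {@link Importer}, which emits mutations.
   */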
  private void initCopyTableMapperJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

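    // Family list syntax: "srcCf" copies a family under the same name, while "srcCf:dstCf"
    // additionally renames it on the destination, e.g. --families=myOldCf:myNewCf,cf2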
    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

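    // Two output paths: with --bulkload the map output is written as HFiles into a temporary
    // directory for a later bulk load; otherwise TableOutputFormat writes directly to the
    // destination table, on the peer cluster when one is configured.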
    if (bulkload) {
      initCopyTableMapperJob(job, scan);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      initCopyTableMapperJob(job, scan);
      if (peerUri != null) {
        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri);
      } else if (peerAddress != null) {
        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
      } else {
        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job);
      }
    }

    return job;
  }

  /**
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
      + "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.uri     The URI of the peer cluster");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
      + ".port:zookeeper.znode.parent");
    System.err.println("              Does not take effect if peer.uri is specified");
    System.err.println("              Deprecated, please use peer.uri instead");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err
      .println(" bulkload     Write input into HFiles and bulk load to the destination table");
    System.err.println(" snapshot     Copy the data from a snapshot to the destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err
      .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase "
      + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 "
      + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable':");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable':");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general option:\n"
      + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
      + "  decreases the round trip time to the server and may increase performance.\n"
      + "    -Dhbase.client.scanner.caching=100\n"
      + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
      + "  inaccurate results.\n"
      + "    -Dmapreduce.map.speculative=false");
  }

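  /**
   * Parses the command line and populates this tool's fields. Prints usage and returns false when
   * help is requested or the arguments are invalid or inconsistent.
   */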
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];
      if (cmd.equals("-h") || cmd.startsWith("--h")) {
        printUsage(null);
        return false;
      }

      final String startRowArgKey = "--startrow=";
      if (cmd.startsWith(startRowArgKey)) {
        startRow = cmd.substring(startRowArgKey.length());
        continue;
      }

      final String stopRowArgKey = "--stoprow=";
      if (cmd.startsWith(stopRowArgKey)) {
        stopRow = cmd.substring(stopRowArgKey.length());
        continue;
      }

      final String startTimeArgKey = "--starttime=";
      if (cmd.startsWith(startTimeArgKey)) {
        startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
        continue;
      }

      final String endTimeArgKey = "--endtime=";
      if (cmd.startsWith(endTimeArgKey)) {
        endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
        continue;
      }

      final String batchArgKey = "--batch=";
      if (cmd.startsWith(batchArgKey)) {
        batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
        continue;
      }

      final String cacheRowArgKey = "--cacheRow=";
      if (cmd.startsWith(cacheRowArgKey)) {
        cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
        continue;
      }

      final String versionsArgKey = "--versions=";
      if (cmd.startsWith(versionsArgKey)) {
        versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
        continue;
      }

      final String newNameArgKey = "--new.name=";
      if (cmd.startsWith(newNameArgKey)) {
        dstTableName = cmd.substring(newNameArgKey.length());
        continue;
      }

      final String peerUriArgKey = "--peer.uri=";
      if (cmd.startsWith(peerUriArgKey)) {
        try {
          peerUri = new URI(cmd.substring(peerUriArgKey.length()));
        } catch (URISyntaxException e) {
          LOG.error("Malformed peer uri specified: {}", cmd, e);
          return false;
        }
        continue;
      }

      final String peerAdrArgKey = "--peer.adr=";
      if (cmd.startsWith(peerAdrArgKey)) {
        peerAddress = cmd.substring(peerAdrArgKey.length());
        continue;
      }

      final String familiesArgKey = "--families=";
      if (cmd.startsWith(familiesArgKey)) {
        families = cmd.substring(familiesArgKey.length());
        continue;
      }

      if (cmd.startsWith("--all.cells")) {
        allCells = true;
        continue;
      }

      if (cmd.startsWith("--bulkload")) {
        bulkload = true;
        continue;
      }

      if (cmd.startsWith("--shuffle")) {
        shuffle = true;
        continue;
      }

      if (cmd.startsWith("--snapshot")) {
        readingSnapshot = true;
        continue;
      }

      if (i == args.length - 1) {
        if (readingSnapshot) {
          snapshot = cmd;
        } else {
          tableName = cmd;
        }
      } else {
        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    }
    if (dstTableName == null && peerAddress == null) {
      printUsage("At least a new table name or a peer address must be specified");
      return false;
    }
    if ((endTime != 0) && (startTime > endTime)) {
      printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
      return false;
    }

    if (bulkload && (peerUri != null || peerAddress != null)) {
      printUsage("Remote bulkload is not supported!");
      return false;
    }

    if (readingSnapshot && (peerUri != null || peerAddress != null)) {
      printUsage("Loading data from snapshot to remote peer cluster is not supported.");
      return false;
    }

    if (readingSnapshot && dstTableName == null) {
      printUsage("The destination table name must be provided with --new.name=<table> when "
        + "copying data from a snapshot.");
      return false;
    }

    if (readingSnapshot && snapshot == null) {
      printUsage("A snapshot name must be specified when --snapshot is enabled.");
      return false;
    }

    // set dstTableName if necessary
    if (dstTableName == null) {
      dstTableName = tableName;
    }
    return true;
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

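  /**
   * Runs the copy job and, in bulkload mode, bulk loads the generated HFiles into the destination
   * table once the job succeeds, removing the temporary HFile directory after a successful load.
   */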
  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase {} {} {}", BulkLoadHFilesTool.NAME,
        this.bulkloadDir.toString(), this.dstTableName);
      if (
        !BulkLoadHFiles.create(getConf()).bulkLoad(TableName.valueOf(dstTableName), bulkloadDir)
          .isEmpty()
      ) {
        // bulkloadDir is deleted only if BulkLoadHFiles was successful, so that one can rerun
        // BulkLoadHFiles after a failure.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}