001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.IOException;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configured;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
036import org.apache.hadoop.hbase.mapreduce.Import.Importer;
037import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.CommonFSUtils;
040import org.apache.hadoop.mapreduce.Job;
041import org.apache.hadoop.util.Tool;
042import org.apache.hadoop.util.ToolRunner;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * Tool used to copy a table to another one which can be on a different setup.
049 * It is also configurable with a start and time as well as a specification
050 * of the region server implementation if different from the local cluster.
051 */
052@InterfaceAudience.Public
053public class CopyTable extends Configured implements Tool {
054  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);
055
056  final static String NAME = "copytable";
057  long startTime = 0;
058  long endTime = HConstants.LATEST_TIMESTAMP;
059  int batch = Integer.MAX_VALUE;
060  int cacheRow = -1;
061  int versions = -1;
062  String tableName = null;
063  String startRow = null;
064  String stopRow = null;
065  String dstTableName = null;
066  String peerAddress = null;
067  String families = null;
068  boolean allCells = false;
069  static boolean shuffle = false;
070
071  boolean bulkload = false;
072  Path bulkloadDir = null;
073
074  boolean readingSnapshot = false;
075  String snapshot = null;
076
077  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
078
079  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
080    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
081    Path dir = new Path(fs.getWorkingDirectory(), NAME);
082    if (!fs.exists(dir)) {
083      fs.mkdirs(dir);
084    }
085    Path newDir = new Path(dir, UUID.randomUUID().toString());
086    if (withDirCreated) {
087      fs.mkdirs(newDir);
088    }
089    return newDir;
090  }
091
092  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
093    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
094    if (readingSnapshot) {
095      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
096        generateUniqTempDir(true));
097    } else {
098      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
099    }
100  }
101
102  /**
103   * Sets up the actual job.
104   *
105   * @param args  The command line parameters.
106   * @return The newly created job.
107   * @throws IOException When setting up the job fails.
108   */
109  public Job createSubmittableJob(String[] args) throws IOException {
110    if (!doCommandLine(args)) {
111      return null;
112    }
113
114    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
115    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
116    job.setJarByClass(CopyTable.class);
117    Scan scan = new Scan();
118
119    scan.setBatch(batch);
120    scan.setCacheBlocks(false);
121
122    if (cacheRow > 0) {
123      scan.setCaching(cacheRow);
124    } else {
125      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
126    }
127
128    scan.setTimeRange(startTime, endTime);
129
130    if (allCells) {
131      scan.setRaw(true);
132    }
133    if (shuffle) {
134      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
135    }
136    if (versions >= 0) {
137      scan.readVersions(versions);
138    }
139
140    if (startRow != null) {
141      scan.withStartRow(Bytes.toBytesBinary(startRow));
142    }
143
144    if (stopRow != null) {
145      scan.withStopRow(Bytes.toBytesBinary(stopRow));
146    }
147
148    if(families != null) {
149      String[] fams = families.split(",");
150      Map<String,String> cfRenameMap = new HashMap<>();
151      for(String fam : fams) {
152        String sourceCf;
153        if(fam.contains(":")) {
154            // fam looks like "sourceCfName:destCfName"
155            String[] srcAndDest = fam.split(":", 2);
156            sourceCf = srcAndDest[0];
157            String destCf = srcAndDest[1];
158            cfRenameMap.put(sourceCf, destCf);
159        } else {
160            // fam is just "sourceCf"
161            sourceCf = fam;
162        }
163        scan.addFamily(Bytes.toBytes(sourceCf));
164      }
165      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
166    }
167    job.setNumReduceTasks(0);
168
169    if (bulkload) {
170      initCopyTableMapperReducerJob(job, scan);
171
172      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
173      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
174
175      bulkloadDir = generateUniqTempDir(false);
176      LOG.info("HFiles will be stored at " + this.bulkloadDir);
177      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
178      try (Connection conn = ConnectionFactory.createConnection(getConf());
179          Admin admin = conn.getAdmin()) {
180        HFileOutputFormat2.configureIncrementalLoadMap(job,
181          admin.getDescriptor((TableName.valueOf(dstTableName))));
182      }
183    } else {
184      initCopyTableMapperReducerJob(job, scan);
185      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
186        null);
187    }
188
189    return job;
190  }
191
192  /*
193   * @param errorMsg Error message.  Can be null.
194   */
195  private static void printUsage(final String errorMsg) {
196    if (errorMsg != null && errorMsg.length() > 0) {
197      System.err.println("ERROR: " + errorMsg);
198    }
199    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
200        "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
201    System.err.println();
202    System.err.println("Options:");
203    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
204    System.err.println("              specify if different from current cluster");
205    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
206    System.err.println(" startrow     the start row");
207    System.err.println(" stoprow      the stop row");
208    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
209    System.err.println("              without endtime means from starttime to forever");
210    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
211    System.err.println(" versions     number of cell versions to copy");
212    System.err.println(" new.name     new table's name");
213    System.err.println(" peer.adr     Address of the peer cluster given in the format");
214    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
215        + ".port:zookeeper.znode.parent");
216    System.err.println(" families     comma-separated list of families to copy");
217    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
218    System.err.println("              To keep the same name, just give \"cfName\"");
219    System.err.println(" all.cells    also copy delete markers and deleted cells");
220    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
221        + "table");
222    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
223    System.err.println();
224    System.err.println("Args:");
225    System.err.println(" tablename    Name of the table to copy");
226    System.err.println();
227    System.err.println("Examples:");
228    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
229    System.err.println(" $ hbase " +
230        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
231        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
232    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
233    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
234        + "--snapshot --new.name=destTable sourceTableSnapshot");
235    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
236    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
237        + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
238    System.err.println("For performance consider the following general option:\n"
239        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
240        + "  decreases the round trip time to the server and may increase performance.\n"
241        + "    -Dhbase.client.scanner.caching=100\n"
242        + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
243        + "  inaccurate results.\n"
244        + "    -Dmapreduce.map.speculative=false");
245  }
246
247  private boolean doCommandLine(final String[] args) {
248    if (args.length < 1) {
249      printUsage(null);
250      return false;
251    }
252    try {
253      for (int i = 0; i < args.length; i++) {
254        String cmd = args[i];
255        if (cmd.equals("-h") || cmd.startsWith("--h")) {
256          printUsage(null);
257          return false;
258        }
259
260        final String startRowArgKey = "--startrow=";
261        if (cmd.startsWith(startRowArgKey)) {
262          startRow = cmd.substring(startRowArgKey.length());
263          continue;
264        }
265
266        final String stopRowArgKey = "--stoprow=";
267        if (cmd.startsWith(stopRowArgKey)) {
268          stopRow = cmd.substring(stopRowArgKey.length());
269          continue;
270        }
271
272        final String startTimeArgKey = "--starttime=";
273        if (cmd.startsWith(startTimeArgKey)) {
274          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
275          continue;
276        }
277
278        final String endTimeArgKey = "--endtime=";
279        if (cmd.startsWith(endTimeArgKey)) {
280          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
281          continue;
282        }
283
284        final String batchArgKey = "--batch=";
285        if (cmd.startsWith(batchArgKey)) {
286          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
287          continue;
288        }
289
290        final String cacheRowArgKey = "--cacheRow=";
291        if (cmd.startsWith(cacheRowArgKey)) {
292          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
293          continue;
294        }
295
296        final String versionsArgKey = "--versions=";
297        if (cmd.startsWith(versionsArgKey)) {
298          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
299          continue;
300        }
301
302        final String newNameArgKey = "--new.name=";
303        if (cmd.startsWith(newNameArgKey)) {
304          dstTableName = cmd.substring(newNameArgKey.length());
305          continue;
306        }
307
308        final String peerAdrArgKey = "--peer.adr=";
309        if (cmd.startsWith(peerAdrArgKey)) {
310          peerAddress = cmd.substring(peerAdrArgKey.length());
311          continue;
312        }
313
314        final String familiesArgKey = "--families=";
315        if (cmd.startsWith(familiesArgKey)) {
316          families = cmd.substring(familiesArgKey.length());
317          continue;
318        }
319
320        if (cmd.startsWith("--all.cells")) {
321          allCells = true;
322          continue;
323        }
324
325        if (cmd.startsWith("--bulkload")) {
326          bulkload = true;
327          continue;
328        }
329
330        if (cmd.startsWith("--shuffle")) {
331          shuffle = true;
332          continue;
333        }
334
335        if(cmd.startsWith("--snapshot")){
336          readingSnapshot = true;
337          continue;
338        }
339
340        if (i == args.length - 1) {
341          if (readingSnapshot) {
342            snapshot = cmd;
343          } else {
344            tableName = cmd;
345          }
346        } else {
347          printUsage("Invalid argument '" + cmd + "'");
348          return false;
349        }
350      }
351      if (dstTableName == null && peerAddress == null) {
352        printUsage("At least a new table name or a peer address must be specified");
353        return false;
354      }
355      if ((endTime != 0) && (startTime > endTime)) {
356        printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
357        return false;
358      }
359
360      if (bulkload && peerAddress != null) {
361        printUsage("Remote bulkload is not supported!");
362        return false;
363      }
364
365      if (readingSnapshot && peerAddress != null) {
366        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
367        return false;
368      }
369
370      if (readingSnapshot && dstTableName == null) {
371        printUsage("The --new.name=<table> for destination table should be "
372            + "provided when copying data from snapshot .");
373        return false;
374      }
375
376      if (readingSnapshot && snapshot == null) {
377        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
378        return false;
379      }
380
381      // set dstTableName if necessary
382      if (dstTableName == null) {
383        dstTableName = tableName;
384      }
385    } catch (Exception e) {
386      e.printStackTrace();
387      printUsage("Can't start because " + e.getMessage());
388      return false;
389    }
390    return true;
391  }
392
393  /**
394   * Main entry point.
395   *
396   * @param args  The command line parameters.
397   * @throws Exception When running the job fails.
398   */
399  public static void main(String[] args) throws Exception {
400    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
401    System.exit(ret);
402  }
403
404  @Override
405  public int run(String[] args) throws Exception {
406    Job job = createSubmittableJob(args);
407    if (job == null) return 1;
408    if (!job.waitForCompletion(true)) {
409      LOG.info("Map-reduce job failed!");
410      if (bulkload) {
411        LOG.info("Files are not bulkloaded!");
412      }
413      return 1;
414    }
415    int code = 0;
416    if (bulkload) {
417      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
418      LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
419        this.bulkloadDir.toString(), this.dstTableName);
420      code = new LoadIncrementalHFiles(this.getConf())
421          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
422      if (code == 0) {
423        // bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun
424        // LoadIncrementalHFiles.
425        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
426        if (!fs.delete(this.bulkloadDir, true)) {
427          LOG.error("Deleting folder " + bulkloadDir + " failed!");
428          code = 1;
429        }
430      }
431    }
432    return code;
433  }
434}