/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one, which can be on a different setup.
 * It is also configurable with a start and end time, as well as a specification
 * of the region server implementation if different from the local cluster.
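 * <p>
 * A minimal example invocation (cluster address and table names below are placeholders):
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
 *     --peer.adr=server1,server2,server3:2181:/hbase --new.name=CopiedTable TestTable
 * </pre>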
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Sets up the actual job.
   *
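   * <p>
   * A rough sketch of programmatic use; the argument values are illustrative only:
   * <pre>
   * CopyTable copyTable = new CopyTable();
   * copyTable.setConf(HBaseConfiguration.create());
   * Job job = copyTable.createSubmittableJob(
   *     new String[] { "--new.name=CopiedTable", "TestTable" });
   * </pre>
   *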
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }

    if (startRow != null) {
      scan.setStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.setStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

    if (bulkload) {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null,
        job);

      // We need to split the inputs by destination tables so that output of Map can be
      // bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
      Random rand = new Random();
      Path root = new Path(fs.getWorkingDirectory(), "copytable");
      fs.mkdirs(root);
      while (true) {
        bulkloadDir = new Path(root, "" + rand.nextLong());
        if (!fs.exists(bulkloadDir)) {
          break;
        }
      }

      System.out.println("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan,
        Import.Importer.class, null, null, job);

      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println("For performance consider the following general options:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }
225
226  private boolean doCommandLine(final String[] args) {
227    // Process command-line args. TODO: Better cmd-line processing
228    // (but hopefully something not as painful as cli options).
229    if (args.length < 1) {
230      printUsage(null);
231      return false;
232    }
233    try {
234      for (int i = 0; i < args.length; i++) {
235        String cmd = args[i];
236        if (cmd.equals("-h") || cmd.startsWith("--h")) {
237          printUsage(null);
238          return false;
239        }
240
241        final String startRowArgKey = "--startrow=";
242        if (cmd.startsWith(startRowArgKey)) {
243          startRow = cmd.substring(startRowArgKey.length());
244          continue;
245        }
246
247        final String stopRowArgKey = "--stoprow=";
248        if (cmd.startsWith(stopRowArgKey)) {
249          stopRow = cmd.substring(stopRowArgKey.length());
250          continue;
251        }
252
253        final String startTimeArgKey = "--starttime=";
254        if (cmd.startsWith(startTimeArgKey)) {
255          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
256          continue;
257        }
258
259        final String endTimeArgKey = "--endtime=";
260        if (cmd.startsWith(endTimeArgKey)) {
261          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
262          continue;
263        }
264
265        final String batchArgKey = "--batch=";
266        if (cmd.startsWith(batchArgKey)) {
267          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
268          continue;
269        }
270
271        final String cacheRowArgKey = "--cacheRow=";
272        if (cmd.startsWith(cacheRowArgKey)) {
273          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
274          continue;
275        }
276
277        final String versionsArgKey = "--versions=";
278        if (cmd.startsWith(versionsArgKey)) {
279          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
280          continue;
281        }
282
283        final String newNameArgKey = "--new.name=";
284        if (cmd.startsWith(newNameArgKey)) {
285          dstTableName = cmd.substring(newNameArgKey.length());
286          continue;
287        }
288
289        final String peerAdrArgKey = "--peer.adr=";
290        if (cmd.startsWith(peerAdrArgKey)) {
291          peerAddress = cmd.substring(peerAdrArgKey.length());
292          continue;
293        }
294
295        final String familiesArgKey = "--families=";
296        if (cmd.startsWith(familiesArgKey)) {
297          families = cmd.substring(familiesArgKey.length());
298          continue;
299        }
300
301        if (cmd.startsWith("--all.cells")) {
302          allCells = true;
303          continue;
304        }
305
306        if (cmd.startsWith("--bulkload")) {
307          bulkload = true;
308          continue;
309        }
310
311        if (cmd.startsWith("--shuffle")) {
312          shuffle = true;
313          continue;
314        }
315
316        if (i == args.length-1) {
317          tableName = cmd;
318        } else {
319          printUsage("Invalid argument '" + cmd + "'");
320          return false;
321        }
322      }
323      if (dstTableName == null && peerAddress == null) {
324        printUsage("At least a new table name or a " +
325            "peer address must be specified");
326        return false;
327      }
328      if ((endTime != 0) && (startTime > endTime)) {
329        printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
330        return false;
331      }
332
333      if (bulkload && peerAddress != null) {
334        printUsage("Remote bulkload is not supported!");
335        return false;
336      }
337
338      // set dstTableName if necessary
339      if (dstTableName == null) {
340        dstTableName = tableName;
341      }
342    } catch (Exception e) {
343      e.printStackTrace();
344      printUsage("Can't start because " + e.getMessage());
345      return false;
346    }
347    return true;
348  }
349
350  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) return 1;
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      code = new LoadIncrementalHFiles(this.getConf())
          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
        // rerun LoadIncrementalHFiles.
        FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}