
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Tool used to copy a table to another one, which can be on a different cluster.
 * It is also configurable with a start and end time, as well as a specification
 * of the region server implementation if different from the local cluster.
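 * <p>
 * A typical invocation, mirroring the example in the usage message printed by
 * this tool (the timestamps and peer address below are illustrative):
 * <pre>
 * $ bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
 *     --starttime=1265875194289 --endtime=1265878794289 \
 *     --peer.adr=server1,server2,server3:2181:/hbase \
 *     --families=myOldCf:myNewCf,cf2,cf3 TestTable
 * </pre>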
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CopyTable extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = 0;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public CopyTable(Configuration conf) {
    super(conf);
  }

  /**
   * Sets up the actual job.
   *
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));

    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
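    // An endTime of 0 means "no upper bound": copy everything from startTime
    // up to HConstants.LATEST_TIMESTAMP.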
    if (startTime != 0) {
      scan.setTimeRange(startTime,
          endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
    }
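    // A raw scan also returns delete markers and cells masked by deletes,
    // which is what --all.cells asks for.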
    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }

    if (startRow != null) {
      scan.setStartRow(Bytes.toBytes(startRow));
    }

    if (stopRow != null) {
      scan.setStopRow(Bytes.toBytes(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<String, String>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
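    // Map-only job: with zero reduce tasks, mapper output is written directly to
    // the destination table (or to HFiles when --bulkload is given).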
    job.setNumReduceTasks(0);

    if (bulkload) {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
        null, job);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

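      // Stage the HFiles in a random, previously unused subdirectory of the
      // working directory, so reruns and concurrent jobs do not collide.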
      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
      Random rand = new Random();
      Path root = new Path(fs.getWorkingDirectory(), "copytable");
      fs.mkdirs(root);
      while (true) {
        bulkloadDir = new Path(root, "" + rand.nextLong());
        if (!fs.exists(bulkloadDir)) {
          break;
        }
      }

      System.out.println("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
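      // Configure HFile output against the destination table so the generated
      // files match its column family settings.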
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Table htable = conn.getTable(TableName.valueOf(dstTableName))) {
        HFileOutputFormat2.configureIncrementalLoadMap(job, htable);
      }
    } else {
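      // Without bulk load, the importer writes mutations straight to the
      // destination table, which may live on the cluster named by --peer.adr.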
      TableMapReduceUtil.initTableMapperJob(tableName, scan,
        Import.Importer.class, null, null, job);

      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1-hour window:");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println("For performance consider the following general options:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }

  private boolean doCommandLine(final String[] args) {
    // Process command-line args. TODO: Better cmd-line processing
    // (but hopefully something not as painful as cli options).
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a " +
            "peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    Job job = createSubmittableJob(otherArgs);
    if (job == null) return 1;
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
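    // Finish a bulk-load run by handing the staged HFiles to LoadIncrementalHFiles,
    // which moves them into the destination table's regions.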
    if (bulkload) {
      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
          this.dstTableName});
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that
        // one can rerun LoadIncrementalHFiles after a failure.
        FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}