/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Tool used to copy a table to another one, which can be on a different cluster setup.
 * It is also configurable with a start and end time, as well as with a specification
 * of the region server implementation if different from the local cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CopyTable extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Sets up the actual job.
   *
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args)
      throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

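    // Scanner caching controls how many rows are fetched per RPC, while batch caps the
    // number of cells returned per Result; --cacheRow overrides the configured
    // hbase.client.scanner.caching default.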
    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

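    // A raw scan (--all.cells) also returns delete markers and deleted cells that have
    // not yet been garbage-collected, so they can be replayed on the destination.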
    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }

    if (startRow != null) {
      scan.setStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.setStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<String, String>();
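      // Each --families entry is either "cf" (copy under the same name) or
      // "sourceCf:destCf" (rename); the rename map is consumed by the Import mapper.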
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

    if (bulkload) {
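      // Bulkload mode: run a map-only job that writes HFiles via HFileOutputFormat2;
      // the files are moved into the destination table by LoadIncrementalHFiles in run().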
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
        null, job);

      // We need to split the inputs by destination table so that the map output can be
      // bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      FileSystem fs = FileSystem.get(getConf());
      Random rand = new Random();
      Path root = new Path(fs.getWorkingDirectory(), "copytable");
      fs.mkdirs(root);
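      // Pick a random subdirectory that does not exist yet as the HFile output directory.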
      while (true) {
        bulkloadDir = new Path(root, "" + rand.nextLong());
        if (!fs.exists(bulkloadDir)) {
          break;
        }
      }

      System.out.println("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
            admin.getTableDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
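      // Direct copy: the Import mapper writes the cells straight to the destination
      // table, possibly on the peer cluster given by --peer.adr.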
      TableMapReduceUtil.initTableMapperJob(tableName, scan,
        Import.Importer.class, null, null, job);

      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1-hour window:");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println("For performance consider the following general options:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }

  private boolean doCommandLine(final String[] args) {
    // Process command-line args. TODO: Better cmd-line processing
    // (but hopefully something not as painful as cli options).
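    // Flags take the form --key=value (or a bare --flag); the final non-flag argument
    // is interpreted as the source table name.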
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a " +
            "peer address must be specified");
        return false;
      }
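      // endTime defaults to LATEST_TIMESTAMP, so this only rejects an explicitly
      // supplied range where starttime is later than endtime.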
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      // Default the destination table name to the source table name if not given.
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) return 1;
    if (!job.waitForCompletion(true)) {
      LOG.error("Map-reduce job failed!");
      if (bulkload) {
        LOG.error("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
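    // In bulkload mode the job only produced HFiles; finish the copy by loading them
    // into the destination table with LoadIncrementalHFiles.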
    if (bulkload) {
      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
          this.dstTableName});
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that
        // one can rerun LoadIncrementalHFiles after a failure.
        FileSystem fs = FileSystem.get(this.getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}