View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.util.HashMap;
23  import java.util.Map;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.conf.Configured;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.mapreduce.Job;
34  import org.apache.hadoop.util.GenericOptionsParser;
35  import org.apache.hadoop.util.Tool;
36  import org.apache.hadoop.util.ToolRunner;
37  
38  /**
39   * Tool used to copy a table to another one which can be on a different setup.
40   * It is also configurable with a start and time as well as a specification
41   * of the region server implementation if different from the local cluster.
42   */
43  @InterfaceAudience.Public
44  @InterfaceStability.Stable
45  public class CopyTable extends Configured implements Tool {
46  
47    final static String NAME = "copytable";
48    static long startTime = 0;
49    static long endTime = 0;
50    static int versions = -1;
51    static String tableName = null;
52    static String startRow = null;
53    static String stopRow = null;
54    static String newTableName = null;
55    static String peerAddress = null;
56    static String families = null;
57    static boolean allCells = false;
58    
59    public CopyTable(Configuration conf) {
60      super(conf);
61    }
62    /**
63     * Sets up the actual job.
64     *
65     * @param conf  The current configuration.
66     * @param args  The command line parameters.
67     * @return The newly created job.
68     * @throws IOException When setting up the job fails.
69     */
70    public static Job createSubmittableJob(Configuration conf, String[] args)
71    throws IOException {
72      if (!doCommandLine(args)) {
73        return null;
74      }
75      Job job = Job.getInstance(conf, NAME + "_" + tableName);
76      job.setJarByClass(CopyTable.class);
77      Scan scan = new Scan();
78      scan.setCacheBlocks(false);
79      if (startTime != 0) {
80        scan.setTimeRange(startTime,
81            endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
82      }
83      if (allCells) {
84        scan.setRaw(true);
85      }
86      if (versions >= 0) {
87        scan.setMaxVersions(versions);
88      }
89      
90      if (startRow != null) {
91        scan.setStartRow(Bytes.toBytes(startRow));
92      }
93      
94      if (stopRow != null) {
95        scan.setStopRow(Bytes.toBytes(stopRow));
96      }
97      
98      if(families != null) {
99        String[] fams = families.split(",");
100       Map<String,String> cfRenameMap = new HashMap<String,String>();
101       for(String fam : fams) {
102         String sourceCf;
103         if(fam.contains(":")) { 
104             // fam looks like "sourceCfName:destCfName"
105             String[] srcAndDest = fam.split(":", 2);
106             sourceCf = srcAndDest[0];
107             String destCf = srcAndDest[1];
108             cfRenameMap.put(sourceCf, destCf);
109         } else {
110             // fam is just "sourceCf"
111             sourceCf = fam; 
112         }
113         scan.addFamily(Bytes.toBytes(sourceCf));
114       }
115       Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
116     }
117     TableMapReduceUtil.initTableMapperJob(tableName, scan,
118         Import.Importer.class, null, null, job);
119     TableMapReduceUtil.initTableReducerJob(
120         newTableName == null ? tableName : newTableName, null, job,
121         null, peerAddress, null, null);
122     job.setNumReduceTasks(0);
123     return job;
124   }
125 
126   /*
127    * @param errorMsg Error message.  Can be null.
128    */
129   private static void printUsage(final String errorMsg) {
130     if (errorMsg != null && errorMsg.length() > 0) {
131       System.err.println("ERROR: " + errorMsg);
132     }
133     System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
134         "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
135     System.err.println();
136     System.err.println("Options:");
137     System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
138     System.err.println("              specify if different from current cluster");
139     System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
140     System.err.println(" startrow     the start row");
141     System.err.println(" stoprow      the stop row");
142     System.err.println(" starttime    beginning of the time range (unixtime in millis)");
143     System.err.println("              without endtime means from starttime to forever");
144     System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
145     System.err.println(" versions     number of cell versions to copy");
146     System.err.println(" new.name     new table's name");
147     System.err.println(" peer.adr     Address of the peer cluster given in the format");
148     System.err.println("              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
149     System.err.println(" families     comma-separated list of families to copy");
150     System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
151     System.err.println("              To keep the same name, just give \"cfName\"");
152     System.err.println(" all.cells    also copy delete markers and deleted cells");
153     System.err.println();
154     System.err.println("Args:");
155     System.err.println(" tablename    Name of the table to copy");
156     System.err.println();
157     System.err.println("Examples:");
158     System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
159     System.err.println(" $ bin/hbase " +
160         "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
161         "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
162     System.err.println("For performance consider the following general option:\n"
163         + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
164         + "  decreases the round trip time to the server and may increase performance.\n"
165         + "    -Dhbase.client.scanner.caching=100\n"
166         + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
167         + "  inaccurate results.\n"
168         + "    -Dmapreduce.map.speculative=false");
169   }
170 
171   private static boolean doCommandLine(final String[] args) {
172     // Process command-line args. TODO: Better cmd-line processing
173     // (but hopefully something not as painful as cli options).
174     if (args.length < 1) {
175       printUsage(null);
176       return false;
177     }
178     try {
179       for (int i = 0; i < args.length; i++) {
180         String cmd = args[i];
181         if (cmd.equals("-h") || cmd.startsWith("--h")) {
182           printUsage(null);
183           return false;
184         }
185         
186         final String startRowArgKey = "--startrow=";
187         if (cmd.startsWith(startRowArgKey)) {
188           startRow = cmd.substring(startRowArgKey.length());
189           continue;
190         }
191         
192         final String stopRowArgKey = "--stoprow=";
193         if (cmd.startsWith(stopRowArgKey)) {
194           stopRow = cmd.substring(stopRowArgKey.length());
195           continue;
196         }
197         
198         final String startTimeArgKey = "--starttime=";
199         if (cmd.startsWith(startTimeArgKey)) {
200           startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
201           continue;
202         }
203 
204         final String endTimeArgKey = "--endtime=";
205         if (cmd.startsWith(endTimeArgKey)) {
206           endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
207           continue;
208         }
209 
210         final String versionsArgKey = "--versions=";
211         if (cmd.startsWith(versionsArgKey)) {
212           versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
213           continue;
214         }
215 
216         final String newNameArgKey = "--new.name=";
217         if (cmd.startsWith(newNameArgKey)) {
218           newTableName = cmd.substring(newNameArgKey.length());
219           continue;
220         }
221 
222         final String peerAdrArgKey = "--peer.adr=";
223         if (cmd.startsWith(peerAdrArgKey)) {
224           peerAddress = cmd.substring(peerAdrArgKey.length());
225           continue;
226         }
227 
228         final String familiesArgKey = "--families=";
229         if (cmd.startsWith(familiesArgKey)) {
230           families = cmd.substring(familiesArgKey.length());
231           continue;
232         }
233 
234         if (cmd.startsWith("--all.cells")) {
235           allCells = true;
236           continue;
237         }
238 
239         if (i == args.length-1) {
240           tableName = cmd;
241         } else {
242           printUsage("Invalid argument '" + cmd + "'" );
243           return false;
244         }
245       }
246       if (newTableName == null && peerAddress == null) {
247         printUsage("At least a new table name or a " +
248             "peer address must be specified");
249         return false;
250       }
251       if ((endTime != 0) && (startTime > endTime)) {
252         printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
253         return false;
254       }
255     } catch (Exception e) {
256       e.printStackTrace();
257       printUsage("Can't start because " + e.getMessage());
258       return false;
259     }
260     return true;
261   }
262 
263   /**
264    * Main entry point.
265    *
266    * @param args  The command line parameters.
267    * @throws Exception When running the job fails.
268    */
269   public static void main(String[] args) throws Exception {
270     int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
271     System.exit(ret);
272   }
273 
274   @Override
275   public int run(String[] args) throws Exception {
276     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
277     Job job = createSubmittableJob(getConf(), otherArgs);
278     if (job == null) return 1;
279     return job.waitForCompletion(true) ? 0 : 1;
280   }
281 }