View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.util.HashMap;
23  import java.util.Map;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.conf.Configured;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.mapreduce.Job;
34  import org.apache.hadoop.util.GenericOptionsParser;
35  import org.apache.hadoop.util.Tool;
36  import org.apache.hadoop.util.ToolRunner;
37  
38  /**
39   * Tool used to copy a table to another one which can be on a different setup.
40   * It is also configurable with a start and time as well as a specification
41   * of the region server implementation if different from the local cluster.
42   */
43  @InterfaceAudience.Public
44  @InterfaceStability.Stable
45  public class CopyTable extends Configured implements Tool {
46  
47    final static String NAME = "copytable";
48    static long startTime = 0;
49    static long endTime = 0;
50    static int versions = -1;
51    static String tableName = null;
52    static String startRow = null;
53    static String stopRow = null;
54    static String newTableName = null;
55    static String peerAddress = null;
56    static String families = null;
57    static boolean allCells = false;
58  
59    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
60  
61    public CopyTable(Configuration conf) {
62      super(conf);
63    }
64    /**
65     * Sets up the actual job.
66     *
67     * @param conf  The current configuration.
68     * @param args  The command line parameters.
69     * @return The newly created job.
70     * @throws IOException When setting up the job fails.
71     */
72    public static Job createSubmittableJob(Configuration conf, String[] args)
73    throws IOException {
74      if (!doCommandLine(args)) {
75        return null;
76      }
77      Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
78      job.setJarByClass(CopyTable.class);
79      Scan scan = new Scan();
80      scan.setCacheBlocks(false);
81      if (startTime != 0) {
82        scan.setTimeRange(startTime,
83            endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
84      }
85      if (allCells) {
86        scan.setRaw(true);
87      }
88      if (versions >= 0) {
89        scan.setMaxVersions(versions);
90      }
91      
92      if (startRow != null) {
93        scan.setStartRow(Bytes.toBytes(startRow));
94      }
95      
96      if (stopRow != null) {
97        scan.setStopRow(Bytes.toBytes(stopRow));
98      }
99      
100     if(families != null) {
101       String[] fams = families.split(",");
102       Map<String,String> cfRenameMap = new HashMap<String,String>();
103       for(String fam : fams) {
104         String sourceCf;
105         if(fam.contains(":")) { 
106             // fam looks like "sourceCfName:destCfName"
107             String[] srcAndDest = fam.split(":", 2);
108             sourceCf = srcAndDest[0];
109             String destCf = srcAndDest[1];
110             cfRenameMap.put(sourceCf, destCf);
111         } else {
112             // fam is just "sourceCf"
113             sourceCf = fam; 
114         }
115         scan.addFamily(Bytes.toBytes(sourceCf));
116       }
117       Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
118     }
119     TableMapReduceUtil.initTableMapperJob(tableName, scan,
120         Import.Importer.class, null, null, job);
121     TableMapReduceUtil.initTableReducerJob(
122         newTableName == null ? tableName : newTableName, null, job,
123         null, peerAddress, null, null);
124     job.setNumReduceTasks(0);
125     return job;
126   }
127 
128   /*
129    * @param errorMsg Error message.  Can be null.
130    */
131   private static void printUsage(final String errorMsg) {
132     if (errorMsg != null && errorMsg.length() > 0) {
133       System.err.println("ERROR: " + errorMsg);
134     }
135     System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
136         "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
137     System.err.println();
138     System.err.println("Options:");
139     System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
140     System.err.println("              specify if different from current cluster");
141     System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
142     System.err.println(" startrow     the start row");
143     System.err.println(" stoprow      the stop row");
144     System.err.println(" starttime    beginning of the time range (unixtime in millis)");
145     System.err.println("              without endtime means from starttime to forever");
146     System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
147     System.err.println(" versions     number of cell versions to copy");
148     System.err.println(" new.name     new table's name");
149     System.err.println(" peer.adr     Address of the peer cluster given in the format");
150     System.err.println("              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
151     System.err.println(" families     comma-separated list of families to copy");
152     System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
153     System.err.println("              To keep the same name, just give \"cfName\"");
154     System.err.println(" all.cells    also copy delete markers and deleted cells");
155     System.err.println();
156     System.err.println("Args:");
157     System.err.println(" tablename    Name of the table to copy");
158     System.err.println();
159     System.err.println("Examples:");
160     System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
161     System.err.println(" $ bin/hbase " +
162         "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
163         "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
164     System.err.println("For performance consider the following general option:\n"
165         + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
166         + "  decreases the round trip time to the server and may increase performance.\n"
167         + "    -Dhbase.client.scanner.caching=100\n"
168         + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
169         + "  inaccurate results.\n"
170         + "    -Dmapreduce.map.speculative=false");
171   }
172 
173   private static boolean doCommandLine(final String[] args) {
174     // Process command-line args. TODO: Better cmd-line processing
175     // (but hopefully something not as painful as cli options).
176     if (args.length < 1) {
177       printUsage(null);
178       return false;
179     }
180     try {
181       for (int i = 0; i < args.length; i++) {
182         String cmd = args[i];
183         if (cmd.equals("-h") || cmd.startsWith("--h")) {
184           printUsage(null);
185           return false;
186         }
187         
188         final String startRowArgKey = "--startrow=";
189         if (cmd.startsWith(startRowArgKey)) {
190           startRow = cmd.substring(startRowArgKey.length());
191           continue;
192         }
193         
194         final String stopRowArgKey = "--stoprow=";
195         if (cmd.startsWith(stopRowArgKey)) {
196           stopRow = cmd.substring(stopRowArgKey.length());
197           continue;
198         }
199         
200         final String startTimeArgKey = "--starttime=";
201         if (cmd.startsWith(startTimeArgKey)) {
202           startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
203           continue;
204         }
205 
206         final String endTimeArgKey = "--endtime=";
207         if (cmd.startsWith(endTimeArgKey)) {
208           endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
209           continue;
210         }
211 
212         final String versionsArgKey = "--versions=";
213         if (cmd.startsWith(versionsArgKey)) {
214           versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
215           continue;
216         }
217 
218         final String newNameArgKey = "--new.name=";
219         if (cmd.startsWith(newNameArgKey)) {
220           newTableName = cmd.substring(newNameArgKey.length());
221           continue;
222         }
223 
224         final String peerAdrArgKey = "--peer.adr=";
225         if (cmd.startsWith(peerAdrArgKey)) {
226           peerAddress = cmd.substring(peerAdrArgKey.length());
227           continue;
228         }
229 
230         final String familiesArgKey = "--families=";
231         if (cmd.startsWith(familiesArgKey)) {
232           families = cmd.substring(familiesArgKey.length());
233           continue;
234         }
235 
236         if (cmd.startsWith("--all.cells")) {
237           allCells = true;
238           continue;
239         }
240 
241         if (i == args.length-1) {
242           tableName = cmd;
243         } else {
244           printUsage("Invalid argument '" + cmd + "'" );
245           return false;
246         }
247       }
248       if (newTableName == null && peerAddress == null) {
249         printUsage("At least a new table name or a " +
250             "peer address must be specified");
251         return false;
252       }
253       if ((endTime != 0) && (startTime > endTime)) {
254         printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
255         return false;
256       }
257     } catch (Exception e) {
258       e.printStackTrace();
259       printUsage("Can't start because " + e.getMessage());
260       return false;
261     }
262     return true;
263   }
264 
265   /**
266    * Main entry point.
267    *
268    * @param args  The command line parameters.
269    * @throws Exception When running the job fails.
270    */
271   public static void main(String[] args) throws Exception {
272     int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
273     System.exit(ret);
274   }
275 
276   @Override
277   public int run(String[] args) throws Exception {
278     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
279     Job job = createSubmittableJob(getConf(), otherArgs);
280     if (job == null) return 1;
281     return job.waitForCompletion(true) ? 0 : 1;
282   }
283 }