View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import org.apache.hadoop.classification.InterfaceAudience;
22  import org.apache.hadoop.classification.InterfaceStability;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.conf.Configured;
25  import org.apache.hadoop.hbase.HBaseConfiguration;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.hadoop.hbase.client.Scan;
29  import org.apache.hadoop.mapreduce.Job;
30  import org.apache.hadoop.util.GenericOptionsParser;
31  import org.apache.hadoop.util.Tool;
32  import org.apache.hadoop.util.ToolRunner;
33  
34  import java.io.IOException;
35  import java.util.HashMap;
36  import java.util.Map;
37  
38  /**
39   * Tool used to copy a table to another one which can be on a different setup.
40   * It is also configurable with a start and time as well as a specification
41   * of the region server implementation if different from the local cluster.
42   */
43  @InterfaceAudience.Public
44  @InterfaceStability.Stable
45  public class CopyTable extends Configured implements Tool {
46  
47    final static String NAME = "copytable";
48    static long startTime = 0;
49    static long endTime = 0;
50    static int versions = -1;
51    static String tableName = null;
52    static String startRow = null;
53    static String stopRow = null;
54    static String newTableName = null;
55    static String peerAddress = null;
56    static String families = null;
57    static boolean allCells = false;
58    
59    public CopyTable(Configuration conf) {
60      super(conf);
61    }
62    /**
63     * Sets up the actual job.
64     *
65     * @param conf  The current configuration.
66     * @param args  The command line parameters.
67     * @return The newly created job.
68     * @throws IOException When setting up the job fails.
69     */
70    public static Job createSubmittableJob(Configuration conf, String[] args)
71    throws IOException {
72      if (!doCommandLine(args)) {
73        return null;
74      }
75      Job job = new Job(conf, NAME + "_" + tableName);
76      job.setJarByClass(CopyTable.class);
77      Scan scan = new Scan();
78      scan.setCacheBlocks(false);
79      if (startTime != 0) {
80        scan.setTimeRange(startTime,
81            endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
82      }
83      if (allCells) {
84        scan.setRaw(true);
85      }
86      if (versions >= 0) {
87        scan.setMaxVersions(versions);
88      }
89      
90      if (startRow != null) {
91        scan.setStartRow(Bytes.toBytes(startRow));
92      }
93      
94      if (stopRow != null) {
95        scan.setStopRow(Bytes.toBytes(stopRow));
96      }
97      
98      if(families != null) {
99        String[] fams = families.split(",");
100       Map<String,String> cfRenameMap = new HashMap<String,String>();
101       for(String fam : fams) {
102         String sourceCf;
103         if(fam.contains(":")) { 
104             // fam looks like "sourceCfName:destCfName"
105             String[] srcAndDest = fam.split(":", 2);
106             sourceCf = srcAndDest[0];
107             String destCf = srcAndDest[1];
108             cfRenameMap.put(sourceCf, destCf);
109         } else {
110             // fam is just "sourceCf"
111             sourceCf = fam; 
112         }
113         scan.addFamily(Bytes.toBytes(sourceCf));
114       }
115       Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
116     }
117     TableMapReduceUtil.initTableMapperJob(tableName, scan,
118         Import.Importer.class, null, null, job);
119     TableMapReduceUtil.initTableReducerJob(
120         newTableName == null ? tableName : newTableName, null, job,
121         null, peerAddress, null, null);
122     job.setNumReduceTasks(0);
123     return job;
124   }
125 
126   /*
127    * @param errorMsg Error message.  Can be null.
128    */
129   private static void printUsage(final String errorMsg) {
130     if (errorMsg != null && errorMsg.length() > 0) {
131       System.err.println("ERROR: " + errorMsg);
132     }
133     System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
134         "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
135     System.err.println();
136     System.err.println("Options:");
137     System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
138     System.err.println("              specify if different from current cluster");
139     System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
140     System.err.println(" startrow     the start row");
141     System.err.println(" stoprow      the stop row");
142     System.err.println(" starttime    beginning of the time range (unixtime in millis)");
143     System.err.println("              without endtime means from starttime to forever");
144     System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
145     System.err.println(" versions     number of cell versions to copy");
146     System.err.println(" new.name     new table's name");
147     System.err.println(" peer.adr     Address of the peer cluster given in the format");
148     System.err.println("              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
149     System.err.println(" families     comma-separated list of families to copy");
150     System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
151     System.err.println("              To keep the same name, just give \"cfName\"");
152     System.err.println(" all.cells    also copy delete markers and deleted cells");
153     System.err.println();
154     System.err.println("Args:");
155     System.err.println(" tablename    Name of the table to copy");
156     System.err.println();
157     System.err.println("Examples:");
158     System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
159     System.err.println(" $ bin/hbase " +
160         "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
161         "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
162     System.err.println("For performance consider the following general options:\n"
163         + "-Dhbase.client.scanner.caching=100\n"
164         + "-Dmapreduce.map.speculative=false");
165   }
166 
167   private static boolean doCommandLine(final String[] args) {
168     // Process command-line args. TODO: Better cmd-line processing
169     // (but hopefully something not as painful as cli options).
170     if (args.length < 1) {
171       printUsage(null);
172       return false;
173     }
174     try {
175       for (int i = 0; i < args.length; i++) {
176         String cmd = args[i];
177         if (cmd.equals("-h") || cmd.startsWith("--h")) {
178           printUsage(null);
179           return false;
180         }
181         
182         final String startRowArgKey = "--startrow=";
183         if (cmd.startsWith(startRowArgKey)) {
184           startRow = cmd.substring(startRowArgKey.length());
185           continue;
186         }
187         
188         final String stopRowArgKey = "--stoprow=";
189         if (cmd.startsWith(stopRowArgKey)) {
190           stopRow = cmd.substring(stopRowArgKey.length());
191           continue;
192         }
193         
194         final String startTimeArgKey = "--starttime=";
195         if (cmd.startsWith(startTimeArgKey)) {
196           startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
197           continue;
198         }
199 
200         final String endTimeArgKey = "--endtime=";
201         if (cmd.startsWith(endTimeArgKey)) {
202           endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
203           continue;
204         }
205 
206         final String versionsArgKey = "--versions=";
207         if (cmd.startsWith(versionsArgKey)) {
208           versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
209           continue;
210         }
211 
212         final String newNameArgKey = "--new.name=";
213         if (cmd.startsWith(newNameArgKey)) {
214           newTableName = cmd.substring(newNameArgKey.length());
215           continue;
216         }
217 
218         final String peerAdrArgKey = "--peer.adr=";
219         if (cmd.startsWith(peerAdrArgKey)) {
220           peerAddress = cmd.substring(peerAdrArgKey.length());
221           continue;
222         }
223 
224         final String familiesArgKey = "--families=";
225         if (cmd.startsWith(familiesArgKey)) {
226           families = cmd.substring(familiesArgKey.length());
227           continue;
228         }
229 
230         if (cmd.startsWith("--all.cells")) {
231           allCells = true;
232           continue;
233         }
234 
235         if (i == args.length-1) {
236           tableName = cmd;
237         } else {
238           printUsage("Invalid argument '" + cmd + "'" );
239           return false;
240         }
241       }
242       if (newTableName == null && peerAddress == null) {
243         printUsage("At least a new table name or a " +
244             "peer address must be specified");
245         return false;
246       }
247       if (startTime > endTime) {
248         printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
249         return false;
250       }
251     } catch (Exception e) {
252       e.printStackTrace();
253       printUsage("Can't start because " + e.getMessage());
254       return false;
255     }
256     return true;
257   }
258 
259   /**
260    * Main entry point.
261    *
262    * @param args  The command line parameters.
263    * @throws Exception When running the job fails.
264    */
265   public static void main(String[] args) throws Exception {
266     int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
267     System.exit(ret);
268   }
269 
270   @Override
271   public int run(String[] args) throws Exception {
272     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
273     Job job = createSubmittableJob(getConf(), otherArgs);
274     if (job == null) return 1;
275     return job.waitForCompletion(true) ? 0 : 1;
276   }
277 }