View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapreduce;
21  
22  import org.apache.hadoop.classification.InterfaceAudience;
23  import org.apache.hadoop.classification.InterfaceStability;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.conf.Configured;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.util.Bytes;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.mapreduce.Job;
31  import org.apache.hadoop.util.GenericOptionsParser;
32  import org.apache.hadoop.util.Tool;
33  import org.apache.hadoop.util.ToolRunner;
34  
35  import java.io.IOException;
36  import java.util.HashMap;
37  import java.util.Map;
38  
39  /**
40   * Tool used to copy a table to another one which can be on a different setup.
41   * It is also configurable with a start and time as well as a specification
42   * of the region server implementation if different from the local cluster.
43   */
44  public class CopyTable extends Configured implements Tool {
45  
46    final static String NAME = "copytable";
47    static long startTime = 0;
48    static long endTime = 0;
49    static int versions = -1;
50    static String tableName = null;
51    static String startRow = null;
52    static String stopRow = null;
53    static String newTableName = null;
54    static String peerAddress = null;
55    static String families = null;
56    static boolean allCells = false;
57    
58    public CopyTable(Configuration conf) {
59      super(conf);
60    }
61    /**
62     * Sets up the actual job.
63     *
64     * @param conf  The current configuration.
65     * @param args  The command line parameters.
66     * @return The newly created job.
67     * @throws IOException When setting up the job fails.
68     */
69    public static Job createSubmittableJob(Configuration conf, String[] args)
70    throws IOException {
71      if (!doCommandLine(args)) {
72        return null;
73      }
74      Job job = new Job(conf, NAME + "_" + tableName);
75      job.setJarByClass(CopyTable.class);
76      Scan scan = new Scan();
77      scan.setCacheBlocks(false);
78      if (startTime != 0) {
79        scan.setTimeRange(startTime,
80            endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
81      }
82      if (allCells) {
83        scan.setRaw(true);
84      }
85      if (versions >= 0) {
86        scan.setMaxVersions(versions);
87      }
88      
89      if (startRow != null) {
90        scan.setStartRow(Bytes.toBytes(startRow));
91      }
92      
93      if (stopRow != null) {
94        scan.setStopRow(Bytes.toBytes(stopRow));
95      }
96      
97      if(families != null) {
98        String[] fams = families.split(",");
99        Map<String,String> cfRenameMap = new HashMap<String,String>();
100       for(String fam : fams) {
101         String sourceCf;
102         if(fam.contains(":")) { 
103             // fam looks like "sourceCfName:destCfName"
104             String[] srcAndDest = fam.split(":", 2);
105             sourceCf = srcAndDest[0];
106             String destCf = srcAndDest[1];
107             cfRenameMap.put(sourceCf, destCf);
108         } else {
109             // fam is just "sourceCf"
110             sourceCf = fam; 
111         }
112         scan.addFamily(Bytes.toBytes(sourceCf));
113       }
114       Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
115     }
116     TableMapReduceUtil.initTableMapperJob(tableName, scan,
117         Import.Importer.class, null, null, job);
118     TableMapReduceUtil.initTableReducerJob(
119         newTableName == null ? tableName : newTableName, null, job,
120         null, peerAddress, null, null);
121     job.setNumReduceTasks(0);
122     return job;
123   }
124 
125   /*
126    * @param errorMsg Error message.  Can be null.
127    */
128   private static void printUsage(final String errorMsg) {
129     if (errorMsg != null && errorMsg.length() > 0) {
130       System.err.println("ERROR: " + errorMsg);
131     }
132     System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
133         "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
134     System.err.println();
135     System.err.println("Options:");
136     System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
137     System.err.println("              specify if different from current cluster");
138     System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
139     System.err.println(" startrow     the start row");
140     System.err.println(" stoprow      the stop row");
141     System.err.println(" starttime    beginning of the time range (unixtime in millis)");
142     System.err.println("              without endtime means from starttime to forever");
143     System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
144     System.err.println(" versions     number of cell versions to copy");
145     System.err.println(" new.name     new table's name");
146     System.err.println(" peer.adr     Address of the peer cluster given in the format");
147     System.err.println("              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
148     System.err.println(" families     comma-separated list of families to copy");
149     System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
150     System.err.println("              To keep the same name, just give \"cfName\"");
151     System.err.println(" all.cells    also copy delete markers and deleted cells");
152     System.err.println();
153     System.err.println("Args:");
154     System.err.println(" tablename    Name of the table to copy");
155     System.err.println();
156     System.err.println("Examples:");
157     System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
158     System.err.println(" $ bin/hbase " +
159         "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
160         "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
161     System.err.println("For performance consider the following general options:\n"
162         + "-Dhbase.client.scanner.caching=100\n"
163         + "-Dmapred.map.tasks.speculative.execution=false");
164   }
165 
166   private static boolean doCommandLine(final String[] args) {
167     // Process command-line args. TODO: Better cmd-line processing
168     // (but hopefully something not as painful as cli options).
169     if (args.length < 1) {
170       printUsage(null);
171       return false;
172     }
173     try {
174       for (int i = 0; i < args.length; i++) {
175         String cmd = args[i];
176         if (cmd.equals("-h") || cmd.startsWith("--h")) {
177           printUsage(null);
178           return false;
179         }
180         
181         final String startRowArgKey = "--startrow=";
182         if (cmd.startsWith(startRowArgKey)) {
183           startRow = cmd.substring(startRowArgKey.length());
184           continue;
185         }
186         
187         final String stopRowArgKey = "--stoprow=";
188         if (cmd.startsWith(stopRowArgKey)) {
189           stopRow = cmd.substring(stopRowArgKey.length());
190           continue;
191         }
192         
193         final String startTimeArgKey = "--starttime=";
194         if (cmd.startsWith(startTimeArgKey)) {
195           startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
196           continue;
197         }
198 
199         final String endTimeArgKey = "--endtime=";
200         if (cmd.startsWith(endTimeArgKey)) {
201           endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
202           continue;
203         }
204 
205         final String versionsArgKey = "--versions=";
206         if (cmd.startsWith(versionsArgKey)) {
207           versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
208           continue;
209         }
210 
211         final String newNameArgKey = "--new.name=";
212         if (cmd.startsWith(newNameArgKey)) {
213           newTableName = cmd.substring(newNameArgKey.length());
214           continue;
215         }
216 
217         final String peerAdrArgKey = "--peer.adr=";
218         if (cmd.startsWith(peerAdrArgKey)) {
219           peerAddress = cmd.substring(peerAdrArgKey.length());
220           continue;
221         }
222 
223         final String familiesArgKey = "--families=";
224         if (cmd.startsWith(familiesArgKey)) {
225           families = cmd.substring(familiesArgKey.length());
226           continue;
227         }
228 
229         if (cmd.startsWith("--all.cells")) {
230           allCells = true;
231           continue;
232         }
233 
234         if (i == args.length-1) {
235           tableName = cmd;
236         } else {
237           printUsage("Invalid argument '" + cmd + "'" );
238           return false;
239         }
240       }
241       if (newTableName == null && peerAddress == null) {
242         printUsage("At least a new table name or a " +
243             "peer address must be specified");
244         return false;
245       }
246       if (startTime > endTime) {
247         printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
248         return false;
249       }
250     } catch (Exception e) {
251       e.printStackTrace();
252       printUsage("Can't start because " + e.getMessage());
253       return false;
254     }
255     return true;
256   }
257 
258   /**
259    * Main entry point.
260    *
261    * @param args  The command line parameters.
262    * @throws Exception When running the job fails.
263    */
264   public static void main(String[] args) throws Exception {
265     int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
266     System.exit(ret);
267   }
268 
269   @Override
270   public int run(String[] args) throws Exception {
271     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
272     Job job = createSubmittableJob(getConf(), otherArgs);
273     if (job == null) return 1;
274     return job.waitForCompletion(true) ? 0 : 1;
275   }
276 }