1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import org.apache.hadoop.hbase.classification.InterfaceAudience;
22 import org.apache.hadoop.hbase.classification.InterfaceStability;
23
24 import java.io.IOException;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Random;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.conf.Configured;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseConfiguration;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.Connection;
39 import org.apache.hadoop.hbase.client.ConnectionFactory;
40 import org.apache.hadoop.hbase.client.Scan;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.mapreduce.Job;
44 import org.apache.hadoop.util.GenericOptionsParser;
45 import org.apache.hadoop.util.Tool;
46 import org.apache.hadoop.util.ToolRunner;
47
48
49
50
51
52
53 @InterfaceAudience.Public
54 @InterfaceStability.Stable
55 public class CopyTable extends Configured implements Tool {
56 private static final Log LOG = LogFactory.getLog(CopyTable.class);
57
58 final static String NAME = "copytable";
59 long startTime = 0;
60 long endTime = 0;
61 int versions = -1;
62 String tableName = null;
63 String startRow = null;
64 String stopRow = null;
65 String dstTableName = null;
66 String peerAddress = null;
67 String families = null;
68 boolean allCells = false;
69 static boolean shuffle = false;
70
71 boolean bulkload = false;
72 Path bulkloadDir = null;
73
74 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
75
76 public CopyTable(Configuration conf) {
77 super(conf);
78 }
79
80
81
82
83
84
85
86 public Job createSubmittableJob(String[] args)
87 throws IOException {
88 if (!doCommandLine(args)) {
89 return null;
90 }
91
92 Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
93
94 job.setJarByClass(CopyTable.class);
95 Scan scan = new Scan();
96 scan.setCacheBlocks(false);
97 if (startTime != 0) {
98 scan.setTimeRange(startTime,
99 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
100 }
101 if (allCells) {
102 scan.setRaw(true);
103 }
104 if (shuffle) {
105 job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
106 }
107 if (versions >= 0) {
108 scan.setMaxVersions(versions);
109 }
110
111 if (startRow != null) {
112 scan.setStartRow(Bytes.toBytes(startRow));
113 }
114
115 if (stopRow != null) {
116 scan.setStopRow(Bytes.toBytes(stopRow));
117 }
118
119 if(families != null) {
120 String[] fams = families.split(",");
121 Map<String,String> cfRenameMap = new HashMap<String,String>();
122 for(String fam : fams) {
123 String sourceCf;
124 if(fam.contains(":")) {
125
126 String[] srcAndDest = fam.split(":", 2);
127 sourceCf = srcAndDest[0];
128 String destCf = srcAndDest[1];
129 cfRenameMap.put(sourceCf, destCf);
130 } else {
131
132 sourceCf = fam;
133 }
134 scan.addFamily(Bytes.toBytes(sourceCf));
135 }
136 Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
137 }
138 job.setNumReduceTasks(0);
139
140 if (bulkload) {
141 TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
142 null, job);
143
144
145 TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
146
147 FileSystem fs = FileSystem.get(getConf());
148 Random rand = new Random();
149 Path root = new Path(fs.getWorkingDirectory(), "copytable");
150 fs.mkdirs(root);
151 while (true) {
152 bulkloadDir = new Path(root, "" + rand.nextLong());
153 if (!fs.exists(bulkloadDir)) {
154 break;
155 }
156 }
157
158 System.out.println("HFiles will be stored at " + this.bulkloadDir);
159 HFileOutputFormat2.setOutputPath(job, bulkloadDir);
160 try (Connection conn = ConnectionFactory.createConnection(getConf());
161 Table htable = conn.getTable(TableName.valueOf(dstTableName))) {
162 HFileOutputFormat2.configureIncrementalLoadMap(job, htable);
163 }
164 } else {
165 TableMapReduceUtil.initTableMapperJob(tableName, scan,
166 Import.Importer.class, null, null, job);
167
168 TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
169 null);
170 }
171
172 return job;
173 }
174
175
176
177
178 private static void printUsage(final String errorMsg) {
179 if (errorMsg != null && errorMsg.length() > 0) {
180 System.err.println("ERROR: " + errorMsg);
181 }
182 System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
183 "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
184 System.err.println();
185 System.err.println("Options:");
186 System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
187 System.err.println(" specify if different from current cluster");
188 System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster");
189 System.err.println(" startrow the start row");
190 System.err.println(" stoprow the stop row");
191 System.err.println(" starttime beginning of the time range (unixtime in millis)");
192 System.err.println(" without endtime means from starttime to forever");
193 System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
194 System.err.println(" versions number of cell versions to copy");
195 System.err.println(" new.name new table's name");
196 System.err.println(" peer.adr Address of the peer cluster given in the format");
197 System.err.println(" hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
198 System.err.println(" families comma-separated list of families to copy");
199 System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
200 System.err.println(" To keep the same name, just give \"cfName\"");
201 System.err.println(" all.cells also copy delete markers and deleted cells");
202 System.err.println(" bulkload Write input into HFiles and bulk load to the destination "
203 + "table");
204 System.err.println();
205 System.err.println("Args:");
206 System.err.println(" tablename Name of the table to copy");
207 System.err.println();
208 System.err.println("Examples:");
209 System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
210 System.err.println(" $ bin/hbase " +
211 "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
212 "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
213 System.err.println("For performance consider the following general option:\n"
214 + " It is recommended that you set the following to >=100. A higher value uses more memory but\n"
215 + " decreases the round trip time to the server and may increase performance.\n"
216 + " -Dhbase.client.scanner.caching=100\n"
217 + " The following should always be set to false, to prevent writing data twice, which may produce \n"
218 + " inaccurate results.\n"
219 + " -Dmapreduce.map.speculative=false");
220 }
221
222 private boolean doCommandLine(final String[] args) {
223
224
225 if (args.length < 1) {
226 printUsage(null);
227 return false;
228 }
229 try {
230 for (int i = 0; i < args.length; i++) {
231 String cmd = args[i];
232 if (cmd.equals("-h") || cmd.startsWith("--h")) {
233 printUsage(null);
234 return false;
235 }
236
237 final String startRowArgKey = "--startrow=";
238 if (cmd.startsWith(startRowArgKey)) {
239 startRow = cmd.substring(startRowArgKey.length());
240 continue;
241 }
242
243 final String stopRowArgKey = "--stoprow=";
244 if (cmd.startsWith(stopRowArgKey)) {
245 stopRow = cmd.substring(stopRowArgKey.length());
246 continue;
247 }
248
249 final String startTimeArgKey = "--starttime=";
250 if (cmd.startsWith(startTimeArgKey)) {
251 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
252 continue;
253 }
254
255 final String endTimeArgKey = "--endtime=";
256 if (cmd.startsWith(endTimeArgKey)) {
257 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
258 continue;
259 }
260
261 final String versionsArgKey = "--versions=";
262 if (cmd.startsWith(versionsArgKey)) {
263 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
264 continue;
265 }
266
267 final String newNameArgKey = "--new.name=";
268 if (cmd.startsWith(newNameArgKey)) {
269 dstTableName = cmd.substring(newNameArgKey.length());
270 continue;
271 }
272
273 final String peerAdrArgKey = "--peer.adr=";
274 if (cmd.startsWith(peerAdrArgKey)) {
275 peerAddress = cmd.substring(peerAdrArgKey.length());
276 continue;
277 }
278
279 final String familiesArgKey = "--families=";
280 if (cmd.startsWith(familiesArgKey)) {
281 families = cmd.substring(familiesArgKey.length());
282 continue;
283 }
284
285 if (cmd.startsWith("--all.cells")) {
286 allCells = true;
287 continue;
288 }
289
290 if (cmd.startsWith("--bulkload")) {
291 bulkload = true;
292 continue;
293 }
294
295 if (cmd.startsWith("--shuffle")) {
296 shuffle = true;
297 continue;
298 }
299
300 if (i == args.length-1) {
301 tableName = cmd;
302 } else {
303 printUsage("Invalid argument '" + cmd + "'" );
304 return false;
305 }
306 }
307 if (dstTableName == null && peerAddress == null) {
308 printUsage("At least a new table name or a " +
309 "peer address must be specified");
310 return false;
311 }
312 if ((endTime != 0) && (startTime > endTime)) {
313 printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
314 return false;
315 }
316
317 if (bulkload && peerAddress != null) {
318 printUsage("Remote bulkload is not supported!");
319 return false;
320 }
321
322
323 if (dstTableName == null) {
324 dstTableName = tableName;
325 }
326 } catch (Exception e) {
327 e.printStackTrace();
328 printUsage("Can't start because " + e.getMessage());
329 return false;
330 }
331 return true;
332 }
333
334
335
336
337
338
339
340 public static void main(String[] args) throws Exception {
341 int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
342 System.exit(ret);
343 }
344
345 @Override
346 public int run(String[] args) throws Exception {
347 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
348 Job job = createSubmittableJob(otherArgs);
349 if (job == null) return 1;
350 if (!job.waitForCompletion(true)) {
351 LOG.info("Map-reduce job failed!");
352 if (bulkload) {
353 LOG.info("Files are not bulkloaded!");
354 }
355 return 1;
356 }
357 int code = 0;
358 if (bulkload) {
359 code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
360 this.dstTableName});
361 if (code == 0) {
362
363
364 FileSystem fs = FileSystem.get(this.getConf());
365 if (!fs.delete(this.bulkloadDir, true)) {
366 LOG.error("Deleting folder " + bulkloadDir + " failed!");
367 code = 1;
368 }
369 }
370 }
371 return code;
372 }
373 }