/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Export an HBase table.
 * Writes content to sequence files up in HDFS.  Use {@link Import} to read it
 * back in again.
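 *
 * <p>Illustrative invocation (assumes the standard {@code bin/hbase} launcher
 * script; adjust the table name, output directory and optional arguments to
 * your setup; see {@code usage()} below for the full argument list):
 * <pre>
 * $ bin/hbase org.apache.hadoop.hbase.mapreduce.Export \
 *     [-D &lt;property=value&gt;]* &lt;tablename&gt; &lt;outputdir&gt; \
 *     [&lt;versions&gt; [&lt;starttime&gt; [&lt;endtime&gt;]]]
 * </pre>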
 */
public class Export {
  private static final Log LOG = LogFactory.getLog(Export.class);
  final static String NAME = "export";
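  /** Configuration key: when true, the export scan runs in "raw" mode and also
   *  emits delete markers and deleted cells that have not yet been compacted away. */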
  final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";

  /**
   * Mapper.
   */
  static class Exporter
  extends TableMapper<ImmutableBytesWritable, Result> {
    /**
     * @param row  The current table row key.
     * @param value  The columns.
     * @param context  The current context.
     * @throws IOException When something is broken with the data.
     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
     *   org.apache.hadoop.mapreduce.Mapper.Context)
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value,
      Context context)
    throws IOException {
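      // Identity pass-through: emit the row key and its full Result.  With no
      // reduce phase these pairs go straight to the SequenceFile output.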
      try {
        context.write(row, value);
      } catch (InterruptedException e) {
        // Preserve the interrupt status and fail the task rather than
        // silently dropping the row.
        Thread.currentThread().interrupt();
        throw new IOException(e);
      }
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException {
    String tableName = args[0];
    Path outputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJobName(NAME + "_" + tableName);
    job.setJarByClass(Exporter.class);
    // Set optional scan parameters
    Scan s = getConfiguredScanForJob(conf, args);
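    // null is passed for the mapper output key/value classes; the job-level
    // output classes set below apply, since there is no reduce phase.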
    TableMapReduceUtil.initTableMapperJob(tableName, s, Exporter.class, null,
      null, job);
    // No reducers.  Just write straight to output files.
    job.setNumReduceTasks(0);
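    // Each map task writes a SequenceFile of (ImmutableBytesWritable, Result)
    // pairs under the output directory.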
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Result.class);
    FileOutputFormat.setOutputPath(job, outputDir);
    return job;
  }

  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
    Scan s = new Scan();
    // Optional arguments.
    // Set Scan Versions
    int versions = args.length > 2 ? Integer.parseInt(args[2]) : 1;
    s.setMaxVersions(versions);
    // Set Scan Range
    long startTime = args.length > 3 ? Long.parseLong(args[3]) : 0L;
    long endTime = args.length > 4 ? Long.parseLong(args[4]) : Long.MAX_VALUE;
    s.setTimeRange(startTime, endTime);
    // Set cache blocks
    s.setCacheBlocks(false);
    // Set raw scan
    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
    if (raw) {
      s.setRaw(raw);
    }

    // Set Scan Column Family
    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
    }
    // Set RowFilter or Prefix Filter if applicable.
    Filter exportFilter = getExportFilter(args);
    if (exportFilter != null) {
      LOG.info("Setting Scan Filter for Export.");
      s.setFilter(exportFilter);
    }
    LOG.info("versions=" + versions + ", starttime=" + startTime +
      ", endtime=" + endTime + ", keepDeletedCells=" + raw);
    return s;
  }

  private static Filter getExportFilter(String[] args) {
    Filter exportFilter = null;
    String filterCriteria = (args.length > 5) ? args[5] : null;
    if (filterCriteria == null) return null;
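    // A leading '^' selects a regular-expression row filter; anything else is
    // treated as a literal row-key prefix.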
    if (filterCriteria.startsWith("^")) {
      String regexPattern = filterCriteria.substring(1);
      exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
    } else {
      exportFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
    }
    return exportFilter;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
      "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
    System.err.println("  Note: -D properties will be applied to the conf used.");
    System.err.println("  For example:");
    System.err.println("   -D mapred.output.compress=true");
    System.err.println("   -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec");
    System.err.println("   -D mapred.output.compression.type=BLOCK");
    System.err.println("  Additionally, the following SCAN properties can be specified");
    System.err.println("  to control/limit what is exported.");
    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
    System.err.println("   -D " + RAW_SCAN + "=true");
    System.err.println("For performance consider the following properties:\n"
        + "   -Dhbase.client.scanner.caching=100\n"
        + "   -Dmapred.map.tasks.speculative.execution=false\n"
        + "   -Dmapred.reduce.tasks.speculative.execution=false");
  }


  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
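    // GenericOptionsParser consumes the generic Hadoop options (e.g. the
    // -D property=value pairs described in usage()) and returns the rest.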
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}