1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.Map;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.classification.InterfaceStability;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HBaseConfiguration;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.BufferedMutator;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Mutation;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.hbase.client.Durability;
40 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.mapreduce.JobContext;
43 import org.apache.hadoop.mapreduce.OutputCommitter;
44 import org.apache.hadoop.mapreduce.OutputFormat;
45 import org.apache.hadoop.mapreduce.RecordWriter;
46 import org.apache.hadoop.mapreduce.TaskAttemptContext;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Public
65 @InterfaceStability.Stable
66 public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
67
68 public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
69
70 public static final boolean WAL_ON = true;
71
72 public static final boolean WAL_OFF = false;
73
74
75
76 protected static class MultiTableRecordWriter extends
77 RecordWriter<ImmutableBytesWritable, Mutation> {
78 private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
79 Connection connection;
80 Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
81 Configuration conf;
82 boolean useWriteAheadLogging;
83
84
85
86
87
88
89
90
91 public MultiTableRecordWriter(Configuration conf,
92 boolean useWriteAheadLogging) {
93 LOG.debug("Created new MultiTableRecordReader with WAL "
94 + (useWriteAheadLogging ? "on" : "off"));
95 this.conf = conf;
96 this.useWriteAheadLogging = useWriteAheadLogging;
97 }
98
99
100
101
102
103
104
105
106 BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
107 if(this.connection == null){
108 this.connection = ConnectionFactory.createConnection(conf);
109 }
110 if (!mutatorMap.containsKey(tableName)) {
111 LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
112
113 BufferedMutator mutator =
114 connection.getBufferedMutator(TableName.valueOf(tableName.get()));
115 mutatorMap.put(tableName, mutator);
116 }
117 return mutatorMap.get(tableName);
118 }
119
120 @Override
121 public void close(TaskAttemptContext context) throws IOException {
122 for (BufferedMutator mutator : mutatorMap.values()) {
123 mutator.close();
124 }
125 if (connection != null) {
126 connection.close();
127 }
128 }
129
130
131
132
133
134
135
136
137
138
139
140 @Override
141 public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
142 BufferedMutator mutator = getBufferedMutator(tableName);
143
144 if (action instanceof Put) {
145 Put put = new Put((Put) action);
146 put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
147 : Durability.SKIP_WAL);
148 mutator.mutate(put);
149 } else if (action instanceof Delete) {
150 Delete delete = new Delete((Delete) action);
151 mutator.mutate(delete);
152 } else
153 throw new IllegalArgumentException(
154 "action must be either Delete or Put");
155 }
156 }
157
158 @Override
159 public void checkOutputSpecs(JobContext context) throws IOException,
160 InterruptedException {
161
162
163 }
164
165 @Override
166 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
167 throws IOException, InterruptedException {
168 return new TableOutputCommitter();
169 }
170
171 @Override
172 public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
173 throws IOException, InterruptedException {
174 Configuration conf = context.getConfiguration();
175 return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
176 conf.getBoolean(WAL_PROPERTY, WAL_ON));
177 }
178
179 }